1use std::sync::Arc;
6
7use iota_types::{
8 base_types::{IotaAddress, ObjectID, TransactionDigest},
9 committee::{Committee, EpochId},
10 digests::TransactionEventsDigest,
11 effects::{TransactionEffects, TransactionEvents},
12 error::IotaError,
13 messages_checkpoint::{
14 CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15 FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16 },
17 object::Object,
18 storage::{
19 AccountOwnedObjectInfo, CoinInfo, DynamicFieldIndexInfo, DynamicFieldKey, ObjectKey,
20 ObjectStore, ReadStore, RestIndexes, RestStateReader, WriteStore,
21 error::{Error as StorageError, Result},
22 },
23 transaction::VerifiedTransaction,
24};
25use move_core_types::language_storage::StructTag;
26use parking_lot::Mutex;
27use tap::Pipe;
28use tracing::instrument;
29
30use crate::{
31 authority::AuthorityState,
32 checkpoints::CheckpointStore,
33 epoch::committee_store::CommitteeStore,
34 execution_cache::ExecutionCacheTraitPointers,
35 rest_index::{CoinIndexInfo, OwnerIndexInfo, OwnerIndexKey, RestIndexStore},
36};
37
38#[derive(Clone)]
39pub struct RocksDbStore {
40 cache_traits: ExecutionCacheTraitPointers,
41
42 committee_store: Arc<CommitteeStore>,
43 checkpoint_store: Arc<CheckpointStore>,
44 highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
46 highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
47}
48
49impl RocksDbStore {
50 pub fn new(
51 cache_traits: ExecutionCacheTraitPointers,
52 committee_store: Arc<CommitteeStore>,
53 checkpoint_store: Arc<CheckpointStore>,
54 ) -> Self {
55 Self {
56 cache_traits,
57 committee_store,
58 checkpoint_store,
59 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
60 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
61 }
62 }
63
64 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
65 self.cache_traits
66 .object_cache_reader
67 .try_multi_get_objects_by_key(object_keys)
68 }
69
70 pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
71 Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
72 }
73}
74
75impl ReadStore for RocksDbStore {
76 fn try_get_checkpoint_by_digest(
77 &self,
78 digest: &CheckpointDigest,
79 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
80 self.checkpoint_store
81 .get_checkpoint_by_digest(digest)
82 .map_err(Into::into)
83 }
84
85 fn try_get_checkpoint_by_sequence_number(
86 &self,
87 sequence_number: CheckpointSequenceNumber,
88 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
89 self.checkpoint_store
90 .get_checkpoint_by_sequence_number(sequence_number)
91 .map_err(Into::into)
92 }
93
94 fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
95 self.checkpoint_store
96 .get_highest_verified_checkpoint()
97 .map(|maybe_checkpoint| {
98 maybe_checkpoint
99 .expect("storage should have been initialized with genesis checkpoint")
100 })
101 .map_err(Into::into)
102 }
103
104 fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
105 self.checkpoint_store
106 .get_highest_synced_checkpoint()
107 .map(|maybe_checkpoint| {
108 maybe_checkpoint
109 .expect("storage should have been initialized with genesis checkpoint")
110 })
111 .map_err(Into::into)
112 }
113
114 fn try_get_lowest_available_checkpoint(
115 &self,
116 ) -> Result<CheckpointSequenceNumber, StorageError> {
117 let highest_pruned_cp = self
118 .checkpoint_store
119 .get_highest_pruned_checkpoint_seq_number()
120 .map_err(Into::<StorageError>::into)?;
121
122 if highest_pruned_cp == 0 {
123 Ok(0)
124 } else {
125 Ok(highest_pruned_cp + 1)
126 }
127 }
128
129 fn try_get_full_checkpoint_contents_by_sequence_number(
130 &self,
131 sequence_number: CheckpointSequenceNumber,
132 ) -> Result<Option<FullCheckpointContents>, StorageError> {
133 self.checkpoint_store
134 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
135 .map_err(Into::into)
136 }
137
138 fn try_get_full_checkpoint_contents(
139 &self,
140 digest: &CheckpointContentsDigest,
141 ) -> Result<Option<FullCheckpointContents>, StorageError> {
142 if let Some(seq_num) = self
144 .checkpoint_store
145 .get_sequence_number_by_contents_digest(digest)
146 .map_err(iota_types::storage::error::Error::custom)?
147 {
148 let contents = self
149 .checkpoint_store
150 .get_full_checkpoint_contents_by_sequence_number(seq_num)
151 .map_err(iota_types::storage::error::Error::custom)?;
152 if contents.is_some() {
153 return Ok(contents);
154 }
155 }
156
157 self.checkpoint_store
164 .get_checkpoint_contents(digest)
165 .map_err(iota_types::storage::error::Error::custom)?
166 .map(|contents| {
167 let mut transactions = Vec::with_capacity(contents.size());
168 for tx in contents.iter() {
169 if let (Some(t), Some(e)) = (
170 self.try_get_transaction(&tx.transaction)?,
171 self.cache_traits
172 .transaction_cache_reader
173 .try_get_effects(&tx.effects)
174 .map_err(iota_types::storage::error::Error::custom)?,
175 ) {
176 transactions.push(iota_types::base_types::ExecutionData::new(
177 (*t).clone().into_inner(),
178 e,
179 ))
180 } else {
181 return Result::<
182 Option<FullCheckpointContents>,
183 iota_types::storage::error::Error,
184 >::Ok(None);
185 }
186 }
187 Ok(Some(
188 FullCheckpointContents::from_contents_and_execution_data(
189 contents,
190 transactions.into_iter(),
191 ),
192 ))
193 })
194 .transpose()
195 .map(|contents| contents.flatten())
196 .map_err(iota_types::storage::error::Error::custom)
197 }
198
199 fn try_get_committee(
200 &self,
201 epoch: EpochId,
202 ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
203 Ok(self.committee_store.get_committee(&epoch).unwrap())
204 }
205
206 fn try_get_transaction(
207 &self,
208 digest: &TransactionDigest,
209 ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
210 self.cache_traits
211 .transaction_cache_reader
212 .try_get_transaction_block(digest)
213 .map_err(StorageError::custom)
214 }
215
216 fn try_get_transaction_effects(
217 &self,
218 digest: &TransactionDigest,
219 ) -> Result<Option<TransactionEffects>, StorageError> {
220 self.cache_traits
221 .transaction_cache_reader
222 .try_get_executed_effects(digest)
223 .map_err(StorageError::custom)
224 }
225
226 fn try_get_events(
227 &self,
228 digest: &TransactionEventsDigest,
229 ) -> Result<Option<TransactionEvents>, StorageError> {
230 self.cache_traits
231 .transaction_cache_reader
232 .try_get_events(digest)
233 .map_err(StorageError::custom)
234 }
235
236 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
237 self.checkpoint_store
238 .get_highest_executed_checkpoint()
239 .map_err(iota_types::storage::error::Error::custom)?
240 .ok_or_else(|| {
241 iota_types::storage::error::Error::missing("unable to get latest checkpoint")
242 })
243 }
244
245 fn try_get_checkpoint_contents_by_digest(
246 &self,
247 digest: &CheckpointContentsDigest,
248 ) -> iota_types::storage::error::Result<
249 Option<iota_types::messages_checkpoint::CheckpointContents>,
250 > {
251 self.checkpoint_store
252 .get_checkpoint_contents(digest)
253 .map_err(iota_types::storage::error::Error::custom)
254 }
255
256 fn try_get_checkpoint_contents_by_sequence_number(
257 &self,
258 sequence_number: CheckpointSequenceNumber,
259 ) -> iota_types::storage::error::Result<
260 Option<iota_types::messages_checkpoint::CheckpointContents>,
261 > {
262 match self.try_get_checkpoint_by_sequence_number(sequence_number) {
263 Ok(Some(checkpoint)) => {
264 self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
265 }
266 Ok(None) => Ok(None),
267 Err(e) => Err(e),
268 }
269 }
270}
271
272impl ObjectStore for RocksDbStore {
273 fn try_get_object(
274 &self,
275 object_id: &iota_types::base_types::ObjectID,
276 ) -> iota_types::storage::error::Result<Option<Object>> {
277 self.cache_traits.object_store.try_get_object(object_id)
278 }
279
280 fn try_get_object_by_key(
281 &self,
282 object_id: &iota_types::base_types::ObjectID,
283 version: iota_types::base_types::VersionNumber,
284 ) -> iota_types::storage::error::Result<Option<Object>> {
285 self.cache_traits
286 .object_store
287 .try_get_object_by_key(object_id, version)
288 }
289}
290
291impl WriteStore for RocksDbStore {
292 #[instrument(level = "trace", skip_all)]
293 fn try_insert_checkpoint(
294 &self,
295 checkpoint: &VerifiedCheckpoint,
296 ) -> Result<(), iota_types::storage::error::Error> {
297 if let Some(EndOfEpochData {
298 next_epoch_committee,
299 ..
300 }) = checkpoint.end_of_epoch_data.as_ref()
301 {
302 let next_committee = next_epoch_committee.iter().cloned().collect();
303 let committee =
304 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
305 self.try_insert_committee(committee)?;
306 }
307
308 self.checkpoint_store
309 .insert_verified_checkpoint(checkpoint)
310 .map_err(Into::into)
311 }
312
313 fn try_update_highest_synced_checkpoint(
314 &self,
315 checkpoint: &VerifiedCheckpoint,
316 ) -> Result<(), iota_types::storage::error::Error> {
317 let mut locked = self.highest_synced_checkpoint.lock();
318 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
319 return Ok(());
320 }
321 self.checkpoint_store
322 .update_highest_synced_checkpoint(checkpoint)
323 .map_err(iota_types::storage::error::Error::custom)?;
324 *locked = Some(checkpoint.sequence_number);
325 Ok(())
326 }
327
328 fn try_update_highest_verified_checkpoint(
329 &self,
330 checkpoint: &VerifiedCheckpoint,
331 ) -> Result<(), iota_types::storage::error::Error> {
332 let mut locked = self.highest_verified_checkpoint.lock();
333 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
334 return Ok(());
335 }
336 self.checkpoint_store
337 .update_highest_verified_checkpoint(checkpoint)
338 .map_err(iota_types::storage::error::Error::custom)?;
339 *locked = Some(checkpoint.sequence_number);
340 Ok(())
341 }
342
343 fn try_insert_checkpoint_contents(
344 &self,
345 checkpoint: &VerifiedCheckpoint,
346 contents: VerifiedCheckpointContents,
347 ) -> Result<(), iota_types::storage::error::Error> {
348 self.cache_traits
349 .state_sync_store
350 .try_multi_insert_transaction_and_effects(contents.transactions())
351 .map_err(iota_types::storage::error::Error::custom)?;
352 self.checkpoint_store
353 .insert_verified_checkpoint_contents(checkpoint, contents)
354 .map_err(Into::into)
355 }
356
357 fn try_insert_committee(
358 &self,
359 new_committee: Committee,
360 ) -> Result<(), iota_types::storage::error::Error> {
361 self.committee_store
362 .insert_new_committee(&new_committee)
363 .unwrap();
364 Ok(())
365 }
366}
367
368pub struct RestReadStore {
369 state: Arc<AuthorityState>,
370 rocks: RocksDbStore,
371}
372
373impl RestReadStore {
374 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
375 Self { state, rocks }
376 }
377
378 fn index(&self) -> iota_types::storage::error::Result<&RestIndexStore> {
379 self.state.rest_index.as_deref().ok_or_else(|| {
380 iota_types::storage::error::Error::custom("rest index store is disabled")
381 })
382 }
383}
384
385impl ObjectStore for RestReadStore {
386 fn try_get_object(
387 &self,
388 object_id: &iota_types::base_types::ObjectID,
389 ) -> iota_types::storage::error::Result<Option<Object>> {
390 self.rocks.try_get_object(object_id)
391 }
392
393 fn try_get_object_by_key(
394 &self,
395 object_id: &iota_types::base_types::ObjectID,
396 version: iota_types::base_types::VersionNumber,
397 ) -> iota_types::storage::error::Result<Option<Object>> {
398 self.rocks.try_get_object_by_key(object_id, version)
399 }
400}
401
402impl ReadStore for RestReadStore {
403 fn try_get_committee(
404 &self,
405 epoch: EpochId,
406 ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
407 self.rocks.try_get_committee(epoch)
408 }
409
410 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
411 self.rocks.try_get_latest_checkpoint()
412 }
413
414 fn try_get_highest_verified_checkpoint(
415 &self,
416 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
417 self.rocks.try_get_highest_verified_checkpoint()
418 }
419
420 fn try_get_highest_synced_checkpoint(
421 &self,
422 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
423 self.rocks.try_get_highest_synced_checkpoint()
424 }
425
426 fn try_get_lowest_available_checkpoint(
427 &self,
428 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
429 self.rocks.try_get_lowest_available_checkpoint()
430 }
431
432 fn try_get_checkpoint_by_digest(
433 &self,
434 digest: &CheckpointDigest,
435 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
436 self.rocks.try_get_checkpoint_by_digest(digest)
437 }
438
439 fn try_get_checkpoint_by_sequence_number(
440 &self,
441 sequence_number: CheckpointSequenceNumber,
442 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
443 self.rocks
444 .try_get_checkpoint_by_sequence_number(sequence_number)
445 }
446
447 fn try_get_checkpoint_contents_by_digest(
448 &self,
449 digest: &CheckpointContentsDigest,
450 ) -> iota_types::storage::error::Result<
451 Option<iota_types::messages_checkpoint::CheckpointContents>,
452 > {
453 self.rocks.try_get_checkpoint_contents_by_digest(digest)
454 }
455
456 fn try_get_checkpoint_contents_by_sequence_number(
457 &self,
458 sequence_number: CheckpointSequenceNumber,
459 ) -> iota_types::storage::error::Result<
460 Option<iota_types::messages_checkpoint::CheckpointContents>,
461 > {
462 self.rocks
463 .try_get_checkpoint_contents_by_sequence_number(sequence_number)
464 }
465
466 fn try_get_transaction(
467 &self,
468 digest: &TransactionDigest,
469 ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
470 self.rocks.try_get_transaction(digest)
471 }
472
473 fn try_get_transaction_effects(
474 &self,
475 digest: &TransactionDigest,
476 ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
477 self.rocks.try_get_transaction_effects(digest)
478 }
479
480 fn try_get_events(
481 &self,
482 digest: &TransactionEventsDigest,
483 ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
484 self.rocks.try_get_events(digest)
485 }
486
487 fn try_get_full_checkpoint_contents_by_sequence_number(
488 &self,
489 sequence_number: CheckpointSequenceNumber,
490 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
491 self.rocks
492 .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
493 }
494
495 fn try_get_full_checkpoint_contents(
496 &self,
497 digest: &CheckpointContentsDigest,
498 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
499 self.rocks.try_get_full_checkpoint_contents(digest)
500 }
501}
502
503impl RestStateReader for RestReadStore {
504 fn get_lowest_available_checkpoint_objects(
505 &self,
506 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
507 let highest_pruned_cp = self
508 .state
509 .get_object_cache_reader()
510 .try_get_highest_pruned_checkpoint()
511 .map_err(StorageError::custom)?;
512
513 if highest_pruned_cp == 0 {
514 Ok(0)
515 } else {
516 Ok(highest_pruned_cp + 1)
517 }
518 }
519
520 fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
521 Ok(self.state.get_chain_identifier())
522 }
523
524 fn get_epoch_last_checkpoint(
525 &self,
526 epoch_id: EpochId,
527 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
528 self.rocks
529 .checkpoint_store
530 .get_epoch_last_checkpoint(epoch_id)
531 .map_err(iota_types::storage::error::Error::custom)
532 }
533
534 fn indexes(&self) -> Option<&dyn RestIndexes> {
535 self.index().ok().map(|index| index as _)
536 }
537}
538
539impl RestIndexes for RestIndexStore {
540 fn get_transaction_checkpoint(
541 &self,
542 digest: &TransactionDigest,
543 ) -> iota_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
544 self.get_transaction_info(digest)
545 .map(|maybe_info| maybe_info.map(|info| info.checkpoint))
546 .map_err(StorageError::custom)
547 }
548
549 fn account_owned_objects_info_iter(
550 &self,
551 owner: IotaAddress,
552 cursor: Option<ObjectID>,
553 ) -> Result<Box<dyn Iterator<Item = AccountOwnedObjectInfo> + '_>> {
554 let iter = self.owner_iter(owner, cursor)?.map(
555 |(OwnerIndexKey { owner, object_id }, OwnerIndexInfo { version, type_ })| {
556 AccountOwnedObjectInfo {
557 owner,
558 object_id,
559 version,
560 type_,
561 }
562 },
563 );
564
565 Ok(Box::new(iter) as _)
566 }
567
568 fn dynamic_field_iter(
569 &self,
570 parent: ObjectID,
571 cursor: Option<ObjectID>,
572 ) -> iota_types::storage::error::Result<
573 Box<dyn Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_>,
574 > {
575 let iter = self.dynamic_field_iter(parent, cursor)?;
576
577 Ok(Box::new(iter) as _)
578 }
579
580 fn get_coin_info(
581 &self,
582 coin_type: &StructTag,
583 ) -> iota_types::storage::error::Result<Option<CoinInfo>> {
584 self.get_coin_info(coin_type)?
585 .map(
586 |CoinIndexInfo {
587 coin_metadata_object_id,
588 treasury_object_id,
589 }| CoinInfo {
590 coin_metadata_object_id,
591 treasury_object_id,
592 },
593 )
594 .pipe(Ok)
595 }
596}