1use std::sync::Arc;
6
7use iota_types::{
8 base_types::{IotaAddress, ObjectID, TransactionDigest},
9 committee::{Committee, EpochId},
10 digests::TransactionEventsDigest,
11 effects::{TransactionEffects, TransactionEvents},
12 error::IotaError,
13 messages_checkpoint::{
14 CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15 FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16 },
17 object::Object,
18 storage::{
19 AccountOwnedObjectInfo, CoinInfo, DynamicFieldIndexInfo, DynamicFieldKey, ObjectKey,
20 ObjectStore, ReadStore, RestIndexes, RestStateReader, WriteStore,
21 error::{Error as StorageError, Result},
22 },
23 transaction::VerifiedTransaction,
24};
25use move_core_types::language_storage::StructTag;
26use parking_lot::Mutex;
27use tap::Pipe;
28
29use crate::{
30 authority::AuthorityState,
31 checkpoints::CheckpointStore,
32 epoch::committee_store::CommitteeStore,
33 execution_cache::ExecutionCacheTraitPointers,
34 rest_index::{CoinIndexInfo, OwnerIndexInfo, OwnerIndexKey, RestIndexStore},
35};
36
37#[derive(Clone)]
38pub struct RocksDbStore {
39 cache_traits: ExecutionCacheTraitPointers,
40
41 committee_store: Arc<CommitteeStore>,
42 checkpoint_store: Arc<CheckpointStore>,
43 highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
45 highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
46}
47
48impl RocksDbStore {
49 pub fn new(
50 cache_traits: ExecutionCacheTraitPointers,
51 committee_store: Arc<CommitteeStore>,
52 checkpoint_store: Arc<CheckpointStore>,
53 ) -> Self {
54 Self {
55 cache_traits,
56 committee_store,
57 checkpoint_store,
58 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
59 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
60 }
61 }
62
63 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
64 self.cache_traits
65 .object_cache_reader
66 .try_multi_get_objects_by_key(object_keys)
67 }
68
69 pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
70 Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
71 }
72}
73
74impl ReadStore for RocksDbStore {
75 fn try_get_checkpoint_by_digest(
76 &self,
77 digest: &CheckpointDigest,
78 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
79 self.checkpoint_store
80 .get_checkpoint_by_digest(digest)
81 .map_err(Into::into)
82 }
83
84 fn try_get_checkpoint_by_sequence_number(
85 &self,
86 sequence_number: CheckpointSequenceNumber,
87 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
88 self.checkpoint_store
89 .get_checkpoint_by_sequence_number(sequence_number)
90 .map_err(Into::into)
91 }
92
93 fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
94 self.checkpoint_store
95 .get_highest_verified_checkpoint()
96 .map(|maybe_checkpoint| {
97 maybe_checkpoint
98 .expect("storage should have been initialized with genesis checkpoint")
99 })
100 .map_err(Into::into)
101 }
102
103 fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
104 self.checkpoint_store
105 .get_highest_synced_checkpoint()
106 .map(|maybe_checkpoint| {
107 maybe_checkpoint
108 .expect("storage should have been initialized with genesis checkpoint")
109 })
110 .map_err(Into::into)
111 }
112
113 fn try_get_lowest_available_checkpoint(
114 &self,
115 ) -> Result<CheckpointSequenceNumber, StorageError> {
116 let highest_pruned_cp = self
117 .checkpoint_store
118 .get_highest_pruned_checkpoint_seq_number()
119 .map_err(Into::<StorageError>::into)?;
120
121 if highest_pruned_cp == 0 {
122 Ok(0)
123 } else {
124 Ok(highest_pruned_cp + 1)
125 }
126 }
127
128 fn try_get_full_checkpoint_contents_by_sequence_number(
129 &self,
130 sequence_number: CheckpointSequenceNumber,
131 ) -> Result<Option<FullCheckpointContents>, StorageError> {
132 self.checkpoint_store
133 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
134 .map_err(Into::into)
135 }
136
137 fn try_get_full_checkpoint_contents(
138 &self,
139 digest: &CheckpointContentsDigest,
140 ) -> Result<Option<FullCheckpointContents>, StorageError> {
141 if let Some(seq_num) = self
143 .checkpoint_store
144 .get_sequence_number_by_contents_digest(digest)
145 .map_err(iota_types::storage::error::Error::custom)?
146 {
147 let contents = self
148 .checkpoint_store
149 .get_full_checkpoint_contents_by_sequence_number(seq_num)
150 .map_err(iota_types::storage::error::Error::custom)?;
151 if contents.is_some() {
152 return Ok(contents);
153 }
154 }
155
156 self.checkpoint_store
163 .get_checkpoint_contents(digest)
164 .map_err(iota_types::storage::error::Error::custom)?
165 .map(|contents| {
166 let mut transactions = Vec::with_capacity(contents.size());
167 for tx in contents.iter() {
168 if let (Some(t), Some(e)) = (
169 self.try_get_transaction(&tx.transaction)?,
170 self.cache_traits
171 .transaction_cache_reader
172 .try_get_effects(&tx.effects)
173 .map_err(iota_types::storage::error::Error::custom)?,
174 ) {
175 transactions.push(iota_types::base_types::ExecutionData::new(
176 (*t).clone().into_inner(),
177 e,
178 ))
179 } else {
180 return Result::<
181 Option<FullCheckpointContents>,
182 iota_types::storage::error::Error,
183 >::Ok(None);
184 }
185 }
186 Ok(Some(
187 FullCheckpointContents::from_contents_and_execution_data(
188 contents,
189 transactions.into_iter(),
190 ),
191 ))
192 })
193 .transpose()
194 .map(|contents| contents.flatten())
195 .map_err(iota_types::storage::error::Error::custom)
196 }
197
198 fn try_get_committee(
199 &self,
200 epoch: EpochId,
201 ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
202 Ok(self.committee_store.get_committee(&epoch).unwrap())
203 }
204
205 fn try_get_transaction(
206 &self,
207 digest: &TransactionDigest,
208 ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
209 self.cache_traits
210 .transaction_cache_reader
211 .try_get_transaction_block(digest)
212 .map_err(StorageError::custom)
213 }
214
215 fn try_get_transaction_effects(
216 &self,
217 digest: &TransactionDigest,
218 ) -> Result<Option<TransactionEffects>, StorageError> {
219 self.cache_traits
220 .transaction_cache_reader
221 .try_get_executed_effects(digest)
222 .map_err(StorageError::custom)
223 }
224
225 fn try_get_events(
226 &self,
227 digest: &TransactionEventsDigest,
228 ) -> Result<Option<TransactionEvents>, StorageError> {
229 self.cache_traits
230 .transaction_cache_reader
231 .try_get_events(digest)
232 .map_err(StorageError::custom)
233 }
234
235 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
236 self.checkpoint_store
237 .get_highest_executed_checkpoint()
238 .map_err(iota_types::storage::error::Error::custom)?
239 .ok_or_else(|| {
240 iota_types::storage::error::Error::missing("unable to get latest checkpoint")
241 })
242 }
243
244 fn try_get_checkpoint_contents_by_digest(
245 &self,
246 digest: &CheckpointContentsDigest,
247 ) -> iota_types::storage::error::Result<
248 Option<iota_types::messages_checkpoint::CheckpointContents>,
249 > {
250 self.checkpoint_store
251 .get_checkpoint_contents(digest)
252 .map_err(iota_types::storage::error::Error::custom)
253 }
254
255 fn try_get_checkpoint_contents_by_sequence_number(
256 &self,
257 sequence_number: CheckpointSequenceNumber,
258 ) -> iota_types::storage::error::Result<
259 Option<iota_types::messages_checkpoint::CheckpointContents>,
260 > {
261 match self.try_get_checkpoint_by_sequence_number(sequence_number) {
262 Ok(Some(checkpoint)) => {
263 self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
264 }
265 Ok(None) => Ok(None),
266 Err(e) => Err(e),
267 }
268 }
269}
270
271impl ObjectStore for RocksDbStore {
272 fn try_get_object(
273 &self,
274 object_id: &iota_types::base_types::ObjectID,
275 ) -> iota_types::storage::error::Result<Option<Object>> {
276 self.cache_traits.object_store.try_get_object(object_id)
277 }
278
279 fn try_get_object_by_key(
280 &self,
281 object_id: &iota_types::base_types::ObjectID,
282 version: iota_types::base_types::VersionNumber,
283 ) -> iota_types::storage::error::Result<Option<Object>> {
284 self.cache_traits
285 .object_store
286 .try_get_object_by_key(object_id, version)
287 }
288}
289
290impl WriteStore for RocksDbStore {
291 fn try_insert_checkpoint(
292 &self,
293 checkpoint: &VerifiedCheckpoint,
294 ) -> Result<(), iota_types::storage::error::Error> {
295 if let Some(EndOfEpochData {
296 next_epoch_committee,
297 ..
298 }) = checkpoint.end_of_epoch_data.as_ref()
299 {
300 let next_committee = next_epoch_committee.iter().cloned().collect();
301 let committee =
302 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
303 self.try_insert_committee(committee)?;
304 }
305
306 self.checkpoint_store
307 .insert_verified_checkpoint(checkpoint)
308 .map_err(Into::into)
309 }
310
311 fn try_update_highest_synced_checkpoint(
312 &self,
313 checkpoint: &VerifiedCheckpoint,
314 ) -> Result<(), iota_types::storage::error::Error> {
315 let mut locked = self.highest_synced_checkpoint.lock();
316 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
317 return Ok(());
318 }
319 self.checkpoint_store
320 .update_highest_synced_checkpoint(checkpoint)
321 .map_err(iota_types::storage::error::Error::custom)?;
322 *locked = Some(checkpoint.sequence_number);
323 Ok(())
324 }
325
326 fn try_update_highest_verified_checkpoint(
327 &self,
328 checkpoint: &VerifiedCheckpoint,
329 ) -> Result<(), iota_types::storage::error::Error> {
330 let mut locked = self.highest_verified_checkpoint.lock();
331 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
332 return Ok(());
333 }
334 self.checkpoint_store
335 .update_highest_verified_checkpoint(checkpoint)
336 .map_err(iota_types::storage::error::Error::custom)?;
337 *locked = Some(checkpoint.sequence_number);
338 Ok(())
339 }
340
341 fn try_insert_checkpoint_contents(
342 &self,
343 checkpoint: &VerifiedCheckpoint,
344 contents: VerifiedCheckpointContents,
345 ) -> Result<(), iota_types::storage::error::Error> {
346 self.cache_traits
347 .state_sync_store
348 .try_multi_insert_transaction_and_effects(contents.transactions())
349 .map_err(iota_types::storage::error::Error::custom)?;
350 self.checkpoint_store
351 .insert_verified_checkpoint_contents(checkpoint, contents)
352 .map_err(Into::into)
353 }
354
355 fn try_insert_committee(
356 &self,
357 new_committee: Committee,
358 ) -> Result<(), iota_types::storage::error::Error> {
359 self.committee_store
360 .insert_new_committee(&new_committee)
361 .unwrap();
362 Ok(())
363 }
364}
365
366pub struct RestReadStore {
367 state: Arc<AuthorityState>,
368 rocks: RocksDbStore,
369}
370
371impl RestReadStore {
372 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
373 Self { state, rocks }
374 }
375
376 fn index(&self) -> iota_types::storage::error::Result<&RestIndexStore> {
377 self.state.rest_index.as_deref().ok_or_else(|| {
378 iota_types::storage::error::Error::custom("rest index store is disabled")
379 })
380 }
381}
382
383impl ObjectStore for RestReadStore {
384 fn try_get_object(
385 &self,
386 object_id: &iota_types::base_types::ObjectID,
387 ) -> iota_types::storage::error::Result<Option<Object>> {
388 self.rocks.try_get_object(object_id)
389 }
390
391 fn try_get_object_by_key(
392 &self,
393 object_id: &iota_types::base_types::ObjectID,
394 version: iota_types::base_types::VersionNumber,
395 ) -> iota_types::storage::error::Result<Option<Object>> {
396 self.rocks.try_get_object_by_key(object_id, version)
397 }
398}
399
400impl ReadStore for RestReadStore {
401 fn try_get_committee(
402 &self,
403 epoch: EpochId,
404 ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
405 self.rocks.try_get_committee(epoch)
406 }
407
408 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
409 self.rocks.try_get_latest_checkpoint()
410 }
411
412 fn try_get_highest_verified_checkpoint(
413 &self,
414 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
415 self.rocks.try_get_highest_verified_checkpoint()
416 }
417
418 fn try_get_highest_synced_checkpoint(
419 &self,
420 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
421 self.rocks.try_get_highest_synced_checkpoint()
422 }
423
424 fn try_get_lowest_available_checkpoint(
425 &self,
426 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
427 self.rocks.try_get_lowest_available_checkpoint()
428 }
429
430 fn try_get_checkpoint_by_digest(
431 &self,
432 digest: &CheckpointDigest,
433 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
434 self.rocks.try_get_checkpoint_by_digest(digest)
435 }
436
437 fn try_get_checkpoint_by_sequence_number(
438 &self,
439 sequence_number: CheckpointSequenceNumber,
440 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
441 self.rocks
442 .try_get_checkpoint_by_sequence_number(sequence_number)
443 }
444
445 fn try_get_checkpoint_contents_by_digest(
446 &self,
447 digest: &CheckpointContentsDigest,
448 ) -> iota_types::storage::error::Result<
449 Option<iota_types::messages_checkpoint::CheckpointContents>,
450 > {
451 self.rocks.try_get_checkpoint_contents_by_digest(digest)
452 }
453
454 fn try_get_checkpoint_contents_by_sequence_number(
455 &self,
456 sequence_number: CheckpointSequenceNumber,
457 ) -> iota_types::storage::error::Result<
458 Option<iota_types::messages_checkpoint::CheckpointContents>,
459 > {
460 self.rocks
461 .try_get_checkpoint_contents_by_sequence_number(sequence_number)
462 }
463
464 fn try_get_transaction(
465 &self,
466 digest: &TransactionDigest,
467 ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
468 self.rocks.try_get_transaction(digest)
469 }
470
471 fn try_get_transaction_effects(
472 &self,
473 digest: &TransactionDigest,
474 ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
475 self.rocks.try_get_transaction_effects(digest)
476 }
477
478 fn try_get_events(
479 &self,
480 digest: &TransactionEventsDigest,
481 ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
482 self.rocks.try_get_events(digest)
483 }
484
485 fn try_get_full_checkpoint_contents_by_sequence_number(
486 &self,
487 sequence_number: CheckpointSequenceNumber,
488 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
489 self.rocks
490 .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
491 }
492
493 fn try_get_full_checkpoint_contents(
494 &self,
495 digest: &CheckpointContentsDigest,
496 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
497 self.rocks.try_get_full_checkpoint_contents(digest)
498 }
499}
500
501impl RestStateReader for RestReadStore {
502 fn get_lowest_available_checkpoint_objects(
503 &self,
504 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
505 let highest_pruned_cp = self
506 .state
507 .get_object_cache_reader()
508 .try_get_highest_pruned_checkpoint()
509 .map_err(StorageError::custom)?;
510
511 if highest_pruned_cp == 0 {
512 Ok(0)
513 } else {
514 Ok(highest_pruned_cp + 1)
515 }
516 }
517
518 fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
519 Ok(self.state.get_chain_identifier())
520 }
521
522 fn get_epoch_last_checkpoint(
523 &self,
524 epoch_id: EpochId,
525 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
526 self.rocks
527 .checkpoint_store
528 .get_epoch_last_checkpoint(epoch_id)
529 .map_err(iota_types::storage::error::Error::custom)
530 }
531
532 fn indexes(&self) -> Option<&dyn RestIndexes> {
533 self.index().ok().map(|index| index as _)
534 }
535}
536
537impl RestIndexes for RestIndexStore {
538 fn get_transaction_checkpoint(
539 &self,
540 digest: &TransactionDigest,
541 ) -> iota_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
542 self.get_transaction_info(digest)
543 .map(|maybe_info| maybe_info.map(|info| info.checkpoint))
544 .map_err(StorageError::custom)
545 }
546
547 fn account_owned_objects_info_iter(
548 &self,
549 owner: IotaAddress,
550 cursor: Option<ObjectID>,
551 ) -> Result<Box<dyn Iterator<Item = AccountOwnedObjectInfo> + '_>> {
552 let iter = self.owner_iter(owner, cursor)?.map(
553 |(OwnerIndexKey { owner, object_id }, OwnerIndexInfo { version, type_ })| {
554 AccountOwnedObjectInfo {
555 owner,
556 object_id,
557 version,
558 type_,
559 }
560 },
561 );
562
563 Ok(Box::new(iter) as _)
564 }
565
566 fn dynamic_field_iter(
567 &self,
568 parent: ObjectID,
569 cursor: Option<ObjectID>,
570 ) -> iota_types::storage::error::Result<
571 Box<dyn Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_>,
572 > {
573 let iter = self.dynamic_field_iter(parent, cursor)?;
574
575 Ok(Box::new(iter) as _)
576 }
577
578 fn get_coin_info(
579 &self,
580 coin_type: &StructTag,
581 ) -> iota_types::storage::error::Result<Option<CoinInfo>> {
582 self.get_coin_info(coin_type)?
583 .map(
584 |CoinIndexInfo {
585 coin_metadata_object_id,
586 treasury_object_id,
587 }| CoinInfo {
588 coin_metadata_object_id,
589 treasury_object_id,
590 },
591 )
592 .pipe(Ok)
593 }
594}