1use std::sync::Arc;
6
7use iota_types::{
8 base_types::{IotaAddress, ObjectID, TransactionDigest},
9 committee::{Committee, EpochId},
10 digests::TransactionEventsDigest,
11 effects::{TransactionEffects, TransactionEvents},
12 error::IotaError,
13 messages_checkpoint::{
14 CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15 FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16 },
17 object::Object,
18 storage::{
19 AccountOwnedObjectInfo, CoinInfo, DynamicFieldIndexInfo, DynamicFieldKey, ObjectKey,
20 ObjectStore, ReadStore, RestStateReader, WriteStore,
21 error::{Error as StorageError, Result},
22 },
23 transaction::VerifiedTransaction,
24};
25use move_core_types::language_storage::StructTag;
26use parking_lot::Mutex;
27use tap::Pipe;
28
29use crate::{
30 authority::AuthorityState,
31 checkpoints::CheckpointStore,
32 epoch::committee_store::CommitteeStore,
33 execution_cache::ExecutionCacheTraitPointers,
34 rest_index::{CoinIndexInfo, OwnerIndexInfo, OwnerIndexKey, RestIndexStore},
35};
36
37#[derive(Clone)]
38pub struct RocksDbStore {
39 cache_traits: ExecutionCacheTraitPointers,
40
41 committee_store: Arc<CommitteeStore>,
42 checkpoint_store: Arc<CheckpointStore>,
43 highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
45 highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
46}
47
48impl RocksDbStore {
49 pub fn new(
50 cache_traits: ExecutionCacheTraitPointers,
51 committee_store: Arc<CommitteeStore>,
52 checkpoint_store: Arc<CheckpointStore>,
53 ) -> Self {
54 Self {
55 cache_traits,
56 committee_store,
57 checkpoint_store,
58 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
59 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
60 }
61 }
62
63 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
64 self.cache_traits
65 .object_cache_reader
66 .multi_get_objects_by_key(object_keys)
67 }
68
69 pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
70 Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
71 }
72}
73
74impl ReadStore for RocksDbStore {
75 fn get_checkpoint_by_digest(
76 &self,
77 digest: &CheckpointDigest,
78 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
79 self.checkpoint_store
80 .get_checkpoint_by_digest(digest)
81 .map_err(Into::into)
82 }
83
84 fn get_checkpoint_by_sequence_number(
85 &self,
86 sequence_number: CheckpointSequenceNumber,
87 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
88 self.checkpoint_store
89 .get_checkpoint_by_sequence_number(sequence_number)
90 .map_err(Into::into)
91 }
92
93 fn get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
94 self.checkpoint_store
95 .get_highest_verified_checkpoint()
96 .map(|maybe_checkpoint| {
97 maybe_checkpoint
98 .expect("storage should have been initialized with genesis checkpoint")
99 })
100 .map_err(Into::into)
101 }
102
103 fn get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
104 self.checkpoint_store
105 .get_highest_synced_checkpoint()
106 .map(|maybe_checkpoint| {
107 maybe_checkpoint
108 .expect("storage should have been initialized with genesis checkpoint")
109 })
110 .map_err(Into::into)
111 }
112
113 fn get_lowest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber, StorageError> {
114 let highest_pruned_cp = self
115 .checkpoint_store
116 .get_highest_pruned_checkpoint_seq_number()
117 .map_err(Into::<StorageError>::into)?;
118
119 if highest_pruned_cp == 0 {
120 Ok(0)
121 } else {
122 Ok(highest_pruned_cp + 1)
123 }
124 }
125
126 fn get_full_checkpoint_contents_by_sequence_number(
127 &self,
128 sequence_number: CheckpointSequenceNumber,
129 ) -> Result<Option<FullCheckpointContents>, StorageError> {
130 self.checkpoint_store
131 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
132 .map_err(Into::into)
133 }
134
135 fn get_full_checkpoint_contents(
136 &self,
137 digest: &CheckpointContentsDigest,
138 ) -> Result<Option<FullCheckpointContents>, StorageError> {
139 if let Some(seq_num) = self
141 .checkpoint_store
142 .get_sequence_number_by_contents_digest(digest)
143 .map_err(iota_types::storage::error::Error::custom)?
144 {
145 let contents = self
146 .checkpoint_store
147 .get_full_checkpoint_contents_by_sequence_number(seq_num)
148 .map_err(iota_types::storage::error::Error::custom)?;
149 if contents.is_some() {
150 return Ok(contents);
151 }
152 }
153
154 self.checkpoint_store
161 .get_checkpoint_contents(digest)
162 .map_err(iota_types::storage::error::Error::custom)?
163 .map(|contents| {
164 let mut transactions = Vec::with_capacity(contents.size());
165 for tx in contents.iter() {
166 if let (Some(t), Some(e)) = (
167 self.get_transaction(&tx.transaction)?,
168 self.cache_traits
169 .transaction_cache_reader
170 .get_effects(&tx.effects)
171 .map_err(iota_types::storage::error::Error::custom)?,
172 ) {
173 transactions.push(iota_types::base_types::ExecutionData::new(
174 (*t).clone().into_inner(),
175 e,
176 ))
177 } else {
178 return Result::<
179 Option<FullCheckpointContents>,
180 iota_types::storage::error::Error,
181 >::Ok(None);
182 }
183 }
184 Ok(Some(
185 FullCheckpointContents::from_contents_and_execution_data(
186 contents,
187 transactions.into_iter(),
188 ),
189 ))
190 })
191 .transpose()
192 .map(|contents| contents.flatten())
193 .map_err(iota_types::storage::error::Error::custom)
194 }
195
196 fn get_committee(
197 &self,
198 epoch: EpochId,
199 ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
200 Ok(self.committee_store.get_committee(&epoch).unwrap())
201 }
202
203 fn get_transaction(
204 &self,
205 digest: &TransactionDigest,
206 ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
207 self.cache_traits
208 .transaction_cache_reader
209 .get_transaction_block(digest)
210 .map_err(StorageError::custom)
211 }
212
213 fn get_transaction_effects(
214 &self,
215 digest: &TransactionDigest,
216 ) -> Result<Option<TransactionEffects>, StorageError> {
217 self.cache_traits
218 .transaction_cache_reader
219 .get_executed_effects(digest)
220 .map_err(StorageError::custom)
221 }
222
223 fn get_events(
224 &self,
225 digest: &TransactionEventsDigest,
226 ) -> Result<Option<TransactionEvents>, StorageError> {
227 self.cache_traits
228 .transaction_cache_reader
229 .get_events(digest)
230 .map_err(StorageError::custom)
231 }
232
233 fn get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
234 self.checkpoint_store
235 .get_highest_executed_checkpoint()
236 .map_err(iota_types::storage::error::Error::custom)?
237 .ok_or_else(|| {
238 iota_types::storage::error::Error::missing("unable to get latest checkpoint")
239 })
240 }
241
242 fn get_checkpoint_contents_by_digest(
243 &self,
244 digest: &CheckpointContentsDigest,
245 ) -> iota_types::storage::error::Result<
246 Option<iota_types::messages_checkpoint::CheckpointContents>,
247 > {
248 self.checkpoint_store
249 .get_checkpoint_contents(digest)
250 .map_err(iota_types::storage::error::Error::custom)
251 }
252
253 fn get_checkpoint_contents_by_sequence_number(
254 &self,
255 sequence_number: CheckpointSequenceNumber,
256 ) -> iota_types::storage::error::Result<
257 Option<iota_types::messages_checkpoint::CheckpointContents>,
258 > {
259 match self.get_checkpoint_by_sequence_number(sequence_number) {
260 Ok(Some(checkpoint)) => {
261 self.get_checkpoint_contents_by_digest(&checkpoint.content_digest)
262 }
263 Ok(None) => Ok(None),
264 Err(e) => Err(e),
265 }
266 }
267}
268
269impl ObjectStore for RocksDbStore {
270 fn get_object(
271 &self,
272 object_id: &iota_types::base_types::ObjectID,
273 ) -> iota_types::storage::error::Result<Option<Object>> {
274 self.cache_traits.object_store.get_object(object_id)
275 }
276
277 fn get_object_by_key(
278 &self,
279 object_id: &iota_types::base_types::ObjectID,
280 version: iota_types::base_types::VersionNumber,
281 ) -> iota_types::storage::error::Result<Option<Object>> {
282 self.cache_traits
283 .object_store
284 .get_object_by_key(object_id, version)
285 }
286}
287
288impl WriteStore for RocksDbStore {
289 fn insert_checkpoint(
290 &self,
291 checkpoint: &VerifiedCheckpoint,
292 ) -> Result<(), iota_types::storage::error::Error> {
293 if let Some(EndOfEpochData {
294 next_epoch_committee,
295 ..
296 }) = checkpoint.end_of_epoch_data.as_ref()
297 {
298 let next_committee = next_epoch_committee.iter().cloned().collect();
299 let committee =
300 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
301 self.insert_committee(committee)?;
302 }
303
304 self.checkpoint_store
305 .insert_verified_checkpoint(checkpoint)
306 .map_err(Into::into)
307 }
308
309 fn update_highest_synced_checkpoint(
310 &self,
311 checkpoint: &VerifiedCheckpoint,
312 ) -> Result<(), iota_types::storage::error::Error> {
313 let mut locked = self.highest_synced_checkpoint.lock();
314 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
315 return Ok(());
316 }
317 self.checkpoint_store
318 .update_highest_synced_checkpoint(checkpoint)
319 .map_err(iota_types::storage::error::Error::custom)?;
320 *locked = Some(checkpoint.sequence_number);
321 Ok(())
322 }
323
324 fn update_highest_verified_checkpoint(
325 &self,
326 checkpoint: &VerifiedCheckpoint,
327 ) -> Result<(), iota_types::storage::error::Error> {
328 let mut locked = self.highest_verified_checkpoint.lock();
329 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
330 return Ok(());
331 }
332 self.checkpoint_store
333 .update_highest_verified_checkpoint(checkpoint)
334 .map_err(iota_types::storage::error::Error::custom)?;
335 *locked = Some(checkpoint.sequence_number);
336 Ok(())
337 }
338
339 fn insert_checkpoint_contents(
340 &self,
341 checkpoint: &VerifiedCheckpoint,
342 contents: VerifiedCheckpointContents,
343 ) -> Result<(), iota_types::storage::error::Error> {
344 self.cache_traits
345 .state_sync_store
346 .multi_insert_transaction_and_effects(contents.transactions())
347 .map_err(iota_types::storage::error::Error::custom)?;
348 self.checkpoint_store
349 .insert_verified_checkpoint_contents(checkpoint, contents)
350 .map_err(Into::into)
351 }
352
353 fn insert_committee(
354 &self,
355 new_committee: Committee,
356 ) -> Result<(), iota_types::storage::error::Error> {
357 self.committee_store
358 .insert_new_committee(&new_committee)
359 .unwrap();
360 Ok(())
361 }
362}
363
364pub struct RestReadStore {
365 state: Arc<AuthorityState>,
366 rocks: RocksDbStore,
367}
368
369impl RestReadStore {
370 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
371 Self { state, rocks }
372 }
373
374 fn index(&self) -> iota_types::storage::error::Result<&RestIndexStore> {
375 self.state.rest_index.as_deref().ok_or_else(|| {
376 iota_types::storage::error::Error::custom("rest index store is disabled")
377 })
378 }
379}
380
381impl ObjectStore for RestReadStore {
382 fn get_object(
383 &self,
384 object_id: &iota_types::base_types::ObjectID,
385 ) -> iota_types::storage::error::Result<Option<Object>> {
386 self.rocks.get_object(object_id)
387 }
388
389 fn get_object_by_key(
390 &self,
391 object_id: &iota_types::base_types::ObjectID,
392 version: iota_types::base_types::VersionNumber,
393 ) -> iota_types::storage::error::Result<Option<Object>> {
394 self.rocks.get_object_by_key(object_id, version)
395 }
396}
397
398impl ReadStore for RestReadStore {
399 fn get_committee(
400 &self,
401 epoch: EpochId,
402 ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
403 self.rocks.get_committee(epoch)
404 }
405
406 fn get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
407 self.rocks.get_latest_checkpoint()
408 }
409
410 fn get_highest_verified_checkpoint(
411 &self,
412 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
413 self.rocks.get_highest_verified_checkpoint()
414 }
415
416 fn get_highest_synced_checkpoint(
417 &self,
418 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
419 self.rocks.get_highest_synced_checkpoint()
420 }
421
422 fn get_lowest_available_checkpoint(
423 &self,
424 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
425 self.rocks.get_lowest_available_checkpoint()
426 }
427
428 fn get_checkpoint_by_digest(
429 &self,
430 digest: &CheckpointDigest,
431 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
432 self.rocks.get_checkpoint_by_digest(digest)
433 }
434
435 fn get_checkpoint_by_sequence_number(
436 &self,
437 sequence_number: CheckpointSequenceNumber,
438 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
439 self.rocks
440 .get_checkpoint_by_sequence_number(sequence_number)
441 }
442
443 fn get_checkpoint_contents_by_digest(
444 &self,
445 digest: &CheckpointContentsDigest,
446 ) -> iota_types::storage::error::Result<
447 Option<iota_types::messages_checkpoint::CheckpointContents>,
448 > {
449 self.rocks.get_checkpoint_contents_by_digest(digest)
450 }
451
452 fn get_checkpoint_contents_by_sequence_number(
453 &self,
454 sequence_number: CheckpointSequenceNumber,
455 ) -> iota_types::storage::error::Result<
456 Option<iota_types::messages_checkpoint::CheckpointContents>,
457 > {
458 self.rocks
459 .get_checkpoint_contents_by_sequence_number(sequence_number)
460 }
461
462 fn get_transaction(
463 &self,
464 digest: &TransactionDigest,
465 ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
466 self.rocks.get_transaction(digest)
467 }
468
469 fn get_transaction_effects(
470 &self,
471 digest: &TransactionDigest,
472 ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
473 self.rocks.get_transaction_effects(digest)
474 }
475
476 fn get_events(
477 &self,
478 digest: &TransactionEventsDigest,
479 ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
480 self.rocks.get_events(digest)
481 }
482
483 fn get_full_checkpoint_contents_by_sequence_number(
484 &self,
485 sequence_number: CheckpointSequenceNumber,
486 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
487 self.rocks
488 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
489 }
490
491 fn get_full_checkpoint_contents(
492 &self,
493 digest: &CheckpointContentsDigest,
494 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
495 self.rocks.get_full_checkpoint_contents(digest)
496 }
497}
498
499impl RestStateReader for RestReadStore {
500 fn get_transaction_checkpoint(
501 &self,
502 digest: &TransactionDigest,
503 ) -> iota_types::storage::error::Result<Option<CheckpointSequenceNumber>> {
504 self.index()?
505 .get_transaction_info(digest)
506 .map(|maybe_info| maybe_info.map(|info| info.checkpoint))
507 .map_err(StorageError::custom)
508 }
509
510 fn get_lowest_available_checkpoint_objects(
511 &self,
512 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
513 let highest_pruned_cp = self
514 .state
515 .get_object_cache_reader()
516 .get_highest_pruned_checkpoint()
517 .map_err(StorageError::custom)?;
518
519 if highest_pruned_cp == 0 {
520 Ok(0)
521 } else {
522 Ok(highest_pruned_cp + 1)
523 }
524 }
525
526 fn get_chain_identifier(
527 &self,
528 ) -> iota_types::storage::error::Result<iota_types::digests::ChainIdentifier> {
529 self.state
530 .get_chain_identifier()
531 .ok_or_else(|| StorageError::missing("unable to query chain identifier"))
532 }
533
534 fn account_owned_objects_info_iter(
535 &self,
536 owner: IotaAddress,
537 cursor: Option<ObjectID>,
538 ) -> Result<Box<dyn Iterator<Item = AccountOwnedObjectInfo> + '_>> {
539 let iter = self.index()?.owner_iter(owner, cursor)?.map(
540 |(OwnerIndexKey { owner, object_id }, OwnerIndexInfo { version, type_ })| {
541 AccountOwnedObjectInfo {
542 owner,
543 object_id,
544 version,
545 type_,
546 }
547 },
548 );
549
550 Ok(Box::new(iter) as _)
551 }
552
553 fn dynamic_field_iter(
554 &self,
555 parent: ObjectID,
556 cursor: Option<ObjectID>,
557 ) -> iota_types::storage::error::Result<
558 Box<dyn Iterator<Item = (DynamicFieldKey, DynamicFieldIndexInfo)> + '_>,
559 > {
560 let iter = self.index()?.dynamic_field_iter(parent, cursor)?;
561
562 Ok(Box::new(iter) as _)
563 }
564
565 fn get_coin_info(
566 &self,
567 coin_type: &StructTag,
568 ) -> iota_types::storage::error::Result<Option<CoinInfo>> {
569 self.index()?
570 .get_coin_info(coin_type)?
571 .map(
572 |CoinIndexInfo {
573 coin_metadata_object_id,
574 treasury_object_id,
575 }| CoinInfo {
576 coin_metadata_object_id,
577 treasury_object_id,
578 },
579 )
580 .pipe(Ok)
581 }
582
583 fn get_epoch_last_checkpoint(
584 &self,
585 epoch_id: EpochId,
586 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
587 self.rocks
588 .checkpoint_store
589 .get_epoch_last_checkpoint(epoch_id)
590 .map_err(iota_types::storage::error::Error::custom)
591 }
592}