1use std::sync::Arc;
6
7use iota_node_storage::{GrpcIndexes, GrpcStateReader};
8use iota_types::{
9 base_types::{StructTag, TransactionDigest},
10 committee::{Committee, EpochId},
11 effects::{TransactionEffects, TransactionEvents},
12 error::IotaError,
13 messages_checkpoint::{
14 CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
15 FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
16 },
17 object::Object,
18 storage::{
19 ObjectKey, ObjectStore, ReadStore, WriteStore,
20 error::{Error as StorageError, Result},
21 },
22 transaction::VerifiedTransaction,
23};
24use parking_lot::Mutex;
25use tracing::instrument;
26
27use crate::{
28 authority::AuthorityState, checkpoints::CheckpointStore,
29 epoch::committee_store::CommitteeStore, execution_cache::ExecutionCacheTraitPointers,
30 grpc_indexes::GrpcIndexesStore,
31};
32
33#[derive(Clone)]
34pub struct RocksDbStore {
35 cache_traits: ExecutionCacheTraitPointers,
36
37 committee_store: Arc<CommitteeStore>,
38 checkpoint_store: Arc<CheckpointStore>,
39 highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
41 highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
42}
43
44impl RocksDbStore {
45 pub fn new(
46 cache_traits: ExecutionCacheTraitPointers,
47 committee_store: Arc<CommitteeStore>,
48 checkpoint_store: Arc<CheckpointStore>,
49 ) -> Self {
50 Self {
51 cache_traits,
52 committee_store,
53 checkpoint_store,
54 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
55 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
56 }
57 }
58
59 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
60 self.cache_traits
61 .object_cache_reader
62 .try_multi_get_objects_by_key(object_keys)
63 }
64
65 pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
66 Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
67 }
68}
69
70impl ReadStore for RocksDbStore {
71 fn try_get_checkpoint_by_digest(
72 &self,
73 digest: &CheckpointDigest,
74 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
75 self.checkpoint_store
76 .get_checkpoint_by_digest(digest)
77 .map_err(Into::into)
78 }
79
80 fn try_get_checkpoint_by_sequence_number(
81 &self,
82 sequence_number: CheckpointSequenceNumber,
83 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
84 self.checkpoint_store
85 .get_checkpoint_by_sequence_number(sequence_number)
86 .map_err(Into::into)
87 }
88
89 fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
90 self.checkpoint_store
91 .get_highest_verified_checkpoint()
92 .map(|maybe_checkpoint| {
93 maybe_checkpoint
94 .expect("storage should have been initialized with genesis checkpoint")
95 })
96 .map_err(Into::into)
97 }
98
99 fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
100 self.checkpoint_store
101 .get_highest_synced_checkpoint()
102 .map(|maybe_checkpoint| {
103 maybe_checkpoint
104 .expect("storage should have been initialized with genesis checkpoint")
105 })
106 .map_err(Into::into)
107 }
108
109 fn try_get_lowest_available_checkpoint(
110 &self,
111 ) -> Result<CheckpointSequenceNumber, StorageError> {
112 if let Some(highest_pruned_cp) = self
113 .checkpoint_store
114 .get_highest_pruned_checkpoint_seq_number()
115 .map_err(Into::<StorageError>::into)?
116 {
117 Ok(highest_pruned_cp + 1)
118 } else {
119 Ok(0)
120 }
121 }
122
123 fn try_get_full_checkpoint_contents_by_sequence_number(
124 &self,
125 sequence_number: CheckpointSequenceNumber,
126 ) -> Result<Option<FullCheckpointContents>, StorageError> {
127 self.checkpoint_store
128 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
129 .map_err(Into::into)
130 }
131
132 fn try_get_full_checkpoint_contents(
133 &self,
134 digest: &CheckpointContentsDigest,
135 ) -> Result<Option<FullCheckpointContents>, StorageError> {
136 if let Some(seq_num) = self
138 .checkpoint_store
139 .get_sequence_number_by_contents_digest(digest)
140 .map_err(iota_types::storage::error::Error::custom)?
141 {
142 let contents = self
143 .checkpoint_store
144 .get_full_checkpoint_contents_by_sequence_number(seq_num)
145 .map_err(iota_types::storage::error::Error::custom)?;
146 if contents.is_some() {
147 return Ok(contents);
148 }
149 }
150
151 self.checkpoint_store
158 .get_checkpoint_contents(digest)
159 .map_err(iota_types::storage::error::Error::custom)?
160 .map(|contents| {
161 let mut transactions = Vec::with_capacity(contents.size());
162 for tx in contents.iter() {
163 if let (Some(t), Some(e)) = (
164 self.try_get_transaction(&tx.transaction)?,
165 self.cache_traits
166 .transaction_cache_reader
167 .try_get_effects(&tx.effects)
168 .map_err(iota_types::storage::error::Error::custom)?,
169 ) {
170 transactions.push(iota_types::base_types::ExecutionData::new(
171 (*t).clone().into_inner(),
172 e,
173 ))
174 } else {
175 return Result::<
176 Option<FullCheckpointContents>,
177 iota_types::storage::error::Error,
178 >::Ok(None);
179 }
180 }
181 Ok(Some(
182 FullCheckpointContents::from_contents_and_execution_data(
183 contents,
184 transactions.into_iter(),
185 ),
186 ))
187 })
188 .transpose()
189 .map(|contents| contents.flatten())
190 .map_err(iota_types::storage::error::Error::custom)
191 }
192
193 fn try_get_committee(
194 &self,
195 epoch: EpochId,
196 ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
197 Ok(self.committee_store.get_committee(&epoch).unwrap())
198 }
199
200 fn try_get_transaction(
201 &self,
202 digest: &TransactionDigest,
203 ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
204 self.cache_traits
205 .transaction_cache_reader
206 .try_get_transaction_block(digest)
207 .map_err(StorageError::custom)
208 }
209
210 fn try_get_transaction_effects(
211 &self,
212 digest: &TransactionDigest,
213 ) -> Result<Option<TransactionEffects>, StorageError> {
214 self.cache_traits
215 .transaction_cache_reader
216 .try_get_executed_effects(digest)
217 .map_err(StorageError::custom)
218 }
219
220 fn try_get_events(
221 &self,
222 digest: &TransactionDigest,
223 ) -> Result<Option<TransactionEvents>, StorageError> {
224 self.cache_traits
225 .transaction_cache_reader
226 .try_get_events(digest)
227 .map_err(StorageError::custom)
228 }
229
230 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
231 self.checkpoint_store
232 .get_highest_executed_checkpoint()
233 .map_err(iota_types::storage::error::Error::custom)?
234 .ok_or_else(|| {
235 iota_types::storage::error::Error::missing("unable to get latest checkpoint")
236 })
237 }
238
239 fn try_get_checkpoint_contents_by_digest(
240 &self,
241 digest: &CheckpointContentsDigest,
242 ) -> iota_types::storage::error::Result<
243 Option<iota_types::messages_checkpoint::CheckpointContents>,
244 > {
245 self.checkpoint_store
246 .get_checkpoint_contents(digest)
247 .map_err(iota_types::storage::error::Error::custom)
248 }
249
250 fn try_get_checkpoint_contents_by_sequence_number(
251 &self,
252 sequence_number: CheckpointSequenceNumber,
253 ) -> iota_types::storage::error::Result<
254 Option<iota_types::messages_checkpoint::CheckpointContents>,
255 > {
256 match self.try_get_checkpoint_by_sequence_number(sequence_number) {
257 Ok(Some(checkpoint)) => {
258 self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
259 }
260 Ok(None) => Ok(None),
261 Err(e) => Err(e),
262 }
263 }
264}
265
266impl ObjectStore for RocksDbStore {
267 fn try_get_object(
268 &self,
269 object_id: &iota_types::base_types::ObjectID,
270 ) -> iota_types::storage::error::Result<Option<Object>> {
271 self.cache_traits.object_store.try_get_object(object_id)
272 }
273
274 fn try_get_object_by_key(
275 &self,
276 object_id: &iota_types::base_types::ObjectID,
277 version: iota_types::base_types::VersionNumber,
278 ) -> iota_types::storage::error::Result<Option<Object>> {
279 self.cache_traits
280 .object_store
281 .try_get_object_by_key(object_id, version)
282 }
283}
284
285impl WriteStore for RocksDbStore {
286 #[instrument(level = "trace", skip_all)]
287 fn try_insert_checkpoint(
288 &self,
289 checkpoint: &VerifiedCheckpoint,
290 ) -> Result<(), iota_types::storage::error::Error> {
291 if let Some(EndOfEpochData {
292 next_epoch_committee,
293 ..
294 }) = checkpoint.end_of_epoch_data.as_ref()
295 {
296 let next_committee = next_epoch_committee.iter().cloned().collect();
297 let committee =
298 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
299 self.try_insert_committee(committee)?;
300 }
301
302 self.checkpoint_store
303 .insert_verified_checkpoint(checkpoint)
304 .map_err(Into::into)
305 }
306
307 fn try_update_highest_synced_checkpoint(
308 &self,
309 checkpoint: &VerifiedCheckpoint,
310 ) -> Result<(), iota_types::storage::error::Error> {
311 let mut locked = self.highest_synced_checkpoint.lock();
312 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
313 return Ok(());
314 }
315 self.checkpoint_store
316 .update_highest_synced_checkpoint(checkpoint)
317 .map_err(iota_types::storage::error::Error::custom)?;
318 *locked = Some(checkpoint.sequence_number);
319 Ok(())
320 }
321
322 fn try_update_highest_verified_checkpoint(
323 &self,
324 checkpoint: &VerifiedCheckpoint,
325 ) -> Result<(), iota_types::storage::error::Error> {
326 let mut locked = self.highest_verified_checkpoint.lock();
327 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
328 return Ok(());
329 }
330 self.checkpoint_store
331 .update_highest_verified_checkpoint(checkpoint)
332 .map_err(iota_types::storage::error::Error::custom)?;
333 *locked = Some(checkpoint.sequence_number);
334 Ok(())
335 }
336
337 fn try_insert_checkpoint_contents(
338 &self,
339 checkpoint: &VerifiedCheckpoint,
340 contents: VerifiedCheckpointContents,
341 ) -> Result<(), iota_types::storage::error::Error> {
342 self.cache_traits
343 .state_sync_store
344 .try_multi_insert_transaction_and_effects(contents.transactions())
345 .map_err(iota_types::storage::error::Error::custom)?;
346 self.checkpoint_store
347 .insert_verified_checkpoint_contents(checkpoint, contents)
348 .map_err(Into::into)
349 }
350
351 fn try_insert_committee(
352 &self,
353 new_committee: Committee,
354 ) -> Result<(), iota_types::storage::error::Error> {
355 self.committee_store
356 .insert_new_committee(&new_committee)
357 .unwrap();
358 Ok(())
359 }
360}
361
362pub struct GrpcReadStore {
363 state: Arc<AuthorityState>,
364 rocks: RocksDbStore,
365}
366
367impl GrpcReadStore {
368 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
369 Self { state, rocks }
370 }
371
372 fn grpc_indexes_store(&self) -> iota_types::storage::error::Result<&GrpcIndexesStore> {
373 self.state.grpc_indexes_store.as_deref().ok_or_else(|| {
374 iota_types::storage::error::Error::custom("gRPC index store is disabled")
375 })
376 }
377}
378
379impl ObjectStore for GrpcReadStore {
380 fn try_get_object(
381 &self,
382 object_id: &iota_types::base_types::ObjectID,
383 ) -> iota_types::storage::error::Result<Option<Object>> {
384 self.rocks.try_get_object(object_id)
385 }
386
387 fn try_get_object_by_key(
388 &self,
389 object_id: &iota_types::base_types::ObjectID,
390 version: iota_types::base_types::VersionNumber,
391 ) -> iota_types::storage::error::Result<Option<Object>> {
392 self.rocks.try_get_object_by_key(object_id, version)
393 }
394}
395
396impl ReadStore for GrpcReadStore {
397 fn try_get_committee(
398 &self,
399 epoch: EpochId,
400 ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
401 self.rocks.try_get_committee(epoch)
402 }
403
404 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
405 self.rocks.try_get_latest_checkpoint()
406 }
407
408 fn try_get_highest_verified_checkpoint(
409 &self,
410 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
411 self.rocks.try_get_highest_verified_checkpoint()
412 }
413
414 fn try_get_highest_synced_checkpoint(
415 &self,
416 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
417 self.rocks.try_get_highest_synced_checkpoint()
418 }
419
420 fn try_get_lowest_available_checkpoint(
421 &self,
422 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
423 self.rocks.try_get_lowest_available_checkpoint()
424 }
425
426 fn try_get_checkpoint_by_digest(
427 &self,
428 digest: &CheckpointDigest,
429 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
430 self.rocks.try_get_checkpoint_by_digest(digest)
431 }
432
433 fn try_get_checkpoint_by_sequence_number(
434 &self,
435 sequence_number: CheckpointSequenceNumber,
436 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
437 self.rocks
438 .try_get_checkpoint_by_sequence_number(sequence_number)
439 }
440
441 fn try_get_checkpoint_contents_by_digest(
442 &self,
443 digest: &CheckpointContentsDigest,
444 ) -> iota_types::storage::error::Result<
445 Option<iota_types::messages_checkpoint::CheckpointContents>,
446 > {
447 self.rocks.try_get_checkpoint_contents_by_digest(digest)
448 }
449
450 fn try_get_checkpoint_contents_by_sequence_number(
451 &self,
452 sequence_number: CheckpointSequenceNumber,
453 ) -> iota_types::storage::error::Result<
454 Option<iota_types::messages_checkpoint::CheckpointContents>,
455 > {
456 self.rocks
457 .try_get_checkpoint_contents_by_sequence_number(sequence_number)
458 }
459
460 fn try_get_transaction(
461 &self,
462 digest: &TransactionDigest,
463 ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
464 self.rocks.try_get_transaction(digest)
465 }
466
467 fn try_get_transaction_effects(
468 &self,
469 digest: &TransactionDigest,
470 ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
471 self.rocks.try_get_transaction_effects(digest)
472 }
473
474 fn try_get_events(
475 &self,
476 digest: &TransactionDigest,
477 ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
478 self.rocks.try_get_events(digest)
479 }
480
481 fn try_get_full_checkpoint_contents_by_sequence_number(
482 &self,
483 sequence_number: CheckpointSequenceNumber,
484 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
485 self.rocks
486 .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
487 }
488
489 fn try_get_full_checkpoint_contents(
490 &self,
491 digest: &CheckpointContentsDigest,
492 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
493 self.rocks.try_get_full_checkpoint_contents(digest)
494 }
495}
496
497impl GrpcStateReader for GrpcReadStore {
498 fn get_lowest_available_checkpoint_objects(
499 &self,
500 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
501 Ok(self
502 .state
503 .get_object_cache_reader()
504 .try_get_highest_pruned_checkpoint()
505 .map_err(StorageError::custom)?
506 .map(|cp| cp + 1)
507 .unwrap_or(0))
508 }
509
510 fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
511 Ok(self.state.get_chain_identifier())
512 }
513
514 fn get_epoch_last_checkpoint(
515 &self,
516 epoch_id: EpochId,
517 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
518 self.rocks
519 .checkpoint_store
520 .get_epoch_last_checkpoint(epoch_id)
521 .map_err(iota_types::storage::error::Error::custom)
522 }
523
524 fn grpc_indexes(&self) -> Option<&dyn GrpcIndexes> {
525 self.grpc_indexes_store().ok().map(|index| index as _)
526 }
527
528 fn get_struct_layout(
529 &self,
530 struct_tag: &StructTag,
531 ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
532 self.state
533 .load_epoch_store_one_call_per_task()
534 .executor()
535 .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
537 .get_annotated_layout(struct_tag)
538 .map(|layout| layout.into_layout())
539 .map(Some)
540 .map_err(StorageError::custom)
541 }
542}