1use std::sync::Arc;
6
7use iota_node_storage::{GrpcIndexes, GrpcStateReader};
8use iota_sdk_types::StructTag;
9use iota_types::{
10 base_types::TransactionDigest,
11 committee::{Committee, EpochId},
12 effects::{TransactionEffects, TransactionEvents},
13 error::IotaError,
14 messages_checkpoint::{
15 CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber, EndOfEpochData,
16 FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
17 },
18 object::Object,
19 storage::{
20 ObjectKey, ObjectStore, ReadStore, WriteStore,
21 error::{Error as StorageError, Result},
22 },
23 transaction::VerifiedTransaction,
24};
25use parking_lot::Mutex;
26use tracing::instrument;
27
28use crate::{
29 authority::AuthorityState, checkpoints::CheckpointStore,
30 epoch::committee_store::CommitteeStore, execution_cache::ExecutionCacheTraitPointers,
31 grpc_indexes::GrpcIndexesStore,
32};
33
34#[derive(Clone)]
35pub struct RocksDbStore {
36 cache_traits: ExecutionCacheTraitPointers,
37
38 committee_store: Arc<CommitteeStore>,
39 checkpoint_store: Arc<CheckpointStore>,
40 highest_verified_checkpoint: Arc<Mutex<Option<u64>>>,
42 highest_synced_checkpoint: Arc<Mutex<Option<u64>>>,
43}
44
45impl RocksDbStore {
46 pub fn new(
47 cache_traits: ExecutionCacheTraitPointers,
48 committee_store: Arc<CommitteeStore>,
49 checkpoint_store: Arc<CheckpointStore>,
50 ) -> Self {
51 Self {
52 cache_traits,
53 committee_store,
54 checkpoint_store,
55 highest_verified_checkpoint: Arc::new(Mutex::new(None)),
56 highest_synced_checkpoint: Arc::new(Mutex::new(None)),
57 }
58 }
59
60 pub fn get_objects(&self, object_keys: &[ObjectKey]) -> Result<Vec<Option<Object>>, IotaError> {
61 self.cache_traits
62 .object_cache_reader
63 .try_multi_get_objects_by_key(object_keys)
64 }
65
66 pub fn get_last_executed_checkpoint(&self) -> Result<Option<VerifiedCheckpoint>, IotaError> {
67 Ok(self.checkpoint_store.get_highest_executed_checkpoint()?)
68 }
69}
70
71impl ReadStore for RocksDbStore {
72 fn try_get_checkpoint_by_digest(
73 &self,
74 digest: &CheckpointDigest,
75 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
76 self.checkpoint_store
77 .get_checkpoint_by_digest(digest)
78 .map_err(Into::into)
79 }
80
81 fn try_get_checkpoint_by_sequence_number(
82 &self,
83 sequence_number: CheckpointSequenceNumber,
84 ) -> Result<Option<VerifiedCheckpoint>, StorageError> {
85 self.checkpoint_store
86 .get_checkpoint_by_sequence_number(sequence_number)
87 .map_err(Into::into)
88 }
89
90 fn try_get_highest_verified_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
91 self.checkpoint_store
92 .get_highest_verified_checkpoint()
93 .map(|maybe_checkpoint| {
94 maybe_checkpoint
95 .expect("storage should have been initialized with genesis checkpoint")
96 })
97 .map_err(Into::into)
98 }
99
100 fn try_get_highest_synced_checkpoint(&self) -> Result<VerifiedCheckpoint, StorageError> {
101 self.checkpoint_store
102 .get_highest_synced_checkpoint()
103 .map(|maybe_checkpoint| {
104 maybe_checkpoint
105 .expect("storage should have been initialized with genesis checkpoint")
106 })
107 .map_err(Into::into)
108 }
109
110 fn try_get_lowest_available_checkpoint(
111 &self,
112 ) -> Result<CheckpointSequenceNumber, StorageError> {
113 if let Some(highest_pruned_cp) = self
114 .checkpoint_store
115 .get_highest_pruned_checkpoint_seq_number()
116 .map_err(Into::<StorageError>::into)?
117 {
118 Ok(highest_pruned_cp + 1)
119 } else {
120 Ok(0)
121 }
122 }
123
124 fn try_get_full_checkpoint_contents_by_sequence_number(
125 &self,
126 sequence_number: CheckpointSequenceNumber,
127 ) -> Result<Option<FullCheckpointContents>, StorageError> {
128 self.checkpoint_store
129 .get_full_checkpoint_contents_by_sequence_number(sequence_number)
130 .map_err(Into::into)
131 }
132
133 fn try_get_full_checkpoint_contents(
134 &self,
135 digest: &CheckpointContentsDigest,
136 ) -> Result<Option<FullCheckpointContents>, StorageError> {
137 if let Some(seq_num) = self
139 .checkpoint_store
140 .get_sequence_number_by_contents_digest(digest)
141 .map_err(iota_types::storage::error::Error::custom)?
142 {
143 let contents = self
144 .checkpoint_store
145 .get_full_checkpoint_contents_by_sequence_number(seq_num)
146 .map_err(iota_types::storage::error::Error::custom)?;
147 if contents.is_some() {
148 return Ok(contents);
149 }
150 }
151
152 self.checkpoint_store
159 .get_checkpoint_contents(digest)
160 .map_err(iota_types::storage::error::Error::custom)?
161 .map(|contents| {
162 let mut transactions = Vec::with_capacity(contents.size());
163 for tx in contents.iter() {
164 if let (Some(t), Some(e)) = (
165 self.try_get_transaction(&tx.transaction)?,
166 self.cache_traits
167 .transaction_cache_reader
168 .try_get_effects(&tx.effects)
169 .map_err(iota_types::storage::error::Error::custom)?,
170 ) {
171 transactions.push(iota_types::base_types::ExecutionData::new(
172 (*t).clone().into_inner(),
173 e,
174 ))
175 } else {
176 return Result::<
177 Option<FullCheckpointContents>,
178 iota_types::storage::error::Error,
179 >::Ok(None);
180 }
181 }
182 Ok(Some(
183 FullCheckpointContents::from_contents_and_execution_data(
184 contents,
185 transactions.into_iter(),
186 ),
187 ))
188 })
189 .transpose()
190 .map(|contents| contents.flatten())
191 .map_err(iota_types::storage::error::Error::custom)
192 }
193
194 fn try_get_committee(
195 &self,
196 epoch: EpochId,
197 ) -> Result<Option<Arc<Committee>>, iota_types::storage::error::Error> {
198 Ok(self.committee_store.get_committee(&epoch).unwrap())
199 }
200
201 fn try_get_transaction(
202 &self,
203 digest: &TransactionDigest,
204 ) -> Result<Option<Arc<VerifiedTransaction>>, StorageError> {
205 self.cache_traits
206 .transaction_cache_reader
207 .try_get_transaction_block(digest)
208 .map_err(StorageError::custom)
209 }
210
211 fn try_get_transaction_effects(
212 &self,
213 digest: &TransactionDigest,
214 ) -> Result<Option<TransactionEffects>, StorageError> {
215 self.cache_traits
216 .transaction_cache_reader
217 .try_get_executed_effects(digest)
218 .map_err(StorageError::custom)
219 }
220
221 fn try_get_events(
222 &self,
223 digest: &TransactionDigest,
224 ) -> Result<Option<TransactionEvents>, StorageError> {
225 self.cache_traits
226 .transaction_cache_reader
227 .try_get_events(digest)
228 .map_err(StorageError::custom)
229 }
230
231 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
232 self.checkpoint_store
233 .get_highest_executed_checkpoint()
234 .map_err(iota_types::storage::error::Error::custom)?
235 .ok_or_else(|| {
236 iota_types::storage::error::Error::missing("unable to get latest checkpoint")
237 })
238 }
239
240 fn try_get_checkpoint_contents_by_digest(
241 &self,
242 digest: &CheckpointContentsDigest,
243 ) -> iota_types::storage::error::Result<
244 Option<iota_types::messages_checkpoint::CheckpointContents>,
245 > {
246 self.checkpoint_store
247 .get_checkpoint_contents(digest)
248 .map_err(iota_types::storage::error::Error::custom)
249 }
250
251 fn try_get_checkpoint_contents_by_sequence_number(
252 &self,
253 sequence_number: CheckpointSequenceNumber,
254 ) -> iota_types::storage::error::Result<
255 Option<iota_types::messages_checkpoint::CheckpointContents>,
256 > {
257 match self.try_get_checkpoint_by_sequence_number(sequence_number) {
258 Ok(Some(checkpoint)) => {
259 self.try_get_checkpoint_contents_by_digest(&checkpoint.content_digest)
260 }
261 Ok(None) => Ok(None),
262 Err(e) => Err(e),
263 }
264 }
265}
266
267impl ObjectStore for RocksDbStore {
268 fn try_get_object(
269 &self,
270 object_id: &iota_sdk_types::ObjectId,
271 ) -> iota_types::storage::error::Result<Option<Object>> {
272 self.cache_traits.object_store.try_get_object(object_id)
273 }
274
275 fn try_get_object_by_key(
276 &self,
277 object_id: &iota_sdk_types::ObjectId,
278 version: iota_types::base_types::VersionNumber,
279 ) -> iota_types::storage::error::Result<Option<Object>> {
280 self.cache_traits
281 .object_store
282 .try_get_object_by_key(object_id, version)
283 }
284}
285
286impl WriteStore for RocksDbStore {
287 #[instrument(level = "trace", skip_all)]
288 fn try_insert_checkpoint(
289 &self,
290 checkpoint: &VerifiedCheckpoint,
291 ) -> Result<(), iota_types::storage::error::Error> {
292 if let Some(EndOfEpochData {
293 next_epoch_committee,
294 ..
295 }) = checkpoint.end_of_epoch_data.as_ref()
296 {
297 let next_committee = next_epoch_committee.iter().cloned().collect();
298 let committee =
299 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
300 self.try_insert_committee(committee)?;
301 }
302
303 self.checkpoint_store
304 .insert_verified_checkpoint(checkpoint)
305 .map_err(Into::into)
306 }
307
308 fn try_update_highest_synced_checkpoint(
309 &self,
310 checkpoint: &VerifiedCheckpoint,
311 ) -> Result<(), iota_types::storage::error::Error> {
312 let mut locked = self.highest_synced_checkpoint.lock();
313 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
314 return Ok(());
315 }
316 self.checkpoint_store
317 .update_highest_synced_checkpoint(checkpoint)
318 .map_err(iota_types::storage::error::Error::custom)?;
319 *locked = Some(checkpoint.sequence_number);
320 Ok(())
321 }
322
323 fn try_update_highest_verified_checkpoint(
324 &self,
325 checkpoint: &VerifiedCheckpoint,
326 ) -> Result<(), iota_types::storage::error::Error> {
327 let mut locked = self.highest_verified_checkpoint.lock();
328 if locked.is_some() && locked.unwrap() >= checkpoint.sequence_number {
329 return Ok(());
330 }
331 self.checkpoint_store
332 .update_highest_verified_checkpoint(checkpoint)
333 .map_err(iota_types::storage::error::Error::custom)?;
334 *locked = Some(checkpoint.sequence_number);
335 Ok(())
336 }
337
338 fn try_insert_checkpoint_contents(
339 &self,
340 checkpoint: &VerifiedCheckpoint,
341 contents: VerifiedCheckpointContents,
342 ) -> Result<(), iota_types::storage::error::Error> {
343 self.cache_traits
344 .state_sync_store
345 .try_multi_insert_transaction_and_effects(contents.transactions())
346 .map_err(iota_types::storage::error::Error::custom)?;
347 self.checkpoint_store
348 .insert_verified_checkpoint_contents(checkpoint, contents)
349 .map_err(Into::into)
350 }
351
352 fn try_insert_committee(
353 &self,
354 new_committee: Committee,
355 ) -> Result<(), iota_types::storage::error::Error> {
356 self.committee_store
357 .insert_new_committee(&new_committee)
358 .unwrap();
359 Ok(())
360 }
361}
362
363pub struct GrpcReadStore {
364 state: Arc<AuthorityState>,
365 rocks: RocksDbStore,
366}
367
368impl GrpcReadStore {
369 pub fn new(state: Arc<AuthorityState>, rocks: RocksDbStore) -> Self {
370 Self { state, rocks }
371 }
372
373 fn grpc_indexes_store(&self) -> iota_types::storage::error::Result<&GrpcIndexesStore> {
374 self.state.grpc_indexes_store.as_deref().ok_or_else(|| {
375 iota_types::storage::error::Error::custom("gRPC index store is disabled")
376 })
377 }
378}
379
380impl ObjectStore for GrpcReadStore {
381 fn try_get_object(
382 &self,
383 object_id: &iota_sdk_types::ObjectId,
384 ) -> iota_types::storage::error::Result<Option<Object>> {
385 self.rocks.try_get_object(object_id)
386 }
387
388 fn try_get_object_by_key(
389 &self,
390 object_id: &iota_sdk_types::ObjectId,
391 version: iota_types::base_types::VersionNumber,
392 ) -> iota_types::storage::error::Result<Option<Object>> {
393 self.rocks.try_get_object_by_key(object_id, version)
394 }
395}
396
397impl ReadStore for GrpcReadStore {
398 fn try_get_committee(
399 &self,
400 epoch: EpochId,
401 ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
402 self.rocks.try_get_committee(epoch)
403 }
404
405 fn try_get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
406 self.rocks.try_get_latest_checkpoint()
407 }
408
409 fn try_get_highest_verified_checkpoint(
410 &self,
411 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
412 self.rocks.try_get_highest_verified_checkpoint()
413 }
414
415 fn try_get_highest_synced_checkpoint(
416 &self,
417 ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
418 self.rocks.try_get_highest_synced_checkpoint()
419 }
420
421 fn try_get_lowest_available_checkpoint(
422 &self,
423 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
424 self.rocks.try_get_lowest_available_checkpoint()
425 }
426
427 fn try_get_checkpoint_by_digest(
428 &self,
429 digest: &CheckpointDigest,
430 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
431 self.rocks.try_get_checkpoint_by_digest(digest)
432 }
433
434 fn try_get_checkpoint_by_sequence_number(
435 &self,
436 sequence_number: CheckpointSequenceNumber,
437 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
438 self.rocks
439 .try_get_checkpoint_by_sequence_number(sequence_number)
440 }
441
442 fn try_get_checkpoint_contents_by_digest(
443 &self,
444 digest: &CheckpointContentsDigest,
445 ) -> iota_types::storage::error::Result<
446 Option<iota_types::messages_checkpoint::CheckpointContents>,
447 > {
448 self.rocks.try_get_checkpoint_contents_by_digest(digest)
449 }
450
451 fn try_get_checkpoint_contents_by_sequence_number(
452 &self,
453 sequence_number: CheckpointSequenceNumber,
454 ) -> iota_types::storage::error::Result<
455 Option<iota_types::messages_checkpoint::CheckpointContents>,
456 > {
457 self.rocks
458 .try_get_checkpoint_contents_by_sequence_number(sequence_number)
459 }
460
461 fn try_get_transaction(
462 &self,
463 digest: &TransactionDigest,
464 ) -> iota_types::storage::error::Result<Option<Arc<VerifiedTransaction>>> {
465 self.rocks.try_get_transaction(digest)
466 }
467
468 fn try_get_transaction_effects(
469 &self,
470 digest: &TransactionDigest,
471 ) -> iota_types::storage::error::Result<Option<TransactionEffects>> {
472 self.rocks.try_get_transaction_effects(digest)
473 }
474
475 fn try_get_events(
476 &self,
477 digest: &TransactionDigest,
478 ) -> iota_types::storage::error::Result<Option<TransactionEvents>> {
479 self.rocks.try_get_events(digest)
480 }
481
482 fn try_get_full_checkpoint_contents_by_sequence_number(
483 &self,
484 sequence_number: CheckpointSequenceNumber,
485 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
486 self.rocks
487 .try_get_full_checkpoint_contents_by_sequence_number(sequence_number)
488 }
489
490 fn try_get_full_checkpoint_contents(
491 &self,
492 digest: &CheckpointContentsDigest,
493 ) -> iota_types::storage::error::Result<Option<FullCheckpointContents>> {
494 self.rocks.try_get_full_checkpoint_contents(digest)
495 }
496}
497
498impl GrpcStateReader for GrpcReadStore {
499 fn get_lowest_available_checkpoint_objects(
500 &self,
501 ) -> iota_types::storage::error::Result<CheckpointSequenceNumber> {
502 Ok(self
503 .state
504 .get_object_cache_reader()
505 .try_get_highest_pruned_checkpoint()
506 .map_err(StorageError::custom)?
507 .map(|cp| cp + 1)
508 .unwrap_or(0))
509 }
510
511 fn get_chain_identifier(&self) -> Result<iota_types::digests::ChainIdentifier> {
512 Ok(self.state.get_chain_identifier())
513 }
514
515 fn get_epoch_last_checkpoint(
516 &self,
517 epoch_id: EpochId,
518 ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
519 self.rocks
520 .checkpoint_store
521 .get_epoch_last_checkpoint(epoch_id)
522 .map_err(iota_types::storage::error::Error::custom)
523 }
524
525 fn grpc_indexes(&self) -> Option<&dyn GrpcIndexes> {
526 self.grpc_indexes_store().ok().map(|index| index as _)
527 }
528
529 fn get_struct_layout(
530 &self,
531 struct_tag: &StructTag,
532 ) -> Result<Option<move_core_types::annotated_value::MoveTypeLayout>> {
533 self.state
534 .load_epoch_store_one_call_per_task()
535 .executor()
536 .type_layout_resolver(Box::new(self.state.get_backing_package_store().as_ref()))
538 .get_annotated_layout(struct_tag)
539 .map(|layout| layout.into_layout())
540 .map(Some)
541 .map_err(StorageError::custom)
542 }
543}