1use std::path::Path;
6
7use iota_types::{
8 accumulator::Accumulator, base_types::SequenceNumber, digests::TransactionEventsDigest,
9 effects::TransactionEffects, storage::MarkerValue,
10};
11use serde::{Deserialize, Serialize};
12use typed_store::{
13 DBMapUtils,
14 metrics::SamplingInterval,
15 rocks::{
16 DBBatch, DBMap, DBOptions, MetricConf, ReadWriteOptions, default_db_options,
17 read_size_from_env,
18 util::{empty_compaction_filter, reference_count_merge_operator},
19 },
20 rocksdb::Options,
21 traits::{Map, TableSummary, TypedStoreDebug},
22};
23
24use super::*;
25use crate::authority::{
26 authority_store_types::{
27 ObjectContentDigest, StoreData, StoreMoveObjectWrapper, StoreObject, StoreObjectPair,
28 StoreObjectValue, StoreObjectWrapper, get_store_object_pair, try_construct_object,
29 },
30 epoch_start_configuration::EpochStartConfiguration,
31};
32
// Environment variables that override the per-table block-cache sizes used by the
// `*_table_default_config` functions below. Values are read with `read_size_from_env`;
// the `_MB` suffix of each variable name indicates the unit is megabytes.

/// Block-cache override for the `objects` table (default `5 * 1024`).
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Block-cache override for the `live_owned_object_markers` table (default `1024`).
/// Exported crate-wide; presumably read elsewhere as well — confirm.
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Block-cache override for the `transactions` table (default `512`).
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Block-cache override for the `effects` table (default `1024`).
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
/// Block-cache override for the `events` table (default `1024`).
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
/// Block-cache override for the `indirect_move_objects` table (default `512`).
const ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE: &str = "INDIRECT_OBJECTS_BLOCK_CACHE_MB";
39
/// Authority storage tables whose contents are kept across epochs.
///
/// The tables are opened under `<parent_path>/perpetual` (see
/// `AuthorityPerpetualTables::path`). The `DBMapUtils` derive presumably
/// generates `open_tables_read_write`, `get_read_only_handle` and the
/// `AuthorityPerpetualTablesReadOnly` handle used below — confirm in
/// `typed_store`. A `default_options_override_fn` attribute names the function
/// that supplies that table's tuned RocksDB options.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables {
    /// Every stored version of every object, keyed by `(ObjectID, version)`.
    /// A value is either a live object or a tombstone (deleted / wrapped); a
    /// live value may reference Move contents stored out-of-line in
    /// `indirect_move_objects`.
    #[default_options_override_fn = "objects_table_default_config"]
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Out-of-line Move object contents, keyed by content digest. The table is
    /// configured with a reference-count merge operator.
    #[default_options_override_fn = "indirect_move_objects_table_default_config"]
    pub(crate) indirect_move_objects: DBMap<ObjectContentDigest, StoreMoveObjectWrapper>,

    /// Marker set of object references (value is `()`). Presumably tracks the
    /// currently live owned-object versions used for locking — its options
    /// read `LOCKS_BLOCK_CACHE_MB`; confirm against the writers.
    #[default_options_override_fn = "live_owned_object_markers_table_default_config"]
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, ()>,

    /// Transactions by digest.
    #[default_options_override_fn = "transactions_table_default_config"]
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects by effects digest. Reached from a transaction digest
    /// via `executed_effects`.
    #[default_options_override_fn = "effects_table_default_config"]
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps an executed transaction to the digest of its effects.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Events keyed by `(events digest, usize)`; the `usize` is presumably the
    /// event's index within the transaction — confirm.
    #[default_options_override_fn = "events_table_default_config"]
    pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Epoch and checkpoint sequence number for each executed transaction.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch root state accumulator together with the last checkpoint of
    /// that epoch.
    pub(crate) root_state_hash_by_epoch: DBMap<EpochId, (CheckpointSequenceNumber, Accumulator)>,

    /// Singleton (`()` key): configuration captured at the start of the current epoch.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Singleton (`()` key): highest checkpoint that has been pruned.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Singleton (`()` key): last recorded total-supply check.
    pub(crate) total_iota_supply: DBMap<(), TotalIotaSupplyCheck>,

    /// Singleton (`()` key): expected storage fund imbalance (signed).
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Per-epoch object markers keyed by `(epoch, object key)`.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
}
144
/// Singleton record stored in `AuthorityPerpetualTables::total_iota_supply`.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TotalIotaSupplyCheck {
    /// Total supply recorded at the last check (presumably in the smallest
    /// IOTA unit — confirm).
    pub(crate) total_supply: u64,
    /// Epoch in which `total_supply` was last recorded.
    pub(crate) last_check_epoch: EpochId,
}
153
154impl AuthorityPerpetualTables {
155 pub fn path(parent_path: &Path) -> PathBuf {
156 parent_path.join("perpetual")
157 }
158
159 pub fn open(parent_path: &Path, db_options: Option<Options>) -> Self {
160 Self::open_tables_read_write(
161 Self::path(parent_path),
162 MetricConf::new("perpetual")
163 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
164 db_options,
165 None,
166 )
167 }
168
169 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
170 Self::get_read_only_handle(
171 Self::path(parent_path),
172 None,
173 None,
174 MetricConf::new("perpetual_readonly"),
175 )
176 }
177
178 pub fn find_object_lt_or_eq_version(
183 &self,
184 object_id: ObjectID,
185 version: SequenceNumber,
186 ) -> IotaResult<Option<Object>> {
187 let iter = self
188 .objects
189 .safe_range_iter(ObjectKey::min_for_id(&object_id)..=ObjectKey::max_for_id(&object_id))
190 .skip_prior_to(&ObjectKey(object_id, version))?;
191 match iter.reverse().next() {
192 Some(Ok((key, o))) => self.object(&key, o),
193 Some(Err(e)) => Err(e.into()),
194 None => Ok(None),
195 }
196 }
197
198 fn construct_object(
199 &self,
200 object_key: &ObjectKey,
201 store_object: StoreObjectValue,
202 ) -> Result<Object, IotaError> {
203 let indirect_object = match store_object.data {
204 StoreData::IndirectObject(ref metadata) => self
205 .indirect_move_objects
206 .get(&metadata.digest)?
207 .map(|o| o.migrate().into_inner()),
208 _ => None,
209 };
210 try_construct_object(object_key, store_object, indirect_object)
211 }
212
213 pub fn object(
216 &self,
217 object_key: &ObjectKey,
218 store_object: StoreObjectWrapper,
219 ) -> Result<Option<Object>, IotaError> {
220 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
221 return Ok(None);
222 };
223 Ok(Some(self.construct_object(object_key, store_object)?))
224 }
225
226 pub fn object_reference(
227 &self,
228 object_key: &ObjectKey,
229 store_object: StoreObjectWrapper,
230 ) -> Result<ObjectRef, IotaError> {
231 let obj_ref = match store_object.migrate().into_inner() {
232 StoreObject::Value(object) => self
233 .construct_object(object_key, object)?
234 .compute_object_reference(),
235 StoreObject::Deleted => (
236 object_key.0,
237 object_key.1,
238 ObjectDigest::OBJECT_DIGEST_DELETED,
239 ),
240 StoreObject::Wrapped => (
241 object_key.0,
242 object_key.1,
243 ObjectDigest::OBJECT_DIGEST_WRAPPED,
244 ),
245 };
246 Ok(obj_ref)
247 }
248
249 pub fn tombstone_reference(
250 &self,
251 object_key: &ObjectKey,
252 store_object: &StoreObjectWrapper,
253 ) -> Result<Option<ObjectRef>, IotaError> {
254 let obj_ref = match store_object.inner() {
255 StoreObject::Deleted => Some((
256 object_key.0,
257 object_key.1,
258 ObjectDigest::OBJECT_DIGEST_DELETED,
259 )),
260 StoreObject::Wrapped => Some((
261 object_key.0,
262 object_key.1,
263 ObjectDigest::OBJECT_DIGEST_WRAPPED,
264 )),
265 _ => None,
266 };
267 Ok(obj_ref)
268 }
269
270 pub fn get_latest_object_ref_or_tombstone(
271 &self,
272 object_id: ObjectID,
273 ) -> Result<Option<ObjectRef>, IotaError> {
274 let mut iterator = self
275 .objects
276 .unbounded_iter()
277 .skip_prior_to(&ObjectKey::max_for_id(&object_id))?;
278
279 if let Some((object_key, value)) = iterator.next() {
280 if object_key.0 == object_id {
281 return Ok(Some(self.object_reference(&object_key, value)?));
282 }
283 }
284 Ok(None)
285 }
286
287 pub fn get_latest_object_or_tombstone(
288 &self,
289 object_id: ObjectID,
290 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, IotaError> {
291 let mut iterator = self
292 .objects
293 .unbounded_iter()
294 .skip_prior_to(&ObjectKey::max_for_id(&object_id))?;
295
296 if let Some((object_key, value)) = iterator.next() {
297 if object_key.0 == object_id {
298 return Ok(Some((object_key, value)));
299 }
300 }
301 Ok(None)
302 }
303
304 pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
305 Ok(self
306 .epoch_start_configuration
307 .get(&())?
308 .expect("Must have current epoch.")
309 .epoch_start_state()
310 .epoch())
311 }
312
313 pub fn set_epoch_start_configuration(
314 &self,
315 epoch_start_configuration: &EpochStartConfiguration,
316 ) -> IotaResult {
317 let mut wb = self.epoch_start_configuration.batch();
318 wb.insert_batch(
319 &self.epoch_start_configuration,
320 std::iter::once(((), epoch_start_configuration)),
321 )?;
322 wb.write()?;
323 Ok(())
324 }
325
326 pub fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
327 Ok(self.pruned_checkpoint.get(&())?.unwrap_or_default())
328 }
329
330 pub fn set_highest_pruned_checkpoint(
331 &self,
332 wb: &mut DBBatch,
333 checkpoint_number: CheckpointSequenceNumber,
334 ) -> IotaResult {
335 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
336 Ok(())
337 }
338
339 pub fn get_transaction(
340 &self,
341 digest: &TransactionDigest,
342 ) -> IotaResult<Option<TrustedTransaction>> {
343 let Some(transaction) = self.transactions.get(digest)? else {
344 return Ok(None);
345 };
346 Ok(Some(transaction))
347 }
348
349 pub fn get_effects(
350 &self,
351 digest: &TransactionDigest,
352 ) -> IotaResult<Option<TransactionEffects>> {
353 let Some(effect_digest) = self.executed_effects.get(digest)? else {
354 return Ok(None);
355 };
356 Ok(self.effects.get(&effect_digest)?)
357 }
358
359 pub fn get_checkpoint_sequence_number(
360 &self,
361 digest: &TransactionDigest,
362 ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
363 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
364 }
365
366 pub fn get_newer_object_keys(
367 &self,
368 object: &(ObjectID, SequenceNumber),
369 ) -> IotaResult<Vec<ObjectKey>> {
370 let mut objects = vec![];
371 for result in self.objects.safe_iter_with_bounds(
372 Some(ObjectKey(object.0, object.1.next())),
373 Some(ObjectKey(object.0, VersionNumber::MAX)),
374 ) {
375 let (key, _) = result?;
376 objects.push(key);
377 }
378 Ok(objects)
379 }
380
381 pub fn set_highest_pruned_checkpoint_without_wb(
382 &self,
383 checkpoint_number: CheckpointSequenceNumber,
384 ) -> IotaResult {
385 let mut wb = self.pruned_checkpoint.batch();
386 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
387 wb.write()?;
388 Ok(())
389 }
390
391 pub fn database_is_empty(&self) -> IotaResult<bool> {
392 Ok(self
393 .objects
394 .unbounded_iter()
395 .skip_to(&ObjectKey::ZERO)?
396 .next()
397 .is_none())
398 }
399
400 pub fn iter_live_object_set(&self) -> LiveSetIter<'_> {
401 LiveSetIter {
402 iter: self.objects.unbounded_iter(),
403 tables: self,
404 prev: None,
405 }
406 }
407
408 pub fn range_iter_live_object_set(
409 &self,
410 lower_bound: Option<ObjectID>,
411 upper_bound: Option<ObjectID>,
412 ) -> LiveSetIter<'_> {
413 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
414 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
415
416 LiveSetIter {
417 iter: self.objects.iter_with_bounds(lower_bound, upper_bound),
418 tables: self,
419 prev: None,
420 }
421 }
422
423 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
424 self.objects.checkpoint_db(path).map_err(Into::into)
426 }
427
428 pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
429 self.objects.unsafe_clear()?;
431 self.indirect_move_objects.unsafe_clear()?;
432 self.live_owned_object_markers.unsafe_clear()?;
433 self.executed_effects.unsafe_clear()?;
434 self.events.unsafe_clear()?;
435 self.executed_transactions_to_checkpoint.unsafe_clear()?;
436 self.root_state_hash_by_epoch.unsafe_clear()?;
437 self.epoch_start_configuration.unsafe_clear()?;
438 self.pruned_checkpoint.unsafe_clear()?;
439 self.total_iota_supply.unsafe_clear()?;
440 self.expected_storage_fund_imbalance.unsafe_clear()?;
441 self.object_per_epoch_marker_table.unsafe_clear()?;
442 self.objects.rocksdb.flush()?;
443 Ok(())
444 }
445
446 pub fn get_root_state_hash(
447 &self,
448 epoch: EpochId,
449 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
450 Ok(self.root_state_hash_by_epoch.get(&epoch)?)
451 }
452
453 pub fn insert_root_state_hash(
454 &self,
455 epoch: EpochId,
456 last_checkpoint_of_epoch: CheckpointSequenceNumber,
457 accumulator: Accumulator,
458 ) -> IotaResult {
459 self.root_state_hash_by_epoch
460 .insert(&epoch, &(last_checkpoint_of_epoch, accumulator))?;
461 Ok(())
462 }
463
464 pub fn insert_object_test_only(&self, object: Object) -> IotaResult {
465 let object_reference = object.compute_object_reference();
466 let StoreObjectPair(wrapper, _indirect_object) = get_store_object_pair(object, usize::MAX);
467 let mut wb = self.objects.batch();
468 wb.insert_batch(
469 &self.objects,
470 std::iter::once((ObjectKey::from(object_reference), wrapper)),
471 )?;
472 wb.write()?;
473 Ok(())
474 }
475}
476
477impl ObjectStore for AuthorityPerpetualTables {
478 fn get_object(
480 &self,
481 object_id: &ObjectID,
482 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
483 let obj_entry = self
484 .objects
485 .unbounded_iter()
486 .skip_prior_to(&ObjectKey::max_for_id(object_id))
487 .map_err(iota_types::storage::error::Error::custom)?
488 .next();
489
490 match obj_entry {
491 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
492 .object(&ObjectKey(obj_id, version), obj)
493 .map_err(iota_types::storage::error::Error::custom)?),
494 _ => Ok(None),
495 }
496 }
497
498 fn get_object_by_key(
499 &self,
500 object_id: &ObjectID,
501 version: VersionNumber,
502 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
503 Ok(self
504 .objects
505 .get(&ObjectKey(*object_id, version))
506 .map_err(iota_types::storage::error::Error::custom)?
507 .map(|object| self.object(&ObjectKey(*object_id, version), object))
508 .transpose()
509 .map_err(iota_types::storage::error::Error::custom)?
510 .flatten())
511 }
512}
513
/// Iterator over the live object set.
///
/// For each object id, it yields the newest stored version when that version is
/// a live value; ids whose newest entry is a tombstone are skipped. It assumes
/// `iter` yields keys in ascending `(id, version)` order — relies on
/// `typed_store` key ordering.
pub struct LiveSetIter<'a> {
    /// Underlying iterator over the `objects` table.
    iter:
        <DBMap<ObjectKey, StoreObjectWrapper> as Map<'a, ObjectKey, StoreObjectWrapper>>::Iterator,
    /// Back-reference used to materialize objects (including indirect contents).
    tables: &'a AuthorityPerpetualTables,
    /// Most recently read entry. Not yet known to be the newest for its id
    /// until an entry with a different id (or end of input) is seen.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
}
520
/// An entry of the live object set.
///
/// Note: `LiveSetIter` in this file only ever produces `Normal`; wrapped
/// entries are filtered out there.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A fully materialized live object.
    Normal(Object),
    /// A wrapped object, identified only by its key.
    Wrapped(ObjectKey),
}
526
527impl LiveObject {
528 pub fn object_id(&self) -> ObjectID {
529 match self {
530 LiveObject::Normal(obj) => obj.id(),
531 LiveObject::Wrapped(key) => key.0,
532 }
533 }
534
535 pub fn version(&self) -> SequenceNumber {
536 match self {
537 LiveObject::Normal(obj) => obj.version(),
538 LiveObject::Wrapped(key) => key.1,
539 }
540 }
541
542 pub fn object_reference(&self) -> ObjectRef {
543 match self {
544 LiveObject::Normal(obj) => obj.compute_object_reference(),
545 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
546 }
547 }
548
549 pub fn to_normal(self) -> Option<Object> {
550 match self {
551 LiveObject::Normal(object) => Some(object),
552 LiveObject::Wrapped(_) => None,
553 }
554 }
555}
556
557impl LiveSetIter<'_> {
558 fn store_object_wrapper_to_live_object(
559 &self,
560 object_key: ObjectKey,
561 store_object: StoreObjectWrapper,
562 ) -> Option<LiveObject> {
563 match store_object.migrate().into_inner() {
564 StoreObject::Value(object) => {
565 let object = self
566 .tables
567 .construct_object(&object_key, object)
568 .expect("Constructing object from store cannot fail");
569 Some(LiveObject::Normal(object))
570 }
571 StoreObject::Wrapped | StoreObject::Deleted => None,
572 }
573 }
574}
575
576impl Iterator for LiveSetIter<'_> {
577 type Item = LiveObject;
578
579 fn next(&mut self) -> Option<Self::Item> {
580 loop {
581 if let Some((next_key, next_value)) = self.iter.next() {
582 let prev = self.prev.take();
583 self.prev = Some((next_key, next_value));
584
585 if let Some((prev_key, prev_value)) = prev {
586 if prev_key.0 != next_key.0 {
587 let live_object =
588 self.store_object_wrapper_to_live_object(prev_key, prev_value);
589 if live_object.is_some() {
590 return live_object;
591 }
592 }
593 }
594 continue;
595 }
596 if let Some((key, value)) = self.prev.take() {
597 let live_object = self.store_object_wrapper_to_live_object(key, value);
598 if live_object.is_some() {
599 return live_object;
600 }
601 }
602 return None;
603 }
604 }
605}
606
607fn live_owned_object_markers_table_default_config() -> DBOptions {
609 DBOptions {
610 options: default_db_options()
611 .optimize_for_write_throughput()
612 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
613 .options,
614 rw_options: ReadWriteOptions::default().set_ignore_range_deletions(false),
615 }
616}
617
618fn objects_table_default_config() -> DBOptions {
619 default_db_options()
620 .optimize_for_write_throughput()
621 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
622}
623
624fn transactions_table_default_config() -> DBOptions {
625 default_db_options()
626 .optimize_for_write_throughput()
627 .optimize_for_point_lookup(
628 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
629 )
630}
631
632fn effects_table_default_config() -> DBOptions {
633 default_db_options()
634 .optimize_for_write_throughput()
635 .optimize_for_point_lookup(
636 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
637 )
638}
639
640fn events_table_default_config() -> DBOptions {
641 default_db_options()
642 .optimize_for_write_throughput()
643 .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024))
644}
645
646fn indirect_move_objects_table_default_config() -> DBOptions {
647 let mut options = default_db_options()
648 .optimize_for_write_throughput()
649 .optimize_for_point_lookup(
650 read_size_from_env(ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(512),
651 );
652 options.options.set_merge_operator(
653 "refcount operator",
654 reference_count_merge_operator,
655 reference_count_merge_operator,
656 );
657 options
658 .options
659 .set_compaction_filter("empty filter", empty_compaction_filter);
660 options
661}