1use std::path::Path;
6
7use iota_types::{
8 accumulator::Accumulator, base_types::SequenceNumber, digests::TransactionEventsDigest,
9 effects::TransactionEffects, storage::MarkerValue,
10};
11use serde::{Deserialize, Serialize};
12use typed_store::{
13 DBMapUtils,
14 metrics::SamplingInterval,
15 rocks::{
16 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
17 read_size_from_env,
18 util::{empty_compaction_filter, reference_count_merge_operator},
19 },
20 traits::{Map, TableSummary, TypedStoreDebug},
21};
22
23use super::*;
24use crate::authority::{
25 authority_store_types::{
26 ObjectContentDigest, StoreData, StoreMoveObjectWrapper, StoreObject, StoreObjectPair,
27 StoreObjectValue, StoreObjectWrapper, get_store_object_pair, try_construct_object,
28 },
29 epoch_start_configuration::EpochStartConfiguration,
30};
31
// Names of the environment variables consulted by the per-table config
// functions in this file (via `read_size_from_env`) to size each table's
// block cache. The `_MB` suffix suggests the values are megabytes — the
// parsing lives in `read_size_from_env`, so confirm the unit there.

/// Environment variable read by `objects_table_config`.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Environment variable read by `live_owned_object_markers_table_config`.
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Environment variable read by `transactions_table_config`.
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Environment variable read by `effects_table_config`.
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
/// Environment variable read by `events_table_config`.
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
/// Environment variable read by `indirect_move_objects_table_config`.
const ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE: &str = "INDIRECT_OBJECTS_BLOCK_CACHE_MB";
38
/// Overrides applied to the database options of the perpetual store before
/// the per-table configuration is derived from them (see `open`).
///
/// `Default` yields `enable_write_stall == false`.
#[derive(Clone, Debug, Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false`, `apply_to` calls `disable_write_throttling()` on the
    /// options it is given; when `true`, the options are left untouched.
    pub enable_write_stall: bool,
}
45
46impl AuthorityPerpetualTablesOptions {
47 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
48 if !self.enable_write_stall {
49 db_options = db_options.disable_write_throttling();
50 }
51 db_options
52 }
53}
54
55#[derive(DBMapUtils)]
58pub struct AuthorityPerpetualTables {
59 pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,
75
76 pub(crate) indirect_move_objects: DBMap<ObjectContentDigest, StoreMoveObjectWrapper>,
77
78 pub(crate) live_owned_object_markers: DBMap<ObjectRef, ()>,
80
81 pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,
86
87 pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,
100
101 pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,
107
108 pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,
113
114 pub(crate) executed_transactions_to_checkpoint:
119 DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,
120
121 pub(crate) root_state_hash_by_epoch: DBMap<EpochId, (CheckpointSequenceNumber, Accumulator)>,
125
126 pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,
128
129 pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,
132
133 pub(crate) total_iota_supply: DBMap<(), TotalIotaSupplyCheck>,
137
138 pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,
143
144 pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
152}
153
154#[derive(Debug, Serialize, Deserialize)]
156pub(crate) struct TotalIotaSupplyCheck {
157 pub(crate) total_supply: u64,
159 pub(crate) last_check_epoch: EpochId,
161}
162
163impl AuthorityPerpetualTables {
/// Returns the directory of the perpetual store: `parent_path` with the
/// `perpetual` component appended.
pub fn path(parent_path: &Path) -> PathBuf {
    let mut store_dir = parent_path.to_path_buf();
    store_dir.push("perpetual");
    store_dir
}
167
168 pub fn open(
169 parent_path: &Path,
170 db_options_override: Option<AuthorityPerpetualTablesOptions>,
171 ) -> Self {
172 let db_options_override = db_options_override.unwrap_or_default();
173 let db_options =
174 db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
175 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
176 (
177 "objects".to_string(),
178 objects_table_config(db_options.clone()),
179 ),
180 (
181 "indirect_move_objects".to_string(),
182 indirect_move_objects_table_config(db_options.clone()),
183 ),
184 (
185 "live_owned_object_markers".to_string(),
186 live_owned_object_markers_table_config(db_options.clone()),
187 ),
188 (
189 "transactions".to_string(),
190 transactions_table_config(db_options.clone()),
191 ),
192 (
193 "effects".to_string(),
194 effects_table_config(db_options.clone()),
195 ),
196 (
197 "events".to_string(),
198 events_table_config(db_options.clone()),
199 ),
200 ]));
201 Self::open_tables_read_write(
202 Self::path(parent_path),
203 MetricConf::new("perpetual")
204 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
205 Some(db_options.options),
206 Some(table_options),
207 )
208 }
209
210 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
211 Self::get_read_only_handle(
212 Self::path(parent_path),
213 None,
214 None,
215 MetricConf::new("perpetual_readonly"),
216 )
217 }
218
219 pub fn find_object_lt_or_eq_version(
224 &self,
225 object_id: ObjectID,
226 version: SequenceNumber,
227 ) -> IotaResult<Option<Object>> {
228 let iter = self
229 .objects
230 .safe_range_iter(ObjectKey::min_for_id(&object_id)..=ObjectKey::max_for_id(&object_id))
231 .skip_prior_to(&ObjectKey(object_id, version))?;
232 match iter.reverse().next() {
233 Some(Ok((key, o))) => self.object(&key, o),
234 Some(Err(e)) => Err(e.into()),
235 None => Ok(None),
236 }
237 }
238
239 fn construct_object(
240 &self,
241 object_key: &ObjectKey,
242 store_object: StoreObjectValue,
243 ) -> Result<Object, IotaError> {
244 let indirect_object = match store_object.data {
245 StoreData::IndirectObject(ref metadata) => self
246 .indirect_move_objects
247 .get(&metadata.digest)?
248 .map(|o| o.migrate().into_inner()),
249 _ => None,
250 };
251 try_construct_object(object_key, store_object, indirect_object)
252 }
253
254 pub fn object(
257 &self,
258 object_key: &ObjectKey,
259 store_object: StoreObjectWrapper,
260 ) -> Result<Option<Object>, IotaError> {
261 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
262 return Ok(None);
263 };
264 Ok(Some(self.construct_object(object_key, store_object)?))
265 }
266
267 pub fn object_reference(
268 &self,
269 object_key: &ObjectKey,
270 store_object: StoreObjectWrapper,
271 ) -> Result<ObjectRef, IotaError> {
272 let obj_ref = match store_object.migrate().into_inner() {
273 StoreObject::Value(object) => self
274 .construct_object(object_key, object)?
275 .compute_object_reference(),
276 StoreObject::Deleted => (
277 object_key.0,
278 object_key.1,
279 ObjectDigest::OBJECT_DIGEST_DELETED,
280 ),
281 StoreObject::Wrapped => (
282 object_key.0,
283 object_key.1,
284 ObjectDigest::OBJECT_DIGEST_WRAPPED,
285 ),
286 };
287 Ok(obj_ref)
288 }
289
290 pub fn tombstone_reference(
291 &self,
292 object_key: &ObjectKey,
293 store_object: &StoreObjectWrapper,
294 ) -> Result<Option<ObjectRef>, IotaError> {
295 let obj_ref = match store_object.inner() {
296 StoreObject::Deleted => Some((
297 object_key.0,
298 object_key.1,
299 ObjectDigest::OBJECT_DIGEST_DELETED,
300 )),
301 StoreObject::Wrapped => Some((
302 object_key.0,
303 object_key.1,
304 ObjectDigest::OBJECT_DIGEST_WRAPPED,
305 )),
306 _ => None,
307 };
308 Ok(obj_ref)
309 }
310
311 pub fn get_latest_object_ref_or_tombstone(
312 &self,
313 object_id: ObjectID,
314 ) -> Result<Option<ObjectRef>, IotaError> {
315 let mut iterator = self
316 .objects
317 .unbounded_iter()
318 .skip_prior_to(&ObjectKey::max_for_id(&object_id))?;
319
320 if let Some((object_key, value)) = iterator.next() {
321 if object_key.0 == object_id {
322 return Ok(Some(self.object_reference(&object_key, value)?));
323 }
324 }
325 Ok(None)
326 }
327
328 pub fn get_latest_object_or_tombstone(
329 &self,
330 object_id: ObjectID,
331 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, IotaError> {
332 let mut iterator = self
333 .objects
334 .unbounded_iter()
335 .skip_prior_to(&ObjectKey::max_for_id(&object_id))?;
336
337 if let Some((object_key, value)) = iterator.next() {
338 if object_key.0 == object_id {
339 return Ok(Some((object_key, value)));
340 }
341 }
342 Ok(None)
343 }
344
345 pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
346 Ok(self
347 .epoch_start_configuration
348 .get(&())?
349 .expect("Must have current epoch.")
350 .epoch_start_state()
351 .epoch())
352 }
353
354 pub fn set_epoch_start_configuration(
355 &self,
356 epoch_start_configuration: &EpochStartConfiguration,
357 ) -> IotaResult {
358 let mut wb = self.epoch_start_configuration.batch();
359 wb.insert_batch(
360 &self.epoch_start_configuration,
361 std::iter::once(((), epoch_start_configuration)),
362 )?;
363 wb.write()?;
364 Ok(())
365 }
366
367 pub fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
368 Ok(self.pruned_checkpoint.get(&())?.unwrap_or_default())
369 }
370
371 pub fn set_highest_pruned_checkpoint(
372 &self,
373 wb: &mut DBBatch,
374 checkpoint_number: CheckpointSequenceNumber,
375 ) -> IotaResult {
376 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
377 Ok(())
378 }
379
380 pub fn get_transaction(
381 &self,
382 digest: &TransactionDigest,
383 ) -> IotaResult<Option<TrustedTransaction>> {
384 let Some(transaction) = self.transactions.get(digest)? else {
385 return Ok(None);
386 };
387 Ok(Some(transaction))
388 }
389
390 pub fn get_effects(
391 &self,
392 digest: &TransactionDigest,
393 ) -> IotaResult<Option<TransactionEffects>> {
394 let Some(effect_digest) = self.executed_effects.get(digest)? else {
395 return Ok(None);
396 };
397 Ok(self.effects.get(&effect_digest)?)
398 }
399
400 pub fn get_checkpoint_sequence_number(
401 &self,
402 digest: &TransactionDigest,
403 ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
404 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
405 }
406
407 pub fn get_newer_object_keys(
408 &self,
409 object: &(ObjectID, SequenceNumber),
410 ) -> IotaResult<Vec<ObjectKey>> {
411 let mut objects = vec![];
412 for result in self.objects.safe_iter_with_bounds(
413 Some(ObjectKey(object.0, object.1.next())),
414 Some(ObjectKey(object.0, VersionNumber::MAX)),
415 ) {
416 let (key, _) = result?;
417 objects.push(key);
418 }
419 Ok(objects)
420 }
421
422 pub fn set_highest_pruned_checkpoint_without_wb(
423 &self,
424 checkpoint_number: CheckpointSequenceNumber,
425 ) -> IotaResult {
426 let mut wb = self.pruned_checkpoint.batch();
427 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
428 wb.write()?;
429 Ok(())
430 }
431
432 pub fn database_is_empty(&self) -> IotaResult<bool> {
433 Ok(self
434 .objects
435 .unbounded_iter()
436 .skip_to(&ObjectKey::ZERO)?
437 .next()
438 .is_none())
439 }
440
441 pub fn iter_live_object_set(&self) -> LiveSetIter<'_> {
442 LiveSetIter {
443 iter: self.objects.unbounded_iter(),
444 tables: self,
445 prev: None,
446 }
447 }
448
449 pub fn range_iter_live_object_set(
450 &self,
451 lower_bound: Option<ObjectID>,
452 upper_bound: Option<ObjectID>,
453 ) -> LiveSetIter<'_> {
454 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
455 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
456
457 LiveSetIter {
458 iter: self.objects.iter_with_bounds(lower_bound, upper_bound),
459 tables: self,
460 prev: None,
461 }
462 }
463
464 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
465 self.objects.checkpoint_db(path).map_err(Into::into)
467 }
468
469 pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
470 self.objects.unsafe_clear()?;
472 self.indirect_move_objects.unsafe_clear()?;
473 self.live_owned_object_markers.unsafe_clear()?;
474 self.executed_effects.unsafe_clear()?;
475 self.events.unsafe_clear()?;
476 self.executed_transactions_to_checkpoint.unsafe_clear()?;
477 self.root_state_hash_by_epoch.unsafe_clear()?;
478 self.epoch_start_configuration.unsafe_clear()?;
479 self.pruned_checkpoint.unsafe_clear()?;
480 self.total_iota_supply.unsafe_clear()?;
481 self.expected_storage_fund_imbalance.unsafe_clear()?;
482 self.object_per_epoch_marker_table.unsafe_clear()?;
483 self.objects.rocksdb.flush()?;
484 Ok(())
485 }
486
487 pub fn get_root_state_hash(
488 &self,
489 epoch: EpochId,
490 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
491 Ok(self.root_state_hash_by_epoch.get(&epoch)?)
492 }
493
494 pub fn insert_root_state_hash(
495 &self,
496 epoch: EpochId,
497 last_checkpoint_of_epoch: CheckpointSequenceNumber,
498 accumulator: Accumulator,
499 ) -> IotaResult {
500 self.root_state_hash_by_epoch
501 .insert(&epoch, &(last_checkpoint_of_epoch, accumulator))?;
502 Ok(())
503 }
504
505 pub fn insert_object_test_only(&self, object: Object) -> IotaResult {
506 let object_reference = object.compute_object_reference();
507 let StoreObjectPair(wrapper, _indirect_object) = get_store_object_pair(object, usize::MAX);
508 let mut wb = self.objects.batch();
509 wb.insert_batch(
510 &self.objects,
511 std::iter::once((ObjectKey::from(object_reference), wrapper)),
512 )?;
513 wb.write()?;
514 Ok(())
515 }
516}
517
518impl ObjectStore for AuthorityPerpetualTables {
519 fn get_object(
521 &self,
522 object_id: &ObjectID,
523 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
524 let obj_entry = self
525 .objects
526 .unbounded_iter()
527 .skip_prior_to(&ObjectKey::max_for_id(object_id))
528 .map_err(iota_types::storage::error::Error::custom)?
529 .next();
530
531 match obj_entry {
532 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
533 .object(&ObjectKey(obj_id, version), obj)
534 .map_err(iota_types::storage::error::Error::custom)?),
535 _ => Ok(None),
536 }
537 }
538
539 fn get_object_by_key(
540 &self,
541 object_id: &ObjectID,
542 version: VersionNumber,
543 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
544 Ok(self
545 .objects
546 .get(&ObjectKey(*object_id, version))
547 .map_err(iota_types::storage::error::Error::custom)?
548 .map(|object| self.object(&ObjectKey(*object_id, version), object))
549 .transpose()
550 .map_err(iota_types::storage::error::Error::custom)?
551 .flatten())
552 }
553}
554
555pub struct LiveSetIter<'a> {
556 iter:
557 <DBMap<ObjectKey, StoreObjectWrapper> as Map<'a, ObjectKey, StoreObjectWrapper>>::Iterator,
558 tables: &'a AuthorityPerpetualTables,
559 prev: Option<(ObjectKey, StoreObjectWrapper)>,
560}
561
562#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
563pub enum LiveObject {
564 Normal(Object),
565 Wrapped(ObjectKey),
566}
567
568impl LiveObject {
569 pub fn object_id(&self) -> ObjectID {
570 match self {
571 LiveObject::Normal(obj) => obj.id(),
572 LiveObject::Wrapped(key) => key.0,
573 }
574 }
575
576 pub fn version(&self) -> SequenceNumber {
577 match self {
578 LiveObject::Normal(obj) => obj.version(),
579 LiveObject::Wrapped(key) => key.1,
580 }
581 }
582
583 pub fn object_reference(&self) -> ObjectRef {
584 match self {
585 LiveObject::Normal(obj) => obj.compute_object_reference(),
586 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
587 }
588 }
589
590 pub fn to_normal(self) -> Option<Object> {
591 match self {
592 LiveObject::Normal(object) => Some(object),
593 LiveObject::Wrapped(_) => None,
594 }
595 }
596}
597
598impl LiveSetIter<'_> {
599 fn store_object_wrapper_to_live_object(
600 &self,
601 object_key: ObjectKey,
602 store_object: StoreObjectWrapper,
603 ) -> Option<LiveObject> {
604 match store_object.migrate().into_inner() {
605 StoreObject::Value(object) => {
606 let object = self
607 .tables
608 .construct_object(&object_key, object)
609 .expect("Constructing object from store cannot fail");
610 Some(LiveObject::Normal(object))
611 }
612 StoreObject::Wrapped | StoreObject::Deleted => None,
613 }
614 }
615}
616
617impl Iterator for LiveSetIter<'_> {
618 type Item = LiveObject;
619
620 fn next(&mut self) -> Option<Self::Item> {
621 loop {
622 if let Some((next_key, next_value)) = self.iter.next() {
623 let prev = self.prev.take();
624 self.prev = Some((next_key, next_value));
625
626 if let Some((prev_key, prev_value)) = prev {
627 if prev_key.0 != next_key.0 {
628 let live_object =
629 self.store_object_wrapper_to_live_object(prev_key, prev_value);
630 if live_object.is_some() {
631 return live_object;
632 }
633 }
634 }
635 continue;
636 }
637 if let Some((key, value)) = self.prev.take() {
638 let live_object = self.store_object_wrapper_to_live_object(key, value);
639 if live_object.is_some() {
640 return live_object;
641 }
642 }
643 return None;
644 }
645 }
646}
647
648fn live_owned_object_markers_table_config(db_options: DBOptions) -> DBOptions {
650 DBOptions {
651 options: db_options
652 .clone()
653 .optimize_for_write_throughput()
654 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
655 .options,
656 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
657 }
658}
659
660fn objects_table_config(db_options: DBOptions) -> DBOptions {
661 db_options
662 .optimize_for_write_throughput()
663 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
664}
665
666fn transactions_table_config(db_options: DBOptions) -> DBOptions {
667 db_options
668 .optimize_for_write_throughput()
669 .optimize_for_point_lookup(
670 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
671 )
672}
673
674fn effects_table_config(db_options: DBOptions) -> DBOptions {
675 db_options
676 .optimize_for_write_throughput()
677 .optimize_for_point_lookup(
678 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
679 )
680}
681
682fn events_table_config(db_options: DBOptions) -> DBOptions {
683 db_options
684 .optimize_for_write_throughput()
685 .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024))
686}
687
688fn indirect_move_objects_table_config(mut db_options: DBOptions) -> DBOptions {
689 db_options = db_options
690 .optimize_for_write_throughput()
691 .optimize_for_point_lookup(
692 read_size_from_env(ENV_VAR_INDIRECT_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(512),
693 );
694 db_options.options.set_merge_operator(
695 "refcount operator",
696 reference_count_merge_operator,
697 reference_count_merge_operator,
698 );
699 db_options
700 .options
701 .set_compaction_filter("empty filter", empty_compaction_filter);
702 db_options
703}