1use std::path::Path;
6
7use iota_types::{
8 accumulator::Accumulator, base_types::SequenceNumber, digests::TransactionEventsDigest,
9 effects::TransactionEffects, storage::MarkerValue,
10};
11use serde::{Deserialize, Serialize};
12use tracing::error;
13use typed_store::{
14 DBMapUtils,
15 metrics::SamplingInterval,
16 rocks::{
17 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
18 read_size_from_env,
19 },
20 rocksdb::compaction_filter::Decision,
21 traits::{Map, TableSummary, TypedStoreDebug},
22};
23
24use super::*;
25use crate::authority::{
26 authority_store_pruner::ObjectsCompactionFilter,
27 authority_store_types::{
28 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
29 },
30 epoch_start_configuration::EpochStartConfiguration,
31};
32
// Names of the environment variables that override per-table block cache
// sizes. Each one is read through `read_size_from_env` by the matching
// `*_table_config` function in this file. The `_MB` suffix suggests the value
// is expressed in megabytes — TODO confirm against `read_size_from_env`.

/// Overrides the block cache size of the `objects` table (fallback: `5 * 1024`).
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Overrides the block cache size of the `live_owned_object_markers` table (fallback: `1024`).
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Overrides the block cache size of the `transactions` table (fallback: `512`).
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Overrides the block cache size of the `effects` table (fallback: `1024`).
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
/// Overrides the block cache size of the `events` table (fallback: `1024`).
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
38
/// Caller-supplied overrides that are applied when the perpetual tables are
/// opened.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false` (the default), write throttling is disabled on the DB
    /// options before the tables are opened.
    pub enable_write_stall: bool,
    /// Optional compaction filter; when present it is installed on the
    /// `objects` table only.
    pub compaction_filter: Option<ObjectsCompactionFilter>,
}
46
47impl AuthorityPerpetualTablesOptions {
48 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
49 if !self.enable_write_stall {
50 db_options = db_options.disable_write_throttling();
51 }
52 db_options
53 }
54}
55
/// Tables of the `perpetual` database.
///
/// NOTE(review): the `DBMapUtils` derive presumably generates the
/// `open_tables_read_write` / `get_read_only_handle` helpers used by this
/// type's `impl` — confirm against the derive macro.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables {
    /// Stored object versions keyed by `ObjectKey` (object id, version). A
    /// value is either a full object or a marker recording that the object
    /// was deleted or wrapped at that version.
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// A set of object references (the value type is `()`); presumably the
    /// owned objects that are currently live — TODO confirm against the
    /// writers of this table.
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, ()>,

    /// Transactions keyed by transaction digest.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by the digest of the effects.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps a transaction digest to the digest of its effects; combined with
    /// `effects` to look up effects by transaction digest.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Events keyed by `(events digest, usize)`; the `usize` is presumably
    /// the position of the event within its transaction — TODO confirm.
    pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Maps a transaction digest to an `(epoch, checkpoint sequence number)`
    /// pair.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch root state hash, stored as
    /// `(last checkpoint of the epoch, accumulator)`.
    pub(crate) root_state_hash_by_epoch: DBMap<EpochId, (CheckpointSequenceNumber, Accumulator)>,

    /// Single-entry table (key `()`) holding the epoch start configuration.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Single-entry table (key `()`) holding the highest pruned checkpoint.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table (key `()`) holding a `TotalIotaSupplyCheck`.
    pub(crate) total_iota_supply: DBMap<(), TotalIotaSupplyCheck>,

    /// Single-entry table (key `()`) holding a signed 64-bit value.
    /// NOTE(review): the exact meaning of the value is not visible in this
    /// file — confirm against its writers.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Marker values keyed by `(epoch, object key)`.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
}
151
/// Tables of the `pruner` database.
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    /// Maps an object id to a sequence number; presumably the version at
    /// which the object was tombstoned — TODO confirm against the pruner.
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}
156
157impl AuthorityPrunerTables {
158 pub fn path(parent_path: &Path) -> PathBuf {
159 parent_path.join("pruner")
160 }
161
162 pub fn open(parent_path: &Path) -> Self {
163 Self::open_tables_read_write(
164 Self::path(parent_path),
165 MetricConf::new("pruner")
166 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
167 None,
168 None,
169 )
170 }
171}
172
/// Value stored in the `total_iota_supply` table.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TotalIotaSupplyCheck {
    /// The recorded total supply. NOTE(review): the unit is not visible in
    /// this file — confirm against the code that writes this record.
    pub(crate) total_supply: u64,
    /// Presumably the epoch in which the supply was last checked — TODO
    /// confirm against the code that writes this record.
    pub(crate) last_check_epoch: EpochId,
}
181
182impl AuthorityPerpetualTables {
183 pub fn path(parent_path: &Path) -> PathBuf {
184 parent_path.join("perpetual")
185 }
186
187 pub fn open(
188 parent_path: &Path,
189 db_options_override: Option<AuthorityPerpetualTablesOptions>,
190 ) -> Self {
191 let db_options_override = db_options_override.unwrap_or_default();
192 let db_options =
193 db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
194 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
195 (
196 "objects".to_string(),
197 objects_table_config(db_options.clone(), db_options_override.compaction_filter),
198 ),
199 (
200 "live_owned_object_markers".to_string(),
201 live_owned_object_markers_table_config(db_options.clone()),
202 ),
203 (
204 "transactions".to_string(),
205 transactions_table_config(db_options.clone()),
206 ),
207 (
208 "effects".to_string(),
209 effects_table_config(db_options.clone()),
210 ),
211 (
212 "events".to_string(),
213 events_table_config(db_options.clone()),
214 ),
215 ]));
216 Self::open_tables_read_write(
217 Self::path(parent_path),
218 MetricConf::new("perpetual")
219 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
220 Some(db_options.options),
221 Some(table_options),
222 )
223 }
224
225 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
226 Self::get_read_only_handle(
227 Self::path(parent_path),
228 None,
229 None,
230 MetricConf::new("perpetual_readonly"),
231 )
232 }
233
234 pub fn find_object_lt_or_eq_version(
239 &self,
240 object_id: ObjectID,
241 version: SequenceNumber,
242 ) -> IotaResult<Option<Object>> {
243 let mut iter = self.objects.reversed_safe_iter_with_bounds(
244 Some(ObjectKey::min_for_id(&object_id)),
245 Some(ObjectKey(object_id, version)),
246 )?;
247 match iter.next() {
248 Some(Ok((key, o))) => self.object(&key, o),
249 Some(Err(e)) => Err(e.into()),
250 None => Ok(None),
251 }
252 }
253
    /// Rebuilds a full `Object` from its stored value by delegating to
    /// `try_construct_object`.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, IotaError> {
        try_construct_object(object_key, store_object)
    }
261
262 pub fn object(
265 &self,
266 object_key: &ObjectKey,
267 store_object: StoreObjectWrapper,
268 ) -> Result<Option<Object>, IotaError> {
269 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
270 return Ok(None);
271 };
272 Ok(Some(self.construct_object(object_key, store_object)?))
273 }
274
275 pub fn object_reference(
276 &self,
277 object_key: &ObjectKey,
278 store_object: StoreObjectWrapper,
279 ) -> Result<ObjectRef, IotaError> {
280 let obj_ref = match store_object.migrate().into_inner() {
281 StoreObject::Value(object) => self
282 .construct_object(object_key, object)?
283 .compute_object_reference(),
284 StoreObject::Deleted => (
285 object_key.0,
286 object_key.1,
287 ObjectDigest::OBJECT_DIGEST_DELETED,
288 ),
289 StoreObject::Wrapped => (
290 object_key.0,
291 object_key.1,
292 ObjectDigest::OBJECT_DIGEST_WRAPPED,
293 ),
294 };
295 Ok(obj_ref)
296 }
297
298 pub fn tombstone_reference(
299 &self,
300 object_key: &ObjectKey,
301 store_object: &StoreObjectWrapper,
302 ) -> Result<Option<ObjectRef>, IotaError> {
303 let obj_ref = match store_object.inner() {
304 StoreObject::Deleted => Some((
305 object_key.0,
306 object_key.1,
307 ObjectDigest::OBJECT_DIGEST_DELETED,
308 )),
309 StoreObject::Wrapped => Some((
310 object_key.0,
311 object_key.1,
312 ObjectDigest::OBJECT_DIGEST_WRAPPED,
313 )),
314 _ => None,
315 };
316 Ok(obj_ref)
317 }
318
319 pub fn get_latest_object_ref_or_tombstone(
320 &self,
321 object_id: ObjectID,
322 ) -> Result<Option<ObjectRef>, IotaError> {
323 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
324 Some(ObjectKey::min_for_id(&object_id)),
325 Some(ObjectKey::max_for_id(&object_id)),
326 )?;
327
328 if let Some(Ok((object_key, value))) = iterator.next() {
329 if object_key.0 == object_id {
330 return Ok(Some(self.object_reference(&object_key, value)?));
331 }
332 }
333 Ok(None)
334 }
335
336 pub fn get_latest_object_or_tombstone(
337 &self,
338 object_id: ObjectID,
339 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, IotaError> {
340 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
341 Some(ObjectKey::min_for_id(&object_id)),
342 Some(ObjectKey::max_for_id(&object_id)),
343 )?;
344
345 if let Some(Ok((object_key, value))) = iterator.next() {
346 if object_key.0 == object_id {
347 return Ok(Some((object_key, value)));
348 }
349 }
350 Ok(None)
351 }
352
353 pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
354 Ok(self
355 .epoch_start_configuration
356 .get(&())?
357 .expect("Must have current epoch.")
358 .epoch_start_state()
359 .epoch())
360 }
361
362 pub fn set_epoch_start_configuration(
363 &self,
364 epoch_start_configuration: &EpochStartConfiguration,
365 ) -> IotaResult {
366 let mut wb = self.epoch_start_configuration.batch();
367 wb.insert_batch(
368 &self.epoch_start_configuration,
369 std::iter::once(((), epoch_start_configuration)),
370 )?;
371 wb.write()?;
372 Ok(())
373 }
374
375 pub fn get_highest_pruned_checkpoint(&self) -> IotaResult<CheckpointSequenceNumber> {
376 Ok(self.pruned_checkpoint.get(&())?.unwrap_or_default())
377 }
378
379 pub fn set_highest_pruned_checkpoint(
380 &self,
381 wb: &mut DBBatch,
382 checkpoint_number: CheckpointSequenceNumber,
383 ) -> IotaResult {
384 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
385 Ok(())
386 }
387
388 pub fn get_transaction(
389 &self,
390 digest: &TransactionDigest,
391 ) -> IotaResult<Option<TrustedTransaction>> {
392 let Some(transaction) = self.transactions.get(digest)? else {
393 return Ok(None);
394 };
395 Ok(Some(transaction))
396 }
397
398 pub fn get_effects(
399 &self,
400 digest: &TransactionDigest,
401 ) -> IotaResult<Option<TransactionEffects>> {
402 let Some(effect_digest) = self.executed_effects.get(digest)? else {
403 return Ok(None);
404 };
405 Ok(self.effects.get(&effect_digest)?)
406 }
407
408 pub fn get_checkpoint_sequence_number(
409 &self,
410 digest: &TransactionDigest,
411 ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
412 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
413 }
414
415 pub fn get_newer_object_keys(
416 &self,
417 object: &(ObjectID, SequenceNumber),
418 ) -> IotaResult<Vec<ObjectKey>> {
419 let mut objects = vec![];
420 for result in self.objects.safe_iter_with_bounds(
421 Some(ObjectKey(object.0, object.1.next())),
422 Some(ObjectKey(object.0, VersionNumber::MAX_VALID_EXCL)),
423 ) {
424 let (key, _) = result?;
425 objects.push(key);
426 }
427 Ok(objects)
428 }
429
430 pub fn set_highest_pruned_checkpoint_without_wb(
431 &self,
432 checkpoint_number: CheckpointSequenceNumber,
433 ) -> IotaResult {
434 let mut wb = self.pruned_checkpoint.batch();
435 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
436 wb.write()?;
437 Ok(())
438 }
439
440 pub fn database_is_empty(&self) -> IotaResult<bool> {
441 Ok(self.objects.unbounded_iter().next().is_none())
442 }
443
444 pub fn iter_live_object_set(&self) -> LiveSetIter<'_> {
445 LiveSetIter {
446 iter: self.objects.unbounded_iter(),
447 tables: self,
448 prev: None,
449 }
450 }
451
452 pub fn range_iter_live_object_set(
453 &self,
454 lower_bound: Option<ObjectID>,
455 upper_bound: Option<ObjectID>,
456 ) -> LiveSetIter<'_> {
457 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
458 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
459
460 LiveSetIter {
461 iter: self.objects.iter_with_bounds(lower_bound, upper_bound),
462 tables: self,
463 prev: None,
464 }
465 }
466
467 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
468 self.objects.checkpoint_db(path).map_err(Into::into)
470 }
471
    /// Clears the tables listed below, one after another, and then flushes
    /// the database.
    ///
    /// The `transactions` and `effects` tables are not cleared. A table added
    /// to `AuthorityPerpetualTables` is not covered until it is listed here
    /// explicitly.
    ///
    /// # Errors
    /// Stops at the first clear or flush that fails. Tables cleared before
    /// the failure stay cleared.
    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
        self.objects.unsafe_clear()?;
        self.live_owned_object_markers.unsafe_clear()?;
        self.executed_effects.unsafe_clear()?;
        self.events.unsafe_clear()?;
        self.executed_transactions_to_checkpoint.unsafe_clear()?;
        self.root_state_hash_by_epoch.unsafe_clear()?;
        self.epoch_start_configuration.unsafe_clear()?;
        self.pruned_checkpoint.unsafe_clear()?;
        self.total_iota_supply.unsafe_clear()?;
        self.expected_storage_fund_imbalance.unsafe_clear()?;
        self.object_per_epoch_marker_table.unsafe_clear()?;
        // Flush through the handle reachable from the `objects` table.
        self.objects.rocksdb.flush()?;
        Ok(())
    }
488
489 pub fn get_root_state_hash(
490 &self,
491 epoch: EpochId,
492 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
493 Ok(self.root_state_hash_by_epoch.get(&epoch)?)
494 }
495
496 pub fn insert_root_state_hash(
497 &self,
498 epoch: EpochId,
499 last_checkpoint_of_epoch: CheckpointSequenceNumber,
500 accumulator: Accumulator,
501 ) -> IotaResult {
502 self.root_state_hash_by_epoch
503 .insert(&epoch, &(last_checkpoint_of_epoch, accumulator))?;
504 Ok(())
505 }
506
507 pub fn insert_object_test_only(&self, object: Object) -> IotaResult {
508 let object_reference = object.compute_object_reference();
509 let wrapper = get_store_object(object);
510 let mut wb = self.objects.batch();
511 wb.insert_batch(
512 &self.objects,
513 std::iter::once((ObjectKey::from(object_reference), wrapper)),
514 )?;
515 wb.write()?;
516 Ok(())
517 }
518}
519
520impl ObjectStore for AuthorityPerpetualTables {
521 fn try_get_object(
523 &self,
524 object_id: &ObjectID,
525 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
526 let obj_entry = self
527 .objects
528 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))
529 .map_err(iota_types::storage::error::Error::custom)?
530 .next();
531
532 match obj_entry.transpose()? {
533 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
534 .object(&ObjectKey(obj_id, version), obj)
535 .map_err(iota_types::storage::error::Error::custom)?),
536 _ => Ok(None),
537 }
538 }
539
540 fn try_get_object_by_key(
541 &self,
542 object_id: &ObjectID,
543 version: VersionNumber,
544 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
545 Ok(self
546 .objects
547 .get(&ObjectKey(*object_id, version))
548 .map_err(iota_types::storage::error::Error::custom)?
549 .map(|object| self.object(&ObjectKey(*object_id, version), object))
550 .transpose()
551 .map_err(iota_types::storage::error::Error::custom)?
552 .flatten())
553 }
554}
555
/// Iterator over the live objects of the `objects` table; see the `Iterator`
/// implementation for the selection rule.
pub struct LiveSetIter<'a> {
    /// Underlying iterator over raw `(key, stored wrapper)` entries.
    iter:
        <DBMap<ObjectKey, StoreObjectWrapper> as Map<'a, ObjectKey, StoreObjectWrapper>>::Iterator,
    /// Tables handle, used to reconstruct full objects from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// The most recently read entry, which has not been emitted or discarded
    /// yet.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
}
562
/// An element of the live object set.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A full object.
    Normal(Object),
    /// A wrapped object, identified only by its key (id and version).
    /// NOTE(review): `LiveSetIter` in this file never produces this variant.
    Wrapped(ObjectKey),
}
568
569impl LiveObject {
570 pub fn object_id(&self) -> ObjectID {
571 match self {
572 LiveObject::Normal(obj) => obj.id(),
573 LiveObject::Wrapped(key) => key.0,
574 }
575 }
576
577 pub fn version(&self) -> SequenceNumber {
578 match self {
579 LiveObject::Normal(obj) => obj.version(),
580 LiveObject::Wrapped(key) => key.1,
581 }
582 }
583
584 pub fn object_reference(&self) -> ObjectRef {
585 match self {
586 LiveObject::Normal(obj) => obj.compute_object_reference(),
587 LiveObject::Wrapped(key) => (key.0, key.1, ObjectDigest::OBJECT_DIGEST_WRAPPED),
588 }
589 }
590
591 pub fn to_normal(self) -> Option<Object> {
592 match self {
593 LiveObject::Normal(object) => Some(object),
594 LiveObject::Wrapped(_) => None,
595 }
596 }
597}
598
599impl LiveSetIter<'_> {
600 fn store_object_wrapper_to_live_object(
601 &self,
602 object_key: ObjectKey,
603 store_object: StoreObjectWrapper,
604 ) -> Option<LiveObject> {
605 match store_object.migrate().into_inner() {
606 StoreObject::Value(object) => {
607 let object = self
608 .tables
609 .construct_object(&object_key, object)
610 .expect("Constructing object from store cannot fail");
611 Some(LiveObject::Normal(object))
612 }
613 StoreObject::Wrapped | StoreObject::Deleted => None,
614 }
615 }
616}
617
618impl Iterator for LiveSetIter<'_> {
619 type Item = LiveObject;
620
621 fn next(&mut self) -> Option<Self::Item> {
622 loop {
623 if let Some((next_key, next_value)) = self.iter.next() {
624 let prev = self.prev.take();
625 self.prev = Some((next_key, next_value));
626
627 if let Some((prev_key, prev_value)) = prev {
628 if prev_key.0 != next_key.0 {
629 let live_object =
630 self.store_object_wrapper_to_live_object(prev_key, prev_value);
631 if live_object.is_some() {
632 return live_object;
633 }
634 }
635 }
636 continue;
637 }
638 if let Some((key, value)) = self.prev.take() {
639 let live_object = self.store_object_wrapper_to_live_object(key, value);
640 if live_object.is_some() {
641 return live_object;
642 }
643 }
644 return None;
645 }
646 }
647}
648
649fn live_owned_object_markers_table_config(db_options: DBOptions) -> DBOptions {
651 DBOptions {
652 options: db_options
653 .clone()
654 .optimize_for_write_throughput()
655 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
656 .options,
657 rw_options: db_options.rw_options.set_ignore_range_deletions(false),
658 }
659}
660
661fn objects_table_config(
662 mut db_options: DBOptions,
663 compaction_filter: Option<ObjectsCompactionFilter>,
664) -> DBOptions {
665 if let Some(mut compaction_filter) = compaction_filter {
666 db_options
667 .options
668 .set_compaction_filter("objects", move |_, key, value| {
669 match compaction_filter.filter(key, value) {
670 Ok(decision) => decision,
671 Err(err) => {
672 error!("Compaction error: {:?}", err);
673 Decision::Keep
674 }
675 }
676 });
677 }
678 db_options
679 .optimize_for_write_throughput()
680 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
681}
682
683fn transactions_table_config(db_options: DBOptions) -> DBOptions {
684 db_options
685 .optimize_for_write_throughput()
686 .optimize_for_point_lookup(
687 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
688 )
689}
690
691fn effects_table_config(db_options: DBOptions) -> DBOptions {
692 db_options
693 .optimize_for_write_throughput()
694 .optimize_for_point_lookup(
695 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
696 )
697}
698
699fn events_table_config(db_options: DBOptions) -> DBOptions {
700 db_options
701 .optimize_for_write_throughput()
702 .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024))
703}