1use std::path::Path;
6
7use iota_types::{
8 base_types::SequenceNumber,
9 digests::TransactionEventsDigest,
10 effects::{TransactionEffects, TransactionEvents},
11 global_state_hash::GlobalStateHash,
12 storage::MarkerValue,
13};
14use serde::{Deserialize, Serialize};
15use tracing::error;
16use typed_store::{
17 DBMapUtils, DbIterator,
18 metrics::SamplingInterval,
19 rocks::{
20 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
21 read_size_from_env,
22 },
23 rocksdb::compaction_filter::Decision,
24 traits::Map,
25};
26
27use super::*;
28use crate::authority::{
29 authority_store_pruner::ObjectsCompactionFilter,
30 authority_store_types::{
31 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
32 },
33 epoch_start_configuration::EpochStartConfiguration,
34};
35
// Names of the environment variables that can override the block cache size
// passed to the per-table tuning helpers in this file. Each one is read with
// `read_size_from_env`; when that yields nothing, the helper's hard-coded
// default is used instead. The `_MB` suffix suggests the values are in
// mebibytes — TODO confirm against `read_size_from_env`.

/// Block cache size override for the `objects` table.
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Block cache size override for the `live_owned_object_markers` table.
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Block cache size override for the `transactions` table.
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Block cache size override for the `effects` table.
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
/// Block cache size override for the `events` table.
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
41
/// Optional overrides applied when opening [`AuthorityPerpetualTables`].
///
/// The `Default` value leaves write stalls disabled and installs no
/// compaction filter.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false`, `apply_to` calls `disable_write_throttling()` on the
    /// database options; when `true`, the options are left untouched.
    pub enable_write_stall: bool,
    /// Compaction filter that, when present, is installed on the `objects`
    /// table while the tables are being opened.
    pub compaction_filter: Option<ObjectsCompactionFilter>,
}
49
50impl AuthorityPerpetualTablesOptions {
51 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
52 if !self.enable_write_stall {
53 db_options = db_options.disable_write_throttling();
54 }
55 db_options
56 }
57}
58
/// Column families of the authority's "perpetual" database (opened under the
/// `perpetual` sub-directory, see `AuthorityPerpetualTables::path`).
///
/// Tables keyed by `()` hold at most one entry.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables {
    /// Stored objects keyed by `ObjectKey` (object ID and version). A value is
    /// either a full object or a `Deleted` / `Wrapped` tombstone.
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Set of object references (unit values). Tuned with the "locks" block
    /// cache setting; presumably marks the live versions of owned objects —
    /// TODO confirm against the writers of this table.
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, ()>,

    /// Transactions keyed by transaction digest.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by their own (effects) digest.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps a transaction digest to the digest of its effects; `get_effects`
    /// resolves effects through this table.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Individual events keyed by (events digest, index).
    /// NOTE(review): looks like the older layout next to `events_2` — confirm.
    pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// All events of a transaction, keyed by transaction digest.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Maps a transaction digest to an (epoch, checkpoint sequence number)
    /// pair.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch global state hash, stored together with a checkpoint
    /// sequence number (the last checkpoint of the epoch, per
    /// `insert_root_state_hash`).
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Single-entry table holding the epoch start configuration.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Single-entry table holding the highest pruned checkpoint sequence
    /// number.
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table holding the latest total-supply check record.
    pub(crate) total_iota_supply: DBMap<(), TotalIotaSupplyCheck>,

    /// Single-entry table holding a signed storage-fund imbalance value.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Marker values keyed by (epoch, object key).
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
}
158
/// Column families of the pruner database (opened under the `pruner`
/// sub-directory, see `AuthorityPrunerTables::path`).
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    /// Maps an object ID to a version number.
    /// NOTE(review): presumably the version of the object's tombstone —
    /// confirm against the pruner code that writes this table.
    pub(crate) object_tombstones: DBMap<ObjectID, SequenceNumber>,
}
163
164impl AuthorityPrunerTables {
165 pub fn path(parent_path: &Path) -> PathBuf {
166 parent_path.join("pruner")
167 }
168
169 pub fn open(parent_path: &Path) -> Self {
170 Self::open_tables_read_write(
171 Self::path(parent_path),
172 MetricConf::new("pruner")
173 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
174 None,
175 None,
176 )
177 }
178}
179
/// Record stored in the `total_iota_supply` table.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TotalIotaSupplyCheck {
    /// Total supply value recorded by the check.
    pub(crate) total_supply: u64,
    /// Epoch associated with the check (per the field name, the epoch in which
    /// the check last ran — confirm against the code that writes this record).
    pub(crate) last_check_epoch: EpochId,
}
188
189impl AuthorityPerpetualTables {
    /// Returns the directory of the perpetual database:
    /// `<parent_path>/perpetual`.
    pub fn path(parent_path: &Path) -> PathBuf {
        parent_path.join("perpetual")
    }
193
194 pub fn open(
195 parent_path: &Path,
196 db_options_override: Option<AuthorityPerpetualTablesOptions>,
197 ) -> Self {
198 let db_options_override = db_options_override.unwrap_or_default();
199 let db_options =
200 db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
201 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
202 (
203 "objects".to_string(),
204 objects_table_config(db_options.clone(), db_options_override.compaction_filter),
205 ),
206 (
207 "live_owned_object_markers".to_string(),
208 live_owned_object_markers_table_config(db_options.clone()),
209 ),
210 (
211 "transactions".to_string(),
212 transactions_table_config(db_options.clone()),
213 ),
214 (
215 "effects".to_string(),
216 effects_table_config(db_options.clone()),
217 ),
218 (
219 "events".to_string(),
220 events_table_config(db_options.clone()),
221 ),
222 ]));
223 Self::open_tables_read_write(
224 Self::path(parent_path),
225 MetricConf::new("perpetual")
226 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
227 Some(db_options.options),
228 Some(table_options),
229 )
230 }
231
232 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
233 Self::get_read_only_handle(
234 Self::path(parent_path),
235 None,
236 None,
237 MetricConf::new("perpetual_readonly"),
238 )
239 }
240
241 pub fn find_object_lt_or_eq_version(
246 &self,
247 object_id: ObjectID,
248 version: SequenceNumber,
249 ) -> IotaResult<Option<Object>> {
250 let mut iter = self.objects.reversed_safe_iter_with_bounds(
251 Some(ObjectKey::min_for_id(&object_id)),
252 Some(ObjectKey(object_id, version)),
253 )?;
254 match iter.next() {
255 Some(Ok((key, o))) => self.object(&key, o),
256 Some(Err(e)) => Err(e.into()),
257 None => Ok(None),
258 }
259 }
260
    /// Builds an `Object` from its key and stored value by delegating to
    /// `try_construct_object`; any error from that call is returned as-is.
    fn construct_object(
        &self,
        object_key: &ObjectKey,
        store_object: StoreObjectValue,
    ) -> Result<Object, IotaError> {
        try_construct_object(object_key, store_object)
    }
268
269 pub fn object(
272 &self,
273 object_key: &ObjectKey,
274 store_object: StoreObjectWrapper,
275 ) -> Result<Option<Object>, IotaError> {
276 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
277 return Ok(None);
278 };
279 Ok(Some(self.construct_object(object_key, *store_object)?))
280 }
281
282 pub fn object_reference(
283 &self,
284 object_key: &ObjectKey,
285 store_object: StoreObjectWrapper,
286 ) -> Result<ObjectRef, IotaError> {
287 let obj_ref = match store_object.migrate().into_inner() {
288 StoreObject::Value(object) => self
289 .construct_object(object_key, *object)?
290 .compute_object_reference(),
291 StoreObject::Deleted => {
292 ObjectRef::new(object_key.0, object_key.1, ObjectDigest::OBJECT_DELETED)
293 }
294 StoreObject::Wrapped => {
295 ObjectRef::new(object_key.0, object_key.1, ObjectDigest::OBJECT_WRAPPED)
296 }
297 };
298 Ok(obj_ref)
299 }
300
301 pub fn tombstone_reference(
302 &self,
303 object_key: &ObjectKey,
304 store_object: &StoreObjectWrapper,
305 ) -> Result<Option<ObjectRef>, IotaError> {
306 let obj_ref = match store_object.inner() {
307 StoreObject::Deleted => Some(ObjectRef::new(
308 object_key.0,
309 object_key.1,
310 ObjectDigest::OBJECT_DELETED,
311 )),
312 StoreObject::Wrapped => Some(ObjectRef::new(
313 object_key.0,
314 object_key.1,
315 ObjectDigest::OBJECT_WRAPPED,
316 )),
317 _ => None,
318 };
319 Ok(obj_ref)
320 }
321
322 pub fn get_latest_object_ref_or_tombstone(
323 &self,
324 object_id: ObjectID,
325 ) -> Result<Option<ObjectRef>, IotaError> {
326 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
327 Some(ObjectKey::min_for_id(&object_id)),
328 Some(ObjectKey::max_for_id(&object_id)),
329 )?;
330
331 if let Some(Ok((object_key, value))) = iterator.next() {
332 if object_key.0 == object_id {
333 return Ok(Some(self.object_reference(&object_key, value)?));
334 }
335 }
336 Ok(None)
337 }
338
339 pub fn get_latest_object_or_tombstone(
340 &self,
341 object_id: ObjectID,
342 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, IotaError> {
343 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
344 Some(ObjectKey::min_for_id(&object_id)),
345 Some(ObjectKey::max_for_id(&object_id)),
346 )?;
347
348 if let Some(Ok((object_key, value))) = iterator.next() {
349 if object_key.0 == object_id {
350 return Ok(Some((object_key, value)));
351 }
352 }
353 Ok(None)
354 }
355
356 pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
357 Ok(self
358 .epoch_start_configuration
359 .get(&())?
360 .expect("Must have current epoch.")
361 .epoch_start_state()
362 .epoch())
363 }
364
365 pub fn set_epoch_start_configuration(
366 &self,
367 epoch_start_configuration: &EpochStartConfiguration,
368 ) -> IotaResult {
369 let mut wb = self.epoch_start_configuration.batch();
370 wb.insert_batch(
371 &self.epoch_start_configuration,
372 std::iter::once(((), epoch_start_configuration)),
373 )?;
374 wb.write()?;
375 Ok(())
376 }
377
    /// Reads the single entry of the `pruned_checkpoint` table.
    ///
    /// Returns `Ok(None)` when no value has been stored yet. Unlike most
    /// accessors here, the raw `TypedStoreError` is returned on failure.
    pub fn get_highest_pruned_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.pruned_checkpoint.get(&())
    }
383
384 pub fn set_highest_pruned_checkpoint(
385 &self,
386 wb: &mut DBBatch,
387 checkpoint_number: CheckpointSequenceNumber,
388 ) -> IotaResult {
389 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
390 Ok(())
391 }
392
393 pub fn get_transaction(
394 &self,
395 digest: &TransactionDigest,
396 ) -> IotaResult<Option<TrustedTransaction>> {
397 let Some(transaction) = self.transactions.get(digest)? else {
398 return Ok(None);
399 };
400 Ok(Some(transaction))
401 }
402
403 pub fn get_effects(
404 &self,
405 digest: &TransactionDigest,
406 ) -> IotaResult<Option<TransactionEffects>> {
407 let Some(effect_digest) = self.executed_effects.get(digest)? else {
408 return Ok(None);
409 };
410 Ok(self.effects.get(&effect_digest)?)
411 }
412
413 pub fn get_checkpoint_sequence_number(
414 &self,
415 digest: &TransactionDigest,
416 ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
417 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
418 }
419
420 pub fn get_newer_object_keys(
421 &self,
422 object: &(ObjectID, SequenceNumber),
423 ) -> IotaResult<Vec<ObjectKey>> {
424 let mut objects = vec![];
425 for result in self.objects.safe_iter_with_bounds(
426 Some(ObjectKey(object.0, object.1.next().unwrap())),
427 Some(ObjectKey(object.0, VersionNumber::MAX_VALID_EXCL)),
428 ) {
429 let (key, _) = result?;
430 objects.push(key);
431 }
432 Ok(objects)
433 }
434
435 pub fn set_highest_pruned_checkpoint_without_wb(
436 &self,
437 checkpoint_number: CheckpointSequenceNumber,
438 ) -> IotaResult {
439 let mut wb = self.pruned_checkpoint.batch();
440 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
441 wb.write()?;
442 Ok(())
443 }
444
445 pub fn database_is_empty(&self) -> IotaResult<bool> {
446 Ok(self.objects.safe_iter().next().is_none())
447 }
448
449 pub fn iter_live_object_set(&self) -> LiveSetIter<'_> {
450 LiveSetIter {
451 iter: Box::new(self.objects.safe_iter()),
452 tables: self,
453 prev: None,
454 }
455 }
456
457 pub fn range_iter_live_object_set(
458 &self,
459 lower_bound: Option<ObjectID>,
460 upper_bound: Option<ObjectID>,
461 ) -> LiveSetIter<'_> {
462 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
463 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
464
465 LiveSetIter {
466 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
467 tables: self,
468 prev: None,
469 }
470 }
471
472 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
473 self.objects.checkpoint_db(path).map_err(Into::into)
475 }
476
477 pub fn get_root_state_hash(
478 &self,
479 epoch: EpochId,
480 ) -> IotaResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
481 Ok(self.root_state_hash_by_epoch.get(&epoch)?)
482 }
483
484 pub fn insert_root_state_hash(
485 &self,
486 epoch: EpochId,
487 last_checkpoint_of_epoch: CheckpointSequenceNumber,
488 hash: GlobalStateHash,
489 ) -> IotaResult {
490 self.root_state_hash_by_epoch
491 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
492 Ok(())
493 }
494
495 pub fn insert_object_test_only(&self, object: Object) -> IotaResult {
496 let object_reference = object.compute_object_reference();
497 let wrapper = get_store_object(object);
498 let mut wb = self.objects.batch();
499 wb.insert_batch(
500 &self.objects,
501 std::iter::once((ObjectKey::from(object_reference), wrapper)),
502 )?;
503 wb.write()?;
504 Ok(())
505 }
506}
507
508impl ObjectStore for AuthorityPerpetualTables {
509 fn try_get_object(
511 &self,
512 object_id: &ObjectID,
513 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
514 let obj_entry = self
515 .objects
516 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))
517 .map_err(iota_types::storage::error::Error::custom)?
518 .next();
519
520 match obj_entry.transpose()? {
521 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
522 .object(&ObjectKey(obj_id, version), obj)
523 .map_err(iota_types::storage::error::Error::custom)?),
524 _ => Ok(None),
525 }
526 }
527
528 fn try_get_object_by_key(
529 &self,
530 object_id: &ObjectID,
531 version: VersionNumber,
532 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
533 Ok(self
534 .objects
535 .get(&ObjectKey(*object_id, version))
536 .map_err(iota_types::storage::error::Error::custom)?
537 .map(|object| self.object(&ObjectKey(*object_id, version), object))
538 .transpose()
539 .map_err(iota_types::storage::error::Error::custom)?
540 .flatten())
541 }
542}
543
/// Iterator over the live objects found in a scan of the `objects` table.
///
/// Created by `AuthorityPerpetualTables::iter_live_object_set` and
/// `AuthorityPerpetualTables::range_iter_live_object_set`.
pub struct LiveSetIter<'a> {
    /// Underlying scan over `(key, stored entry)` pairs.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables used to reconstruct objects from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// Most recently read entry that has not yet been emitted or discarded.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
}
549
/// An entry of the live object set.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A fully materialized object.
    Normal(Object),
    /// A wrapped object, identified only by its key (ID and version).
    // NOTE(review): `LiveSetIter` in this file never produces this variant —
    // confirm whether it is still constructed elsewhere.
    Wrapped(ObjectKey),
}
555
556impl LiveObject {
557 pub fn object_id(&self) -> ObjectID {
558 match self {
559 LiveObject::Normal(obj) => obj.id(),
560 LiveObject::Wrapped(key) => key.0,
561 }
562 }
563
564 pub fn version(&self) -> SequenceNumber {
565 match self {
566 LiveObject::Normal(obj) => obj.version(),
567 LiveObject::Wrapped(key) => key.1,
568 }
569 }
570
571 pub fn object_reference(&self) -> ObjectRef {
572 match self {
573 LiveObject::Normal(obj) => obj.compute_object_reference(),
574 LiveObject::Wrapped(key) => ObjectRef::new(key.0, key.1, ObjectDigest::OBJECT_WRAPPED),
575 }
576 }
577
578 pub fn to_normal(self) -> Option<Object> {
579 match self {
580 LiveObject::Normal(object) => Some(object),
581 LiveObject::Wrapped(_) => None,
582 }
583 }
584}
585
586impl LiveSetIter<'_> {
587 fn store_object_wrapper_to_live_object(
588 &self,
589 object_key: ObjectKey,
590 store_object: StoreObjectWrapper,
591 ) -> Option<LiveObject> {
592 match store_object.migrate().into_inner() {
593 StoreObject::Value(object) => {
594 let object = self
595 .tables
596 .construct_object(&object_key, *object)
597 .expect("Constructing object from store cannot fail");
598 Some(LiveObject::Normal(object))
599 }
600 StoreObject::Wrapped | StoreObject::Deleted => None,
601 }
602 }
603}
604
605impl Iterator for LiveSetIter<'_> {
606 type Item = LiveObject;
607
608 fn next(&mut self) -> Option<Self::Item> {
609 loop {
610 if let Some(Ok((next_key, next_value))) = self.iter.next() {
611 let prev = self.prev.take();
612 self.prev = Some((next_key, next_value));
613
614 if let Some((prev_key, prev_value)) = prev {
615 if prev_key.0 != next_key.0 {
616 let live_object =
617 self.store_object_wrapper_to_live_object(prev_key, prev_value);
618 if live_object.is_some() {
619 return live_object;
620 }
621 }
622 }
623 continue;
624 }
625 if let Some((key, value)) = self.prev.take() {
626 let live_object = self.store_object_wrapper_to_live_object(key, value);
627 if live_object.is_some() {
628 return live_object;
629 }
630 }
631 return None;
632 }
633 }
634}
635
636fn live_owned_object_markers_table_config(db_options: DBOptions) -> DBOptions {
638 DBOptions {
639 options: db_options
640 .clone()
641 .optimize_for_write_throughput()
642 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
643 .options,
644 rw_options: db_options.rw_options,
645 }
646}
647
648fn objects_table_config(
649 mut db_options: DBOptions,
650 compaction_filter: Option<ObjectsCompactionFilter>,
651) -> DBOptions {
652 if let Some(mut compaction_filter) = compaction_filter {
653 db_options
654 .options
655 .set_compaction_filter("objects", move |_, key, value| {
656 match compaction_filter.filter(key, value) {
657 Ok(decision) => decision,
658 Err(err) => {
659 error!("Compaction error: {:?}", err);
660 Decision::Keep
661 }
662 }
663 });
664 }
665 db_options
666 .optimize_for_write_throughput()
667 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
668}
669
670fn transactions_table_config(db_options: DBOptions) -> DBOptions {
671 db_options
672 .optimize_for_write_throughput()
673 .optimize_for_point_lookup(
674 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
675 )
676}
677
678fn effects_table_config(db_options: DBOptions) -> DBOptions {
679 db_options
680 .optimize_for_write_throughput()
681 .optimize_for_point_lookup(
682 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
683 )
684}
685
686fn events_table_config(db_options: DBOptions) -> DBOptions {
687 db_options
688 .optimize_for_write_throughput()
689 .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024))
690}