1use std::path::Path;
6
7use iota_types::{
8 base_types::SequenceNumber,
9 digests::TransactionEventsDigest,
10 effects::{TransactionEffects, TransactionEvents},
11 global_state_hash::GlobalStateHash,
12 storage::MarkerValue,
13};
14use serde::{Deserialize, Serialize};
15use tracing::error;
16use typed_store::{
17 DBMapUtils, DbIterator,
18 metrics::SamplingInterval,
19 rocks::{
20 DBBatch, DBMap, DBMapTableConfigMap, DBOptions, MetricConf, default_db_options,
21 read_size_from_env,
22 },
23 rocksdb::compaction_filter::Decision,
24 traits::Map,
25};
26
27use super::*;
28use crate::authority::{
29 authority_store_pruner::ObjectsCompactionFilter,
30 authority_store_types::{
31 StoreObject, StoreObjectValue, StoreObjectWrapper, get_store_object, try_construct_object,
32 },
33 epoch_start_configuration::EpochStartConfiguration,
34};
35
// Names of the environment variables read (through `read_size_from_env`) by the
// per-table config helpers in this file. Each one overrides the size passed to
// `optimize_for_read` / `optimize_for_point_lookup` for its table.
// NOTE(review): the `_MB` suffix suggests the values are megabytes — confirm
// against `read_size_from_env`.

/// Override for the `objects` table (falls back to `5 * 1024` when unset).
const ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE: &str = "OBJECTS_BLOCK_CACHE_MB";
/// Override for the `live_owned_object_markers` table (falls back to `1024`).
pub(crate) const ENV_VAR_LOCKS_BLOCK_CACHE_SIZE: &str = "LOCKS_BLOCK_CACHE_MB";
/// Override for the `transactions` table (falls back to `512`).
const ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE: &str = "TRANSACTIONS_BLOCK_CACHE_MB";
/// Override for the `effects` table (falls back to `1024`).
const ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE: &str = "EFFECTS_BLOCK_CACHE_MB";
/// Override for the `events` table (falls back to `1024`).
const ENV_VAR_EVENTS_BLOCK_CACHE_SIZE: &str = "EVENTS_BLOCK_CACHE_MB";
41
/// Caller-supplied overrides used by `AuthorityPerpetualTables::open`.
///
/// The derived `Default` yields `enable_write_stall == false` and no
/// compaction filter.
#[derive(Default)]
pub struct AuthorityPerpetualTablesOptions {
    /// When `false`, `apply_to` calls `disable_write_throttling()` on the
    /// database options; when `true` the options are left untouched.
    pub enable_write_stall: bool,
    /// Optional filter that `open` hands to `objects_table_config`, which
    /// installs it as the compaction filter of the `objects` table.
    pub compaction_filter: Option<ObjectsCompactionFilter>,
}
49
50impl AuthorityPerpetualTablesOptions {
51 fn apply_to(&self, mut db_options: DBOptions) -> DBOptions {
52 if !self.enable_write_stall {
53 db_options = db_options.disable_write_throttling();
54 }
55 db_options
56 }
57}
58
/// The set of tables opened under the `perpetual` directory (see
/// `AuthorityPerpetualTables::path`). The `DBMapUtils` derive supplies the
/// `open_tables_read_write` / `get_read_only_handle` constructors used below.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables {
    /// Every stored object version, keyed by `ObjectKey` (object id + version).
    /// A value is either a full object or a `Deleted` / `Wrapped` tombstone
    /// (see `object_reference`).
    pub(crate) objects: DBMap<ObjectKey, StoreObjectWrapper>,

    /// Set of object references (the value is the unit type).
    /// NOTE(review): presumably the currently live owned objects — this file
    /// only configures the table, confirm against the writers.
    pub(crate) live_owned_object_markers: DBMap<ObjectRef, ()>,

    /// Transactions by digest; read by `get_transaction`.
    pub(crate) transactions: DBMap<TransactionDigest, TrustedTransaction>,

    /// Transaction effects keyed by their own digest; read by `get_effects`.
    pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

    /// Maps a transaction digest to the digest of its effects; `get_effects`
    /// resolves through this table into `effects`.
    pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsDigest>,

    /// Individual events keyed by `(events digest, usize)`.
    /// NOTE(review): the `usize` is presumably the event's index within the
    /// transaction's event list — confirm.
    pub(crate) events: DBMap<(TransactionEventsDigest, usize), Event>,

    /// Whole `TransactionEvents` values keyed by transaction digest.
    /// NOTE(review): looks like a newer layout alongside `events` — confirm
    /// which one readers should use.
    pub(crate) events_2: DBMap<TransactionDigest, TransactionEvents>,

    /// Maps a transaction digest to an `(epoch, checkpoint sequence number)`
    /// pair; read by `get_checkpoint_sequence_number`.
    pub(crate) executed_transactions_to_checkpoint:
        DBMap<TransactionDigest, (EpochId, CheckpointSequenceNumber)>,

    /// Per-epoch root state hash, stored with the sequence number passed as
    /// `last_checkpoint_of_epoch` to `insert_root_state_hash`.
    pub(crate) root_state_hash_by_epoch:
        DBMap<EpochId, (CheckpointSequenceNumber, GlobalStateHash)>,

    /// Single-entry table (unit key) holding the epoch start configuration.
    pub(crate) epoch_start_configuration: DBMap<(), EpochStartConfiguration>,

    /// Single-entry table (unit key) holding the highest pruned checkpoint
    /// (see `get_highest_pruned_checkpoint`).
    pub(crate) pruned_checkpoint: DBMap<(), CheckpointSequenceNumber>,

    /// Single-entry table (unit key) holding a `TotalIotaSupplyCheck`.
    pub(crate) total_iota_supply: DBMap<(), TotalIotaSupplyCheck>,

    /// Single-entry table (unit key) holding a signed 64-bit value.
    /// NOTE(review): semantics are not visible in this file — confirm.
    pub(crate) expected_storage_fund_imbalance: DBMap<(), i64>,

    /// Marker values keyed by `(epoch, object key)`.
    /// NOTE(review): marker semantics are defined by `MarkerValue` — confirm.
    pub(crate) object_per_epoch_marker_table: DBMap<(EpochId, ObjectKey), MarkerValue>,
}
158
/// Tables opened under the `pruner` directory (see
/// `AuthorityPrunerTables::path`).
#[derive(DBMapUtils)]
pub struct AuthorityPrunerTables {
    /// Maps an object id to a version.
    /// NOTE(review): presumably the version at which the object was
    /// tombstoned — confirm against the pruner.
    pub(crate) object_tombstones: DBMap<ObjectId, SequenceNumber>,
}
163
164impl AuthorityPrunerTables {
165 pub fn path(parent_path: &Path) -> PathBuf {
166 parent_path.join("pruner")
167 }
168
169 pub fn open(parent_path: &Path) -> Self {
170 Self::open_tables_read_write(
171 Self::path(parent_path),
172 MetricConf::new("pruner")
173 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
174 None,
175 None,
176 )
177 }
178}
179
/// Value stored in the `total_iota_supply` table.
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct TotalIotaSupplyCheck {
    /// Recorded total supply.
    /// NOTE(review): unit is not visible in this file — confirm.
    pub(crate) total_supply: u64,
    /// Epoch associated with the record.
    /// NOTE(review): presumably the epoch of the most recent check — confirm.
    pub(crate) last_check_epoch: EpochId,
}
188
189impl AuthorityPerpetualTables {
190 pub fn path(parent_path: &Path) -> PathBuf {
191 parent_path.join("perpetual")
192 }
193
194 pub fn open(
195 parent_path: &Path,
196 db_options_override: Option<AuthorityPerpetualTablesOptions>,
197 ) -> Self {
198 let db_options_override = db_options_override.unwrap_or_default();
199 let db_options =
200 db_options_override.apply_to(default_db_options().optimize_db_for_write_throughput(4));
201 let table_options = DBMapTableConfigMap::new(BTreeMap::from([
202 (
203 "objects".to_string(),
204 objects_table_config(db_options.clone(), db_options_override.compaction_filter),
205 ),
206 (
207 "live_owned_object_markers".to_string(),
208 live_owned_object_markers_table_config(db_options.clone()),
209 ),
210 (
211 "transactions".to_string(),
212 transactions_table_config(db_options.clone()),
213 ),
214 (
215 "effects".to_string(),
216 effects_table_config(db_options.clone()),
217 ),
218 (
219 "events".to_string(),
220 events_table_config(db_options.clone()),
221 ),
222 ]));
223 Self::open_tables_read_write(
224 Self::path(parent_path),
225 MetricConf::new("perpetual")
226 .with_sampling(SamplingInterval::new(Duration::from_secs(60), 0)),
227 Some(db_options.options),
228 Some(table_options),
229 )
230 }
231
232 pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
233 Self::get_read_only_handle(
234 Self::path(parent_path),
235 None,
236 None,
237 MetricConf::new("perpetual_readonly"),
238 )
239 }
240
241 pub fn find_object_lt_or_eq_version(
246 &self,
247 object_id: ObjectId,
248 version: SequenceNumber,
249 ) -> IotaResult<Option<Object>> {
250 let mut iter = self.objects.reversed_safe_iter_with_bounds(
251 Some(ObjectKey::min_for_id(&object_id)),
252 Some(ObjectKey(object_id, version)),
253 )?;
254 match iter.next() {
255 Some(Ok((key, o))) => self.object(&key, o),
256 Some(Err(e)) => Err(e.into()),
257 None => Ok(None),
258 }
259 }
260
261 fn construct_object(
262 &self,
263 object_key: &ObjectKey,
264 store_object: StoreObjectValue,
265 ) -> Result<Object, IotaError> {
266 try_construct_object(object_key, store_object)
267 }
268
269 pub fn object(
272 &self,
273 object_key: &ObjectKey,
274 store_object: StoreObjectWrapper,
275 ) -> Result<Option<Object>, IotaError> {
276 let StoreObject::Value(store_object) = store_object.migrate().into_inner() else {
277 return Ok(None);
278 };
279 Ok(Some(self.construct_object(object_key, *store_object)?))
280 }
281
282 pub fn object_reference(
283 &self,
284 object_key: &ObjectKey,
285 store_object: StoreObjectWrapper,
286 ) -> Result<ObjectRef, IotaError> {
287 let obj_ref = match store_object.migrate().into_inner() {
288 StoreObject::Value(object) => self.construct_object(object_key, *object)?.object_ref(),
289 StoreObject::Deleted => {
290 ObjectRef::new(object_key.0, object_key.1, ObjectDigest::OBJECT_DELETED)
291 }
292 StoreObject::Wrapped => {
293 ObjectRef::new(object_key.0, object_key.1, ObjectDigest::OBJECT_WRAPPED)
294 }
295 };
296 Ok(obj_ref)
297 }
298
299 pub fn tombstone_reference(
300 &self,
301 object_key: &ObjectKey,
302 store_object: &StoreObjectWrapper,
303 ) -> Result<Option<ObjectRef>, IotaError> {
304 let obj_ref = match store_object.inner() {
305 StoreObject::Deleted => Some(ObjectRef::new(
306 object_key.0,
307 object_key.1,
308 ObjectDigest::OBJECT_DELETED,
309 )),
310 StoreObject::Wrapped => Some(ObjectRef::new(
311 object_key.0,
312 object_key.1,
313 ObjectDigest::OBJECT_WRAPPED,
314 )),
315 _ => None,
316 };
317 Ok(obj_ref)
318 }
319
320 pub fn get_latest_object_ref_or_tombstone(
321 &self,
322 object_id: ObjectId,
323 ) -> Result<Option<ObjectRef>, IotaError> {
324 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
325 Some(ObjectKey::min_for_id(&object_id)),
326 Some(ObjectKey::max_for_id(&object_id)),
327 )?;
328
329 if let Some(Ok((object_key, value))) = iterator.next() {
330 if object_key.0 == object_id {
331 return Ok(Some(self.object_reference(&object_key, value)?));
332 }
333 }
334 Ok(None)
335 }
336
337 pub fn get_latest_object_or_tombstone(
338 &self,
339 object_id: ObjectId,
340 ) -> Result<Option<(ObjectKey, StoreObjectWrapper)>, IotaError> {
341 let mut iterator = self.objects.reversed_safe_iter_with_bounds(
342 Some(ObjectKey::min_for_id(&object_id)),
343 Some(ObjectKey::max_for_id(&object_id)),
344 )?;
345
346 if let Some(Ok((object_key, value))) = iterator.next() {
347 if object_key.0 == object_id {
348 return Ok(Some((object_key, value)));
349 }
350 }
351 Ok(None)
352 }
353
354 pub fn get_recovery_epoch_at_restart(&self) -> IotaResult<EpochId> {
355 Ok(self
356 .epoch_start_configuration
357 .get(&())?
358 .expect("Must have current epoch.")
359 .epoch_start_state()
360 .epoch())
361 }
362
363 pub fn set_epoch_start_configuration(
364 &self,
365 epoch_start_configuration: &EpochStartConfiguration,
366 ) -> IotaResult {
367 let mut wb = self.epoch_start_configuration.batch();
368 wb.insert_batch(
369 &self.epoch_start_configuration,
370 std::iter::once(((), epoch_start_configuration)),
371 )?;
372 wb.write()?;
373 Ok(())
374 }
375
376 pub fn get_highest_pruned_checkpoint(
377 &self,
378 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
379 self.pruned_checkpoint.get(&())
380 }
381
382 pub fn set_highest_pruned_checkpoint(
383 &self,
384 wb: &mut DBBatch,
385 checkpoint_number: CheckpointSequenceNumber,
386 ) -> IotaResult {
387 wb.insert_batch(&self.pruned_checkpoint, [((), checkpoint_number)])?;
388 Ok(())
389 }
390
391 pub fn get_transaction(
392 &self,
393 digest: &TransactionDigest,
394 ) -> IotaResult<Option<TrustedTransaction>> {
395 let Some(transaction) = self.transactions.get(digest)? else {
396 return Ok(None);
397 };
398 Ok(Some(transaction))
399 }
400
401 pub fn get_effects(
402 &self,
403 digest: &TransactionDigest,
404 ) -> IotaResult<Option<TransactionEffects>> {
405 let Some(effect_digest) = self.executed_effects.get(digest)? else {
406 return Ok(None);
407 };
408 Ok(self.effects.get(&effect_digest)?)
409 }
410
411 pub fn get_checkpoint_sequence_number(
412 &self,
413 digest: &TransactionDigest,
414 ) -> IotaResult<Option<(EpochId, CheckpointSequenceNumber)>> {
415 Ok(self.executed_transactions_to_checkpoint.get(digest)?)
416 }
417
418 pub fn get_newer_object_keys(
419 &self,
420 object: &(ObjectId, SequenceNumber),
421 ) -> IotaResult<Vec<ObjectKey>> {
422 let mut objects = vec![];
423 for result in self.objects.safe_iter_with_bounds(
424 Some(ObjectKey(object.0, object.1.next().unwrap())),
425 Some(ObjectKey(object.0, VersionNumber::MAX_VALID_EXCL)),
426 ) {
427 let (key, _) = result?;
428 objects.push(key);
429 }
430 Ok(objects)
431 }
432
433 pub fn set_highest_pruned_checkpoint_without_wb(
434 &self,
435 checkpoint_number: CheckpointSequenceNumber,
436 ) -> IotaResult {
437 let mut wb = self.pruned_checkpoint.batch();
438 self.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
439 wb.write()?;
440 Ok(())
441 }
442
443 pub fn database_is_empty(&self) -> IotaResult<bool> {
444 Ok(self.objects.safe_iter().next().is_none())
445 }
446
447 pub fn iter_live_object_set(&self) -> LiveSetIter<'_> {
448 LiveSetIter {
449 iter: Box::new(self.objects.safe_iter()),
450 tables: self,
451 prev: None,
452 }
453 }
454
455 pub fn range_iter_live_object_set(
456 &self,
457 lower_bound: Option<ObjectId>,
458 upper_bound: Option<ObjectId>,
459 ) -> LiveSetIter<'_> {
460 let lower_bound = lower_bound.as_ref().map(ObjectKey::min_for_id);
461 let upper_bound = upper_bound.as_ref().map(ObjectKey::max_for_id);
462
463 LiveSetIter {
464 iter: Box::new(self.objects.safe_iter_with_bounds(lower_bound, upper_bound)),
465 tables: self,
466 prev: None,
467 }
468 }
469
470 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
471 self.objects.checkpoint_db(path).map_err(Into::into)
473 }
474
475 pub fn get_root_state_hash(
476 &self,
477 epoch: EpochId,
478 ) -> IotaResult<Option<(CheckpointSequenceNumber, GlobalStateHash)>> {
479 Ok(self.root_state_hash_by_epoch.get(&epoch)?)
480 }
481
482 pub fn insert_root_state_hash(
483 &self,
484 epoch: EpochId,
485 last_checkpoint_of_epoch: CheckpointSequenceNumber,
486 hash: GlobalStateHash,
487 ) -> IotaResult {
488 self.root_state_hash_by_epoch
489 .insert(&epoch, &(last_checkpoint_of_epoch, hash))?;
490 Ok(())
491 }
492
493 pub fn insert_object_test_only(&self, object: Object) -> IotaResult {
494 let object_reference = object.object_ref();
495 let wrapper = get_store_object(object);
496 let mut wb = self.objects.batch();
497 wb.insert_batch(
498 &self.objects,
499 std::iter::once((ObjectKey::from(object_reference), wrapper)),
500 )?;
501 wb.write()?;
502 Ok(())
503 }
504}
505
506impl ObjectStore for AuthorityPerpetualTables {
507 fn try_get_object(
509 &self,
510 object_id: &ObjectId,
511 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
512 let obj_entry = self
513 .objects
514 .reversed_safe_iter_with_bounds(None, Some(ObjectKey::max_for_id(object_id)))
515 .map_err(iota_types::storage::error::Error::custom)?
516 .next();
517
518 match obj_entry.transpose()? {
519 Some((ObjectKey(obj_id, version), obj)) if obj_id == *object_id => Ok(self
520 .object(&ObjectKey(obj_id, version), obj)
521 .map_err(iota_types::storage::error::Error::custom)?),
522 _ => Ok(None),
523 }
524 }
525
526 fn try_get_object_by_key(
527 &self,
528 object_id: &ObjectId,
529 version: VersionNumber,
530 ) -> Result<Option<Object>, iota_types::storage::error::Error> {
531 Ok(self
532 .objects
533 .get(&ObjectKey(*object_id, version))
534 .map_err(iota_types::storage::error::Error::custom)?
535 .map(|object| self.object(&ObjectKey(*object_id, version), object))
536 .transpose()
537 .map_err(iota_types::storage::error::Error::custom)?
538 .flatten())
539 }
540}
541
/// Iterator over the live objects of the `objects` table; created by
/// `iter_live_object_set` and `range_iter_live_object_set`.
pub struct LiveSetIter<'a> {
    /// Underlying scan over `(key, stored value)` entries.
    iter: DbIterator<'a, (ObjectKey, StoreObjectWrapper)>,
    /// Tables used to construct objects from stored values.
    tables: &'a AuthorityPerpetualTables,
    /// The most recently read entry, held back until the next entry shows
    /// whether it was the last one for its object id.
    prev: Option<(ObjectKey, StoreObjectWrapper)>,
}
547
/// An entry of the live object set.
///
/// NOTE(review): `LiveSetIter` in this file only ever yields `Normal`; it
/// maps wrapped and deleted entries to `None`. Confirm whether `Wrapped` is
/// still produced elsewhere.
#[derive(Eq, PartialEq, Debug, Clone, Deserialize, Serialize, Hash)]
pub enum LiveObject {
    /// A fully materialized object.
    Normal(Object),
    /// A wrapped object, identified only by its key (id and version).
    Wrapped(ObjectKey),
}
553
554impl LiveObject {
555 pub fn object_id(&self) -> ObjectId {
556 match self {
557 LiveObject::Normal(obj) => obj.id(),
558 LiveObject::Wrapped(key) => key.0,
559 }
560 }
561
562 pub fn version(&self) -> SequenceNumber {
563 match self {
564 LiveObject::Normal(obj) => obj.version(),
565 LiveObject::Wrapped(key) => key.1,
566 }
567 }
568
569 pub fn object_reference(&self) -> ObjectRef {
570 match self {
571 LiveObject::Normal(obj) => obj.object_ref(),
572 LiveObject::Wrapped(key) => ObjectRef::new(key.0, key.1, ObjectDigest::OBJECT_WRAPPED),
573 }
574 }
575
576 pub fn to_normal(self) -> Option<Object> {
577 match self {
578 LiveObject::Normal(object) => Some(object),
579 LiveObject::Wrapped(_) => None,
580 }
581 }
582}
583
584impl LiveSetIter<'_> {
585 fn store_object_wrapper_to_live_object(
586 &self,
587 object_key: ObjectKey,
588 store_object: StoreObjectWrapper,
589 ) -> Option<LiveObject> {
590 match store_object.migrate().into_inner() {
591 StoreObject::Value(object) => {
592 let object = self
593 .tables
594 .construct_object(&object_key, *object)
595 .expect("Constructing object from store cannot fail");
596 Some(LiveObject::Normal(object))
597 }
598 StoreObject::Wrapped | StoreObject::Deleted => None,
599 }
600 }
601}
602
603impl Iterator for LiveSetIter<'_> {
604 type Item = LiveObject;
605
606 fn next(&mut self) -> Option<Self::Item> {
607 loop {
608 if let Some(Ok((next_key, next_value))) = self.iter.next() {
609 let prev = self.prev.take();
610 self.prev = Some((next_key, next_value));
611
612 if let Some((prev_key, prev_value)) = prev {
613 if prev_key.0 != next_key.0 {
614 let live_object =
615 self.store_object_wrapper_to_live_object(prev_key, prev_value);
616 if live_object.is_some() {
617 return live_object;
618 }
619 }
620 }
621 continue;
622 }
623 if let Some((key, value)) = self.prev.take() {
624 let live_object = self.store_object_wrapper_to_live_object(key, value);
625 if live_object.is_some() {
626 return live_object;
627 }
628 }
629 return None;
630 }
631 }
632}
633
634fn live_owned_object_markers_table_config(db_options: DBOptions) -> DBOptions {
636 DBOptions {
637 options: db_options
638 .clone()
639 .optimize_for_write_throughput()
640 .optimize_for_read(read_size_from_env(ENV_VAR_LOCKS_BLOCK_CACHE_SIZE).unwrap_or(1024))
641 .options,
642 rw_options: db_options.rw_options,
643 }
644}
645
646fn objects_table_config(
647 mut db_options: DBOptions,
648 compaction_filter: Option<ObjectsCompactionFilter>,
649) -> DBOptions {
650 if let Some(mut compaction_filter) = compaction_filter {
651 db_options
652 .options
653 .set_compaction_filter("objects", move |_, key, value| {
654 match compaction_filter.filter(key, value) {
655 Ok(decision) => decision,
656 Err(err) => {
657 error!("Compaction error: {:?}", err);
658 Decision::Keep
659 }
660 }
661 });
662 }
663 db_options
664 .optimize_for_write_throughput()
665 .optimize_for_read(read_size_from_env(ENV_VAR_OBJECTS_BLOCK_CACHE_SIZE).unwrap_or(5 * 1024))
666}
667
668fn transactions_table_config(db_options: DBOptions) -> DBOptions {
669 db_options
670 .optimize_for_write_throughput()
671 .optimize_for_point_lookup(
672 read_size_from_env(ENV_VAR_TRANSACTIONS_BLOCK_CACHE_SIZE).unwrap_or(512),
673 )
674}
675
676fn effects_table_config(db_options: DBOptions) -> DBOptions {
677 db_options
678 .optimize_for_write_throughput()
679 .optimize_for_point_lookup(
680 read_size_from_env(ENV_VAR_EFFECTS_BLOCK_CACHE_SIZE).unwrap_or(1024),
681 )
682}
683
684fn events_table_config(db_options: DBOptions) -> DBOptions {
685 db_options
686 .optimize_for_write_throughput()
687 .optimize_for_read(read_size_from_env(ENV_VAR_EVENTS_BLOCK_CACHE_SIZE).unwrap_or(1024))
688}