use std::{
    cmp::{max, min},
    collections::{BTreeSet, HashMap},
    sync::{Arc, Mutex},
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use anyhow::anyhow;
use iota_archival::reader::ArchiveReaderBalancer;
use iota_config::node::AuthorityStorePruningConfig;
use iota_metrics::{monitored_scope, spawn_monitored_task};
use iota_storage::mutex_table::RwLockTable;
use iota_types::{
    base_types::{ObjectID, SequenceNumber, VersionNumber},
    committee::EpochId,
    effects::{TransactionEffects, TransactionEffectsAPI},
    message_envelope::Message,
    messages_checkpoint::{CheckpointContents, CheckpointDigest, CheckpointSequenceNumber},
    storage::ObjectKey,
};
use once_cell::sync::Lazy;
use prometheus::{
    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
    register_int_gauge_with_registry,
};
use tokio::{
    sync::oneshot::{self, Sender},
    time::Instant,
};
use tracing::{debug, error, info, warn};
use typed_store::{Map, TypedStoreError, rocksdb::LiveFile};

use super::authority_store_tables::AuthorityPerpetualTables;
use crate::{
    authority::authority_store_types::{ObjectContentDigest, StoreData, StoreObject},
    checkpoints::{CheckpointStore, CheckpointWatermark},
    rest_index::RestIndexStore,
};
static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
    [
        "objects",
        "effects",
        "transactions",
        "events",
        "executed_effects",
        "executed_transactions_to_checkpoint",
    ]
    .into_iter()
    .map(|cf| cf.to_string())
    .collect()
});
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 24 * 60 * 60 * 1000;

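/// Handle to the background pruning task. Dropping this struct drops the
/// oneshot sender below, which signals the spawned pruning loop to exit.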
pub struct AuthorityStorePruner {
    _objects_pruner_cancel_handle: oneshot::Sender<()>,
}

static MIN_PRUNING_TICK_DURATION_MS: u64 = 10 * 1000;

pub struct AuthorityStorePruningMetrics {
    pub last_pruned_checkpoint: IntGauge,
    pub num_pruned_objects: IntCounter,
    pub num_pruned_tombstones: IntCounter,
    pub last_pruned_effects_checkpoint: IntGauge,
    pub num_epochs_to_retain_for_objects: IntGauge,
    pub num_epochs_to_retain_for_checkpoints: IntGauge,
}

impl AuthorityStorePruningMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            last_pruned_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_checkpoint",
                "Last pruned checkpoint",
                registry
            )
            .unwrap(),
            num_pruned_objects: register_int_counter_with_registry!(
                "num_pruned_objects",
                "Number of pruned objects",
                registry
            )
            .unwrap(),
            num_pruned_tombstones: register_int_counter_with_registry!(
                "num_pruned_tombstones",
                "Number of pruned tombstones",
                registry
            )
            .unwrap(),
            last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_effects_checkpoint",
                "Last pruned effects checkpoint",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_objects",
                "Number of epochs to retain for objects",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_checkpoints",
                "Number of epochs to retain for checkpoints",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }

    pub fn new_for_test() -> Arc<Self> {
        Self::new(&Registry::new())
    }
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PruningMode {
    Objects,
    Checkpoints,
}

impl AuthorityStorePruner {
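    /// Prunes old object versions based on the given transaction effects.
    ///
    /// For every object listed in `modified_at_versions`, a RocksDB range
    /// deletion is scheduled covering the span from the lowest to the highest
    /// pruned version of that object. Tombstoned objects additionally have
    /// their remaining version history deleted key by key. When indirect
    /// objects are enabled, their reference counts are decremented via a
    /// partial merge. Finally, the highest-pruned-checkpoint watermark is
    /// advanced to `checkpoint_number` and the whole batch is written while
    /// holding the indirect-object locks.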
    async fn prune_objects(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();

        let mut live_object_keys_to_prune = vec![];
        let mut object_tombstones_to_prune = vec![];
        for effects in &transaction_effects {
            for (object_id, seq_number) in effects.modified_at_versions() {
                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
            }

            for deleted_object_key in effects.all_tombstones() {
                object_tombstones_to_prune
                    .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
            }
        }

        metrics
            .num_pruned_objects
            .inc_by(live_object_keys_to_prune.len() as u64);
        metrics
            .num_pruned_tombstones
            .inc_by(object_tombstones_to_prune.len() as u64);

        let mut indirect_objects: HashMap<_, i64> = HashMap::new();
        if indirect_objects_threshold > 0 && indirect_objects_threshold < usize::MAX {
            for object in perpetual_db
                .objects
                .multi_get(live_object_keys_to_prune.iter())?
                .into_iter()
                .flatten()
            {
                if let StoreObject::Value(obj) = object.into_inner() {
                    if let StoreData::IndirectObject(indirect_object) = obj.data {
                        *indirect_objects.entry(indirect_object.digest).or_default() -= 1;
                    }
                }
            }
        }

        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
            updates
                .entry(object_id)
                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
                .or_insert((seq_number, seq_number));
        }

        for (object_id, (min_version, max_version)) in updates {
            debug!(
                "Pruning object {:?} versions {:?} - {:?}",
                object_id, min_version, max_version
            );
            let start_range = ObjectKey(object_id, min_version);
            let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
            wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
        }

        if !object_tombstones_to_prune.is_empty() {
            let mut object_keys_to_delete = vec![];
            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
                for result in perpetual_db.objects.safe_iter_with_bounds(
                    Some(ObjectKey(object_id, VersionNumber::MIN)),
                    Some(ObjectKey(object_id, seq_number.next())),
                ) {
                    let (object_key, _) = result?;
                    assert_eq!(object_key.0, object_id);
                    object_keys_to_delete.push(object_key);
                }
            }

            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
        }

        if !indirect_objects.is_empty() {
            let ref_count_update = indirect_objects
                .iter()
                .map(|(digest, delta)| (digest, delta.to_le_bytes()));
            wb.partial_merge_batch(&perpetual_db.indirect_move_objects, ref_count_update)?;
        }
        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);

        let _locks = objects_lock_table
            .acquire_locks(indirect_objects.into_keys())
            .await;
        wb.write()?;
        Ok(())
    }

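    /// Removes executed transactions, effects, and events for the given
    /// checkpoints from the perpetual tables, deletes the checkpoint contents
    /// and digest indexes from the checkpoint store, and advances the
    /// `HighestPruned` watermark to `checkpoint_number` (the digest stored
    /// alongside the watermark is a random placeholder).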
    fn prune_checkpoints(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_db: &Arc<CheckpointStore>,
        rest_index: Option<&RestIndexStore>,
        checkpoint_number: CheckpointSequenceNumber,
        checkpoints_to_prune: Vec<CheckpointDigest>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        effects_to_prune: &Vec<TransactionEffects>,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("EffectsLivePruner");

        let mut perpetual_batch = perpetual_db.objects.batch();
        let transactions: Vec<_> = checkpoint_content_to_prune
            .iter()
            .flat_map(|content| content.iter().map(|tx| tx.transaction))
            .collect();

        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
        perpetual_batch.delete_batch(
            &perpetual_db.executed_transactions_to_checkpoint,
            transactions,
        )?;

        let mut effect_digests = vec![];
        for effects in effects_to_prune {
            let effects_digest = effects.digest();
            debug!("Pruning effects {:?}", effects_digest);
            effect_digests.push(effects_digest);

            if let Some(event_digest) = effects.events_digest() {
                if let Some(next_digest) = event_digest.next_lexicographical() {
                    perpetual_batch.schedule_delete_range(
                        &perpetual_db.events,
                        &(*event_digest, 0),
                        &(next_digest, 0),
                    )?;
                }
            }
        }
        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;

        let mut checkpoints_batch = checkpoint_db.certified_checkpoints.batch();

        let checkpoint_content_digests =
            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
        checkpoints_batch.delete_batch(
            &checkpoint_db.checkpoint_content,
            checkpoint_content_digests.clone(),
        )?;
        checkpoints_batch.delete_batch(
            &checkpoint_db.checkpoint_sequence_by_contents_digest,
            checkpoint_content_digests,
        )?;

        checkpoints_batch
            .delete_batch(&checkpoint_db.checkpoint_by_digest, checkpoints_to_prune)?;

        checkpoints_batch.insert_batch(
            &checkpoint_db.watermarks,
            [(
                &CheckpointWatermark::HighestPruned,
                &(checkpoint_number, CheckpointDigest::random()),
            )],
        )?;

        if let Some(rest_index) = rest_index {
            rest_index.prune(&checkpoint_content_to_prune)?;
        }
        perpetual_batch.write()?;
        checkpoints_batch.write()?;
        metrics
            .last_pruned_effects_checkpoint
            .set(checkpoint_number as i64);
        Ok(())
    }

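    /// Looks up the latest checkpoint eligible for object pruning, optionally
    /// smoothing the target across the epoch when `config.smooth` is set, and
    /// delegates to `prune_for_eligible_epochs` in `PruningMode::Objects`.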
    pub async fn prune_objects_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rest_index: Option<&RestIndexStore>,
        objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
        epoch_duration_ms: u64,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        let pruned_checkpoint_number = perpetual_db.get_highest_pruned_checkpoint()?;
        if config.smooth && config.num_epochs_to_retain > 0 {
            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint_number,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                config.num_epochs_to_retain,
            )?;
        }
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rest_index,
            PruningMode::Objects,
            config.num_epochs_to_retain,
            pruned_checkpoint_number,
            max_eligible_checkpoint_number,
            objects_lock_table,
            config,
            metrics.clone(),
            indirect_objects_threshold,
        )
        .await
    }

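    /// Like `prune_objects_for_eligible_epochs`, but for checkpoint data. The
    /// eligible upper bound is additionally capped by the archive watermark
    /// and, when object pruning is enabled, by the objects pruner's own
    /// watermark, so checkpoint data is only removed once it has been archived
    /// and its objects have been pruned.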
    pub async fn prune_checkpoints_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rest_index: Option<&RestIndexStore>,
        objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
        archive_readers: ArchiveReaderBalancer,
        epoch_duration_ms: u64,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
        let pruned_checkpoint_number =
            checkpoint_store.get_highest_pruned_checkpoint_seq_number()?;
        let (last_executed_checkpoint, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        let latest_archived_checkpoint = archive_readers
            .get_archive_watermark()
            .await?
            .unwrap_or(u64::MAX);
        let mut max_eligible_checkpoint = min(latest_archived_checkpoint, last_executed_checkpoint);
        if config.num_epochs_to_retain != u64::MAX {
            max_eligible_checkpoint = min(
                max_eligible_checkpoint,
                perpetual_db.get_highest_pruned_checkpoint()?,
            );
        }
        if config.smooth {
            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints {
                max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
                    checkpoint_store,
                    max_eligible_checkpoint,
                    pruned_checkpoint_number,
                    epoch_id,
                    epoch_duration_ms,
                    num_epochs_to_retain,
                )?;
            }
        }
        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rest_index,
            PruningMode::Checkpoints,
            config
                .num_epochs_to_retain_for_checkpoints()
                .ok_or_else(|| anyhow!("config value not set"))?,
            pruned_checkpoint_number,
            max_eligible_checkpoint,
            objects_lock_table,
            config,
            metrics.clone(),
            indirect_objects_threshold,
        )
        .await
    }

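    /// Walks certified checkpoints starting right after
    /// `starting_checkpoint_number`, collecting their contents and effects
    /// until either the retention window (`num_epochs_to_retain`) or
    /// `max_eligible_checkpoint` is reached. Work is flushed in batches
    /// bounded by `config.max_transactions_in_batch` and
    /// `config.max_checkpoints_in_batch`, pruning either objects or
    /// checkpoints depending on `mode`.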
    pub async fn prune_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rest_index: Option<&RestIndexStore>,
        mode: PruningMode,
        num_epochs_to_retain: u64,
        starting_checkpoint_number: CheckpointSequenceNumber,
        max_eligible_checkpoint: CheckpointSequenceNumber,
        objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneForEligibleEpochs");

        let mut checkpoint_number = starting_checkpoint_number;
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch())
            .unwrap_or_default();

        let mut checkpoints_to_prune = vec![];
        let mut checkpoint_content_to_prune = vec![];
        let mut effects_to_prune = vec![];

        loop {
            let Some(ckpt) = checkpoint_store
                .certified_checkpoints
                .get(&(checkpoint_number + 1))?
            else {
                break;
            };
            let checkpoint = ckpt.into_inner();
            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
            {
                break;
            }
            checkpoint_number = *checkpoint.sequence_number();

            let content = checkpoint_store
                .get_checkpoint_contents(&checkpoint.content_digest)?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "checkpoint content data is missing: {}",
                        checkpoint.sequence_number
                    )
                })?;
            let effects = perpetual_db
                .effects
                .multi_get(content.iter().map(|tx| tx.effects))?;

            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
            checkpoints_to_prune.push(*checkpoint.digest());
            checkpoint_content_to_prune.push(content);
            effects_to_prune.extend(effects.into_iter().flatten());

            if effects_to_prune.len() >= config.max_transactions_in_batch
                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
            {
                match mode {
                    PruningMode::Objects => {
                        Self::prune_objects(
                            effects_to_prune,
                            perpetual_db,
                            objects_lock_table,
                            checkpoint_number,
                            metrics.clone(),
                            indirect_objects_threshold,
                        )
                        .await?
                    }
                    PruningMode::Checkpoints => Self::prune_checkpoints(
                        perpetual_db,
                        checkpoint_store,
                        rest_index,
                        checkpoint_number,
                        checkpoints_to_prune,
                        checkpoint_content_to_prune,
                        &effects_to_prune,
                        metrics.clone(),
                    )?,
                };
                checkpoints_to_prune = vec![];
                checkpoint_content_to_prune = vec![];
                effects_to_prune = vec![];
                tokio::task::yield_now().await;
            }
        }

        if !checkpoints_to_prune.is_empty() {
            match mode {
                PruningMode::Objects => {
                    Self::prune_objects(
                        effects_to_prune,
                        perpetual_db,
                        objects_lock_table,
                        checkpoint_number,
                        metrics.clone(),
                        indirect_objects_threshold,
                    )
                    .await?
                }
                PruningMode::Checkpoints => Self::prune_checkpoints(
                    perpetual_db,
                    checkpoint_store,
                    rest_index,
                    checkpoint_number,
                    checkpoints_to_prune,
                    checkpoint_content_to_prune,
                    &effects_to_prune,
                    metrics.clone(),
                )?,
            };
        }
        Ok(())
    }

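    /// Selects the largest SST file (level >= 1, belonging to one of the
    /// periodically pruned tables) that has not been modified or manually
    /// compacted within `delay_days`, and runs a manual compaction over its
    /// key range. Returns `Ok(None)` when no file currently qualifies.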
    fn compact_next_sst_file(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        delay_days: usize,
        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
    ) -> anyhow::Result<Option<LiveFile>> {
        let db_path = perpetual_db.objects.rocksdb.path();
        let mut state = last_processed
            .lock()
            .expect("failed to obtain a lock for last processed SST files");
        let mut sst_file_for_compaction: Option<LiveFile> = None;
        let time_threshold =
            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
        for sst_file in perpetual_db.objects.rocksdb.live_files()? {
            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
            let last_modified = std::fs::metadata(file_path)?.modified()?;
            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
                || sst_file.level < 1
                || sst_file.start_key.is_none()
                || sst_file.end_key.is_none()
                || last_modified > time_threshold
                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
            {
                continue;
            }
            if let Some(candidate) = &sst_file_for_compaction {
                if candidate.size > sst_file.size {
                    continue;
                }
            }
            sst_file_for_compaction = Some(sst_file);
        }
        let Some(sst_file) = sst_file_for_compaction else {
            return Ok(None);
        };
        info!(
            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
            sst_file.name, sst_file.size, sst_file.level
        );
        perpetual_db.objects.compact_range_raw(
            &sst_file.column_family_name,
            sst_file.start_key.clone().unwrap(),
            sst_file.end_key.clone().unwrap(),
        )?;
        state.insert(sst_file.name.clone(), SystemTime::now());
        Ok(Some(sst_file))
    }

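    /// Interval between pruning ticks: ten seconds, capped at half the epoch
    /// duration so that very short test epochs still see several ticks. For
    /// example, with `epoch_duration_ms = 86_400_000` (24h) this returns
    /// `min(43_200_000, 10_000) = 10_000` ms.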
    fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
        min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
    }

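    /// Spreads pruning across an epoch instead of releasing a whole epoch's
    /// worth of checkpoints at once: the backlog between the current pruning
    /// watermark and the last checkpoint of the newest prunable epoch is
    /// divided by the number of pruning ticks per epoch, and only one such
    /// increment is made eligible per call. E.g. with 10s ticks and a 24h
    /// epoch there are 8640 intervals, so a backlog of 86_400 checkpoints
    /// advances the watermark by 10 per tick.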
    fn smoothed_max_eligible_checkpoint_number(
        checkpoint_store: &Arc<CheckpointStore>,
        mut max_eligible_checkpoint: CheckpointSequenceNumber,
        pruned_checkpoint: CheckpointSequenceNumber,
        epoch_id: EpochId,
        epoch_duration_ms: u64,
        num_epochs_to_retain: u64,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        if epoch_id < num_epochs_to_retain {
            return Ok(0);
        }
        let last_checkpoint_in_epoch = checkpoint_store
            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
            .map(|checkpoint| checkpoint.sequence_number)
            .unwrap_or_default();
        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
        if max_eligible_checkpoint == 0 {
            return Ok(max_eligible_checkpoint);
        }
        let num_intervals = epoch_duration_ms
            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
            .unwrap_or(1);
        let delta = max_eligible_checkpoint
            .checked_sub(pruned_checkpoint)
            .unwrap_or_default()
            .checked_div(num_intervals)
            .unwrap_or(1);
        Ok(pruned_checkpoint + delta)
    }

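    /// Spawns the periodic pruning loop (plus, if
    /// `periodic_compaction_threshold_days` is configured, a background SST
    /// compaction task) and returns the cancellation sender; dropping it
    /// stops the loop.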
    fn setup_pruning(
        config: AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rest_index: Option<Arc<RestIndexStore>>,
        objects_lock_table: Arc<RwLockTable<ObjectContentDigest>>,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
        archive_readers: ArchiveReaderBalancer,
    ) -> Sender<()> {
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        debug!(
            "Starting object pruning service with num_epochs_to_retain={}",
            config.num_epochs_to_retain
        );

        let tick_duration =
            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
        let pruning_initial_delay = if cfg!(msim) {
            Duration::from_millis(1)
        } else {
            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
        };
        let mut objects_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
        let mut checkpoints_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

        let perpetual_db_for_compaction = perpetual_db.clone();
        if let Some(delay_days) = config.periodic_compaction_threshold_days {
            spawn_monitored_task!(async move {
                let last_processed = Arc::new(Mutex::new(HashMap::new()));
                loop {
                    let db = perpetual_db_for_compaction.clone();
                    let state = Arc::clone(&last_processed);
                    let result = tokio::task::spawn_blocking(move || {
                        Self::compact_next_sst_file(db, delay_days, state)
                    })
                    .await;
                    let mut sleep_interval_secs = 1;
                    match result {
                        Err(err) => error!("Failed to compact sst file: {:?}", err),
                        Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
                        Ok(Ok(None)) => {
                            sleep_interval_secs = 3600;
                        }
                        _ => {}
                    }
                    tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
                }
            });
        }

        metrics
            .num_epochs_to_retain_for_objects
            .set(config.num_epochs_to_retain as i64);
        metrics.num_epochs_to_retain_for_checkpoints.set(
            config
                .num_epochs_to_retain_for_checkpoints
                .unwrap_or_default() as i64,
        );

        tokio::task::spawn(async move {
            loop {
                tokio::select! {
                    _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
                        if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rest_index.as_deref(), &objects_lock_table, config.clone(), metrics.clone(), indirect_objects_threshold, epoch_duration_ms).await {
                            error!("Failed to prune objects: {:?}", err);
                        }
                    },
                    _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
                        if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rest_index.as_deref(), &objects_lock_table, config.clone(), metrics.clone(), indirect_objects_threshold, archive_readers.clone(), epoch_duration_ms).await {
                            error!("Failed to prune checkpoints: {:?}", err);
                        }
                    },
                    _ = &mut recv => break,
                }
            }
        });
        sender
    }

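    /// Creates the pruner and starts its background task. A finite, non-zero
    /// `num_epochs_to_retain` is discouraged for performance reasons and is
    /// reset to `0` (aggressive pruning) on validators.
    ///
    /// A minimal construction sketch (values are illustrative, not
    /// prescriptive):
    /// ```ignore
    /// let pruner = AuthorityStorePruner::new(
    ///     perpetual_db.clone(),
    ///     checkpoint_store.clone(),
    ///     None, // no REST index
    ///     objects_lock_table.clone(),
    ///     pruning_config,
    ///     false, // not a validator
    ///     EPOCH_DURATION_MS_FOR_TESTING,
    ///     &prometheus::Registry::new(),
    ///     0, // indirect objects disabled
    ///     archive_readers,
    /// );
    /// ```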
    pub fn new(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rest_index: Option<Arc<RestIndexStore>>,
        objects_lock_table: Arc<RwLockTable<ObjectContentDigest>>,
        mut pruning_config: AuthorityStorePruningConfig,
        is_validator: bool,
        epoch_duration_ms: u64,
        registry: &Registry,
        indirect_objects_threshold: usize,
        archive_readers: ArchiveReaderBalancer,
    ) -> Self {
        if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
        {
            warn!(
                "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
                pruning_config.num_epochs_to_retain
            );
            if is_validator {
                warn!("Resetting to aggressive pruner.");
                pruning_config.num_epochs_to_retain = 0;
            } else {
                warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
            }
        }
        AuthorityStorePruner {
            _objects_pruner_cancel_handle: Self::setup_pruning(
                pruning_config,
                epoch_duration_ms,
                perpetual_db,
                checkpoint_store,
                rest_index,
                objects_lock_table,
                AuthorityStorePruningMetrics::new(registry),
                indirect_objects_threshold,
                archive_readers,
            ),
        }
    }

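    /// Manually compacts the objects table across its entire key range.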
    pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
        perpetual_db.objects.compact_range(
            &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
            &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
        )
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::HashSet, path::Path, sync::Arc, time::Duration};

    use iota_storage::mutex_table::RwLockTable;
    use iota_types::{
        base_types::{ObjectDigest, ObjectID, SequenceNumber},
        effects::{TransactionEffects, TransactionEffectsAPI},
        object::Object,
        storage::ObjectKey,
    };
    use more_asserts as ma;
    use prometheus::Registry;
    use tracing::log::info;
    use typed_store::{
        Map,
        rocks::{DBMap, MetricConf, ReadWriteOptions, util::reference_count_merge_operator},
    };

    use super::AuthorityStorePruner;
    use crate::authority::{
        authority_store_pruner::AuthorityStorePruningMetrics,
        authority_store_tables::AuthorityPerpetualTables,
        authority_store_types::{
            ObjectContentDigest, StoreData, StoreObject, StoreObjectPair, StoreObjectWrapper,
            get_store_object_pair,
        },
    };

    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
        let perpetual_db_path = path.join(Path::new("perpetual"));
        let cf_names = AuthorityPerpetualTables::describe_tables();
        let cfs: Vec<&str> = cf_names.keys().map(|x| x.as_str()).collect();
        let mut db_options = typed_store::rocksdb::Options::default();
        db_options.set_merge_operator(
            "refcount operator",
            reference_count_merge_operator,
            reference_count_merge_operator,
        );
        let perpetual_db = typed_store::rocks::open_cf(
            perpetual_db_path,
            Some(db_options),
            MetricConf::new("perpetual_pruning"),
            &cfs,
        );

        let mut after_pruning = HashSet::new();
        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
            &perpetual_db?,
            Some("objects"),
            &ReadWriteOptions::default(),
            false,
        )?;
        let iter = objects.unbounded_iter();
        for (k, _v) in iter {
            after_pruning.insert(k);
        }
        Ok(after_pruning)
    }

    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);

    fn generate_test_data(
        db: Arc<AuthorityPerpetualTables>,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
        indirect_object_threshold: usize,
    ) -> Result<GenerateTestDataResult, anyhow::Error> {
        assert!(num_versions_per_object >= num_object_versions_to_retain);

        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
        let mut batch = db.objects.batch();

        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
        for id in ids {
            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
                if counter < num_object_versions_to_retain.try_into().unwrap() {
                    to_keep.push(object_key);
                } else {
                    to_delete.push(object_key);
                }
                let StoreObjectPair(obj, indirect_obj) = get_store_object_pair(
                    Object::immutable_with_id_for_testing(id),
                    indirect_object_threshold,
                );
                batch.insert_batch(
                    &db.objects,
                    [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
                )?;
                if let StoreObject::Value(o) = obj.into_inner() {
                    if let StoreData::IndirectObject(metadata) = o.data {
                        batch.merge_batch(
                            &db.indirect_move_objects,
                            [(metadata.digest, indirect_obj.unwrap())],
                        )?;
                    }
                }
            }

            if num_object_versions_to_retain == 0 {
                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
                println!("Adding tombstone object {:?}", tombstone_key);
                batch.insert_batch(
                    &db.objects,
                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
                )?;
                tombstones.push(tombstone_key);
            }
        }
        batch.write().unwrap();
        assert_eq!(
            to_keep.len() as u64,
            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
                * total_unique_object_ids as u64
        );
        assert_eq!(
            tombstones.len() as u64,
            if num_object_versions_to_retain == 0 {
                total_unique_object_ids as u64
            } else {
                0
            }
        );
        Ok((to_keep, to_delete, tombstones))
    }

    pub(crate) fn lock_table() -> Arc<RwLockTable<ObjectContentDigest>> {
        Arc::new(RwLockTable::new(1))
    }

    async fn run_pruner(
        path: &Path,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
        indirect_object_threshold: usize,
    ) -> Vec<ObjectKey> {
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let to_keep = {
            let db = Arc::new(AuthorityPerpetualTables::open(path, None));
            let (to_keep, to_delete, tombstones) = generate_test_data(
                db.clone(),
                num_versions_per_object,
                num_object_versions_to_retain,
                total_unique_object_ids,
                indirect_object_threshold,
            )
            .unwrap();
            let mut effects = TransactionEffects::default();
            for object in to_delete {
                effects.unsafe_add_deleted_live_object_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            for object in tombstones {
                effects.unsafe_add_object_tombstone_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            AuthorityStorePruner::prune_objects(
                vec![effects],
                &db,
                &lock_table(),
                0,
                metrics,
                indirect_object_threshold,
            )
            .await
            .unwrap();
            to_keep
        };
        tokio::time::sleep(Duration::from_secs(3)).await;
        to_keep
    }

    #[tokio::test]
    async fn test_pruning_objects() {
        let path = tempfile::tempdir().unwrap().into_path();
        let to_keep = run_pruner(&path, 3, 2, 1000, 0).await;
        assert_eq!(
            HashSet::from_iter(to_keep),
            get_keys_after_pruning(&path).unwrap()
        );
        run_pruner(&tempfile::tempdir().unwrap().into_path(), 3, 2, 1000, 0).await;
    }

    #[tokio::test]
    async fn test_pruning_tombstones() {
        let path = tempfile::tempdir().unwrap().into_path();
        let to_keep = run_pruner(&path, 0, 0, 1000, 0).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);

        let path = tempfile::tempdir().unwrap().into_path();
        let to_keep = run_pruner(&path, 3, 0, 1000, 0).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
    }

    #[tokio::test]
    async fn test_ref_count_pruning() {
        let path = tempfile::tempdir().unwrap().into_path();
        run_pruner(&path, 3, 2, 1000, 1).await;
        {
            let perpetual_db = AuthorityPerpetualTables::open(&path, None);
            let count = perpetual_db.indirect_move_objects.keys().count();
            assert_eq!(count, 1000);
        }

        let path = tempfile::tempdir().unwrap().into_path();
        run_pruner(&path, 3, 0, 1000, 1).await;
        {
            let perpetual_db = AuthorityPerpetualTables::open(&path, None);
            perpetual_db.indirect_move_objects.flush().unwrap();
            perpetual_db
                .indirect_move_objects
                .compact_range(&ObjectDigest::MIN, &ObjectDigest::MAX)
                .unwrap();
            perpetual_db
                .indirect_move_objects
                .compact_range(&ObjectDigest::MIN, &ObjectDigest::MAX)
                .unwrap();
            let count = perpetual_db.indirect_move_objects.keys().count();
            assert_eq!(count, 0);
        }
    }

    #[cfg(not(target_env = "msvc"))]
    #[tokio::test]
    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.into_path();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None));
        let total_unique_object_ids = 10_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        let mut to_delete = vec![];
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                if i < num_versions_per_object - 2 {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
                let obj = get_store_object_pair(Object::immutable_with_id_for_testing(id), 0).0;
                perpetual_db
                    .objects
                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
            }
        }

        fn get_sst_size(path: &Path) -> u64 {
            let mut size = 0;
            for entry in std::fs::read_dir(path).unwrap() {
                let entry = entry.unwrap();
                let path = entry.path();
                if let Some(ext) = path.extension() {
                    if ext != "sst" {
                        continue;
                    }
                    size += std::fs::metadata(path).unwrap().len();
                }
            }
            size
        }

        let db_path = primary_path.clone().join("perpetual");
        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);

        perpetual_db.objects.rocksdb.flush()?;
        perpetual_db.objects.compact_range_to_bottom(&start, &end)?;
        let before_compaction_size = get_sst_size(&db_path);

        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let total_pruned = AuthorityStorePruner::prune_objects(
            vec![effects],
            &perpetual_db,
            &lock_table(),
            0,
            metrics,
            0,
        )
        .await;
        info!("Total pruned keys = {:?}", total_pruned);

        perpetual_db.objects.rocksdb.flush()?;
        perpetual_db.objects.compact_range_to_bottom(&start, &end)?;
        let after_compaction_size = get_sst_size(&db_path);

        info!(
            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
            before_compaction_size, after_compaction_size
        );
        ma::assert_le!(after_compaction_size, before_compaction_size);
        Ok(())
    }
}

#[cfg(test)]
#[cfg(not(target_os = "macos"))]
#[cfg(not(target_env = "msvc"))]
mod pprof_tests {
    use std::sync::Arc;

    use iota_types::{
        base_types::{ObjectDigest, ObjectID, SequenceNumber, VersionNumber},
        effects::{TransactionEffects, TransactionEffectsAPI},
        object::Object,
        storage::ObjectKey,
    };
    use pprof::Symbol;
    use prometheus::Registry;
    use tracing::log::{error, info};
    use typed_store::{Map, rocks::DBMap};

    use super::AuthorityStorePruner;
    use crate::authority::{
        authority_store_pruner::{AuthorityStorePruningMetrics, tests, tests::lock_table},
        authority_store_tables::AuthorityPerpetualTables,
        authority_store_types::{StoreObjectWrapper, get_store_object_pair},
    };

    fn insert_keys(
        objects: &DBMap<ObjectKey, StoreObjectWrapper>,
    ) -> Result<TransactionEffects, anyhow::Error> {
        let mut to_delete = vec![];
        let num_versions_to_keep = 2;
        let total_unique_object_ids = 100_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                let obj = get_store_object_pair(Object::immutable_with_id_for_testing(id), 0).0;
                objects.insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
                if i < num_versions_per_object - num_versions_to_keep {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
            }
        }

        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        Ok(effects)
    }

    fn read_keys(
        objects: &DBMap<ObjectKey, StoreObjectWrapper>,
        num_reads: u32,
    ) -> Result<(), anyhow::Error> {
        let mut i = 0;
        while i < num_reads {
            let _res = objects.get(&ObjectKey(ObjectID::random(), VersionNumber::MAX))?;
            i += 1;
        }
        Ok(())
    }

    fn is_rocksdb_range_tombstone_frame(vs: &[Symbol]) -> bool {
        for symbol in vs.iter() {
            if symbol
                .name()
                .contains("rocksdb::FragmentedRangeTombstoneList")
            {
                return true;
            }
        }
        false
    }

    #[tokio::test]
    async fn ensure_no_tombstone_fragmentation_in_stack_frame_with_ignore_tombstones()
    -> Result<(), anyhow::Error> {
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let primary_path = tempfile::tempdir()?.into_path();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None));
        let effects = insert_keys(&perpetual_db.objects)?;
        AuthorityStorePruner::prune_objects(
            vec![effects],
            &perpetual_db,
            &tests::lock_table(),
            0,
            metrics,
            1,
        )
        .await?;
        let guard = pprof::ProfilerGuardBuilder::default()
            .frequency(1000)
            .build()
            .unwrap();
        read_keys(&perpetual_db.objects, 1000)?;
        if let Ok(report) = guard.report().build() {
            assert!(!report.data.keys().any(|f| {
                f.frames
                    .iter()
                    .any(|vs| is_rocksdb_range_tombstone_frame(vs))
            }));
        }
        Ok(())
    }

    #[tokio::test]
    async fn ensure_no_tombstone_fragmentation_in_stack_frame_after_flush()
    -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.into_path();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None));
        let effects = insert_keys(&perpetual_db.objects)?;
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        AuthorityStorePruner::prune_objects(
            vec![effects],
            &perpetual_db,
            &lock_table(),
            0,
            metrics,
            1,
        )
        .await?;
        if let Ok(()) = perpetual_db.objects.flush() {
            info!("Completed flushing objects table");
        } else {
            error!("Failed to flush objects table");
        }
        let guard = pprof::ProfilerGuardBuilder::default()
            .frequency(1000)
            .build()
            .unwrap();
        read_keys(&perpetual_db.objects, 1000)?;
        if let Ok(report) = guard.report().build() {
            assert!(!report.data.keys().any(|f| {
                f.frames
                    .iter()
                    .any(|vs| is_rocksdb_range_tombstone_frame(vs))
            }));
        }
        Ok(())
    }
}
1270}