iota_core/authority/authority_store_pruner.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    cmp::{max, min},
    collections::{BTreeSet, HashMap},
    sync::{Arc, Mutex},
    time::{Duration, SystemTime, UNIX_EPOCH},
};

use anyhow::anyhow;
use iota_archival::reader::ArchiveReaderBalancer;
use iota_config::node::AuthorityStorePruningConfig;
use iota_metrics::{monitored_scope, spawn_monitored_task};
use iota_storage::mutex_table::RwLockTable;
use iota_types::{
    base_types::{ObjectID, SequenceNumber, VersionNumber},
    committee::EpochId,
    effects::{TransactionEffects, TransactionEffectsAPI},
    message_envelope::Message,
    messages_checkpoint::{CheckpointContents, CheckpointDigest, CheckpointSequenceNumber},
    storage::ObjectKey,
};
use once_cell::sync::Lazy;
use prometheus::{
    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
    register_int_gauge_with_registry,
};
use tokio::{
    sync::oneshot::{self, Sender},
    time::Instant,
};
use tracing::{debug, error, info, warn};
use typed_store::{Map, TypedStoreError, rocksdb::LiveFile};

use super::authority_store_tables::AuthorityPerpetualTables;
use crate::{
    authority::authority_store_types::{ObjectContentDigest, StoreData, StoreObject},
    checkpoints::{CheckpointStore, CheckpointWatermark},
    rest_index::RestIndexStore,
};

static PERIODIC_PRUNING_TABLES: Lazy<BTreeSet<String>> = Lazy::new(|| {
    [
        "objects",
        "effects",
        "transactions",
        "events",
        "executed_effects",
        "executed_transactions_to_checkpoint",
    ]
    .into_iter()
    .map(|cf| cf.to_string())
    .collect()
});
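/// Epoch duration used by tests: 24 hours, in milliseconds.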
pub const EPOCH_DURATION_MS_FOR_TESTING: u64 = 24 * 60 * 60 * 1000;

/// The `AuthorityStorePruner` manages the pruning process for object stores
/// within the `AuthorityStore`. It includes a cancellation handle that can be
/// used to stop the pruning task for objects.
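///
/// # Example
///
/// A minimal construction sketch (not compiled here; the handles are assumed
/// to be available from the surrounding node state):
///
/// ```ignore
/// let pruner = AuthorityStorePruner::new(
///     perpetual_db,          // Arc<AuthorityPerpetualTables>
///     checkpoint_store,      // Arc<CheckpointStore>
///     None,                  // no REST index store
///     objects_lock_table,    // Arc<RwLockTable<ObjectContentDigest>>
///     pruning_config,        // AuthorityStorePruningConfig
///     /* is_validator */ false,
///     epoch_duration_ms,
///     &prometheus::Registry::new(),
///     /* indirect_objects_threshold */ 0,
///     archive_readers,       // ArchiveReaderBalancer
/// );
/// // Dropping `pruner` closes the cancellation channel and stops the task.
/// ```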
pub struct AuthorityStorePruner {
    _objects_pruner_cancel_handle: oneshot::Sender<()>,
}

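// Upper bound on the pruning tick duration: `pruning_tick_duration_ms` caps
// the tick at this value (10 seconds) and only drops below it for very short
// (e.g. test) epochs.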
static MIN_PRUNING_TICK_DURATION_MS: u64 = 10 * 1000;

/// The `AuthorityStorePruningMetrics` tracks various metrics related to the
/// pruning process of the `AuthorityStore`.
pub struct AuthorityStorePruningMetrics {
    pub last_pruned_checkpoint: IntGauge,
    pub num_pruned_objects: IntCounter,
    pub num_pruned_tombstones: IntCounter,
    pub last_pruned_effects_checkpoint: IntGauge,
    pub num_epochs_to_retain_for_objects: IntGauge,
    pub num_epochs_to_retain_for_checkpoints: IntGauge,
}

impl AuthorityStorePruningMetrics {
    /// Initializes a new instance of `AuthorityStorePruningMetrics` with the
    /// provided registry, registering various metrics that track the pruning
    /// operations in the `AuthorityStore`.
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            last_pruned_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_checkpoint",
                "Last pruned checkpoint",
                registry
            )
            .unwrap(),
            num_pruned_objects: register_int_counter_with_registry!(
                "num_pruned_objects",
                "Number of pruned objects",
                registry
            )
            .unwrap(),
            num_pruned_tombstones: register_int_counter_with_registry!(
                "num_pruned_tombstones",
                "Number of pruned tombstones",
                registry
            )
            .unwrap(),
            last_pruned_effects_checkpoint: register_int_gauge_with_registry!(
                "last_pruned_effects_checkpoint",
                "Last pruned effects checkpoint",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_objects: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_objects",
                "Number of epochs to retain for objects",
                registry
            )
            .unwrap(),
            num_epochs_to_retain_for_checkpoints: register_int_gauge_with_registry!(
                "num_epochs_to_retain_for_checkpoints",
                "Number of epochs to retain for checkpoints",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }

    /// Creates a new instance of `AuthorityStorePruningMetrics` for testing
    /// purposes.
    pub fn new_for_test() -> Arc<Self> {
        Self::new(&Registry::new())
    }
}

/// Pruning modes for the `AuthorityStore`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PruningMode {
    Objects,
    Checkpoints,
}

impl AuthorityStorePruner {
    /// Prunes old versions of objects based on transaction effects.
    async fn prune_objects(
        transaction_effects: Vec<TransactionEffects>,
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
        checkpoint_number: CheckpointSequenceNumber,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("ObjectsLivePruner");
        let mut wb = perpetual_db.objects.batch();

        // Collect object keys that need to be deleted from `transaction_effects`.
        let mut live_object_keys_to_prune = vec![];
        let mut object_tombstones_to_prune = vec![];
        for effects in &transaction_effects {
            for (object_id, seq_number) in effects.modified_at_versions() {
                live_object_keys_to_prune.push(ObjectKey(object_id, seq_number));
            }

            for deleted_object_key in effects.all_tombstones() {
                object_tombstones_to_prune
                    .push(ObjectKey(deleted_object_key.0, deleted_object_key.1));
            }
        }

        metrics
            .num_pruned_objects
            .inc_by(live_object_keys_to_prune.len() as u64);
        metrics
            .num_pruned_tombstones
            .inc_by(object_tombstones_to_prune.len() as u64);

        let mut indirect_objects: HashMap<_, i64> = HashMap::new();
        if indirect_objects_threshold > 0 && indirect_objects_threshold < usize::MAX {
            for object in perpetual_db
                .objects
                .multi_get(live_object_keys_to_prune.iter())?
                .into_iter()
                .flatten()
            {
                if let StoreObject::Value(obj) = object.into_inner() {
                    if let StoreData::IndirectObject(indirect_object) = obj.data {
                        *indirect_objects.entry(indirect_object.digest).or_default() -= 1;
                    }
                }
            }
        }

        let mut updates: HashMap<ObjectID, (VersionNumber, VersionNumber)> = HashMap::new();
        for ObjectKey(object_id, seq_number) in live_object_keys_to_prune {
            updates
                .entry(object_id)
                .and_modify(|range| *range = (min(range.0, seq_number), max(range.1, seq_number)))
                .or_insert((seq_number, seq_number));
        }

        for (object_id, (min_version, max_version)) in updates {
            debug!(
                "Pruning object {:?} versions {:?} - {:?}",
                object_id, min_version, max_version
            );
            let start_range = ObjectKey(object_id, min_version);
            let end_range = ObjectKey(object_id, (max_version.value() + 1).into());
            wb.schedule_delete_range(&perpetual_db.objects, &start_range, &end_range)?;
        }

        // Instead of using range deletes, we need to scan all the keys for the
        // deleted objects and then issue point deletes for every existing key.
        // This is because, to improve read performance, we set
        // `ignore_range_deletions` on all read options, so using a range delete
        // to remove tombstones may leak objects (imagine a tombstone is
        // compacted away while an earlier version is not). Point deletes
        // guarantee that all earlier versions are deleted from the database.
        if !object_tombstones_to_prune.is_empty() {
            let mut object_keys_to_delete = vec![];
            for ObjectKey(object_id, seq_number) in object_tombstones_to_prune {
                for result in perpetual_db.objects.safe_iter_with_bounds(
                    Some(ObjectKey(object_id, VersionNumber::MIN)),
                    Some(ObjectKey(object_id, seq_number.next())),
                ) {
                    let (object_key, _) = result?;
                    assert_eq!(object_key.0, object_id);
                    object_keys_to_delete.push(object_key);
                }
            }

            wb.delete_batch(&perpetual_db.objects, object_keys_to_delete)?;
        }

        if !indirect_objects.is_empty() {
            let ref_count_update = indirect_objects
                .iter()
                .map(|(digest, delta)| (digest, delta.to_le_bytes()));
            wb.partial_merge_batch(&perpetual_db.indirect_move_objects, ref_count_update)?;
        }
        perpetual_db.set_highest_pruned_checkpoint(&mut wb, checkpoint_number)?;
        metrics.last_pruned_checkpoint.set(checkpoint_number as i64);

        let _locks = objects_lock_table
            .acquire_locks(indirect_objects.into_keys())
            .await;
        wb.write()?;
        Ok(())
    }

    /// Prunes checkpoint-related data from the `AuthorityStore`, including
    /// transaction effects, executed transactions, and checkpoint contents,
    /// based on the specified checkpoint number and list of checkpoints to
    /// prune. This function removes outdated data, updates pruning metrics,
    /// and maintains database consistency by updating watermarks.
    fn prune_checkpoints(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_db: &Arc<CheckpointStore>,
        rest_index: Option<&RestIndexStore>,
        checkpoint_number: CheckpointSequenceNumber,
        checkpoints_to_prune: Vec<CheckpointDigest>,
        checkpoint_content_to_prune: Vec<CheckpointContents>,
        effects_to_prune: &Vec<TransactionEffects>,
        metrics: Arc<AuthorityStorePruningMetrics>,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("EffectsLivePruner");

        let mut perpetual_batch = perpetual_db.objects.batch();
        let transactions: Vec<_> = checkpoint_content_to_prune
            .iter()
            .flat_map(|content| content.iter().map(|tx| tx.transaction))
            .collect();

        perpetual_batch.delete_batch(&perpetual_db.transactions, transactions.iter())?;
        perpetual_batch.delete_batch(&perpetual_db.executed_effects, transactions.iter())?;
        perpetual_batch.delete_batch(
            &perpetual_db.executed_transactions_to_checkpoint,
            transactions,
        )?;

        let mut effect_digests = vec![];
        for effects in effects_to_prune {
            let effects_digest = effects.digest();
            debug!("Pruning effects {:?}", effects_digest);
            effect_digests.push(effects_digest);

            if let Some(event_digest) = effects.events_digest() {
                if let Some(next_digest) = event_digest.next_lexicographical() {
                    perpetual_batch.schedule_delete_range(
                        &perpetual_db.events,
                        &(*event_digest, 0),
                        &(next_digest, 0),
                    )?;
                }
            }
        }
        perpetual_batch.delete_batch(&perpetual_db.effects, effect_digests)?;

        let mut checkpoints_batch = checkpoint_db.certified_checkpoints.batch();

        let checkpoint_content_digests =
            checkpoint_content_to_prune.iter().map(|ckpt| ckpt.digest());
        checkpoints_batch.delete_batch(
            &checkpoint_db.checkpoint_content,
            checkpoint_content_digests.clone(),
        )?;
        checkpoints_batch.delete_batch(
            &checkpoint_db.checkpoint_sequence_by_contents_digest,
            checkpoint_content_digests,
        )?;

        checkpoints_batch
            .delete_batch(&checkpoint_db.checkpoint_by_digest, checkpoints_to_prune)?;

        checkpoints_batch.insert_batch(
            &checkpoint_db.watermarks,
            [(
                &CheckpointWatermark::HighestPruned,
                &(checkpoint_number, CheckpointDigest::random()),
            )],
        )?;

        if let Some(rest_index) = rest_index {
            rest_index.prune(&checkpoint_content_to_prune)?;
        }
        perpetual_batch.write()?;
        checkpoints_batch.write()?;
        metrics
            .last_pruned_effects_checkpoint
            .set(checkpoint_number as i64);
        Ok(())
    }

    /// Prunes old data based on effects from all checkpoints in epochs
    /// eligible for pruning.
    pub async fn prune_objects_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rest_index: Option<&RestIndexStore>,
        objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
        epoch_duration_ms: u64,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneObjectsForEligibleEpochs");
        let (mut max_eligible_checkpoint_number, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        let pruned_checkpoint_number = perpetual_db.get_highest_pruned_checkpoint()?;
        if config.smooth && config.num_epochs_to_retain > 0 {
            max_eligible_checkpoint_number = Self::smoothed_max_eligible_checkpoint_number(
                checkpoint_store,
                max_eligible_checkpoint_number,
                pruned_checkpoint_number,
                epoch_id,
                epoch_duration_ms,
                config.num_epochs_to_retain,
            )?;
        }
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rest_index,
            PruningMode::Objects,
            config.num_epochs_to_retain,
            pruned_checkpoint_number,
            max_eligible_checkpoint_number,
            objects_lock_table,
            config,
            metrics.clone(),
            indirect_objects_threshold,
        )
        .await
    }

    /// Asynchronously prunes checkpoint data for eligible epochs based on the
    /// configuration and current state of the `AuthorityStore`. This
    /// function determines the range of checkpoints that can be pruned,
    /// taking into account retention policies, archival watermarks, and
    /// smoothing options. It then delegates the pruning to the
    /// `prune_for_eligible_epochs` method.
    /// The function also updates pruning metrics and ensures proper handling of
    /// indirect objects.
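    ///
    /// A sketch of how the pruning upper bound is derived below (names are
    /// illustrative, simplified from the body of this function):
    ///
    /// ```ignore
    /// // Never prune past what has been archived or executed.
    /// let mut max_eligible = min(latest_archived_checkpoint, last_executed_checkpoint);
    /// if config.num_epochs_to_retain != u64::MAX {
    ///     // Stay behind the objects pruner's own watermark.
    ///     max_eligible = min(max_eligible, highest_pruned_objects_checkpoint);
    /// }
    /// ```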
    pub async fn prune_checkpoints_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rest_index: Option<&RestIndexStore>,
        objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
        archive_readers: ArchiveReaderBalancer,
        epoch_duration_ms: u64,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneCheckpointsForEligibleEpochs");
        let pruned_checkpoint_number =
            checkpoint_store.get_highest_pruned_checkpoint_seq_number()?;
        let (last_executed_checkpoint, epoch_id) = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| (*c.sequence_number(), c.epoch))
            .unwrap_or_default();
        let latest_archived_checkpoint = archive_readers
            .get_archive_watermark()
            .await?
            .unwrap_or(u64::MAX);
        let mut max_eligible_checkpoint = min(latest_archived_checkpoint, last_executed_checkpoint);
        if config.num_epochs_to_retain != u64::MAX {
            max_eligible_checkpoint = min(
                max_eligible_checkpoint,
                perpetual_db.get_highest_pruned_checkpoint()?,
            );
        }
        if config.smooth {
            if let Some(num_epochs_to_retain) = config.num_epochs_to_retain_for_checkpoints {
                max_eligible_checkpoint = Self::smoothed_max_eligible_checkpoint_number(
                    checkpoint_store,
                    max_eligible_checkpoint,
                    pruned_checkpoint_number,
                    epoch_id,
                    epoch_duration_ms,
                    num_epochs_to_retain,
                )?;
            }
        }
        debug!("Max eligible checkpoint {}", max_eligible_checkpoint);
        Self::prune_for_eligible_epochs(
            perpetual_db,
            checkpoint_store,
            rest_index,
            PruningMode::Checkpoints,
            config
                .num_epochs_to_retain_for_checkpoints()
                .ok_or_else(|| anyhow!("config value not set"))?,
            pruned_checkpoint_number,
            max_eligible_checkpoint,
            objects_lock_table,
            config,
            metrics.clone(),
            indirect_objects_threshold,
        )
        .await
    }

    /// Prunes old object versions based on effects from all checkpoints in
    /// epochs eligible for pruning.
    pub async fn prune_for_eligible_epochs(
        perpetual_db: &Arc<AuthorityPerpetualTables>,
        checkpoint_store: &Arc<CheckpointStore>,
        rest_index: Option<&RestIndexStore>,
        mode: PruningMode,
        num_epochs_to_retain: u64,
        starting_checkpoint_number: CheckpointSequenceNumber,
        max_eligible_checkpoint: CheckpointSequenceNumber,
        objects_lock_table: &Arc<RwLockTable<ObjectContentDigest>>,
        config: AuthorityStorePruningConfig,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
    ) -> anyhow::Result<()> {
        let _scope = monitored_scope("PruneForEligibleEpochs");

        let mut checkpoint_number = starting_checkpoint_number;
        let current_epoch = checkpoint_store
            .get_highest_executed_checkpoint()?
            .map(|c| c.epoch())
            .unwrap_or_default();

        let mut checkpoints_to_prune = vec![];
        let mut checkpoint_content_to_prune = vec![];
        let mut effects_to_prune = vec![];

        loop {
            let Some(ckpt) = checkpoint_store
                .certified_checkpoints
                .get(&(checkpoint_number + 1))?
            else {
                break;
            };
            let checkpoint = ckpt.into_inner();
            // Skip if the checkpoint's epoch or sequence number is too new.
            // We have to respect the highest executed checkpoint watermark
            // (including the watermark itself) because parts of the system
            // (e.g. the state accumulator) may still require access to old
            // object versions.
            if (current_epoch < checkpoint.epoch() + num_epochs_to_retain)
                || (*checkpoint.sequence_number() >= max_eligible_checkpoint)
            {
                break;
            }
            checkpoint_number = *checkpoint.sequence_number();

            let content = checkpoint_store
                .get_checkpoint_contents(&checkpoint.content_digest)?
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "checkpoint content data is missing: {}",
                        checkpoint.sequence_number
                    )
                })?;
            let effects = perpetual_db
                .effects
                .multi_get(content.iter().map(|tx| tx.effects))?;

            info!("scheduling pruning for checkpoint {:?}", checkpoint_number);
            checkpoints_to_prune.push(*checkpoint.digest());
            checkpoint_content_to_prune.push(content);
            effects_to_prune.extend(effects.into_iter().flatten());

            if effects_to_prune.len() >= config.max_transactions_in_batch
                || checkpoints_to_prune.len() >= config.max_checkpoints_in_batch
            {
                match mode {
                    PruningMode::Objects => {
                        Self::prune_objects(
                            effects_to_prune,
                            perpetual_db,
                            objects_lock_table,
                            checkpoint_number,
                            metrics.clone(),
                            indirect_objects_threshold,
                        )
                        .await?
                    }
                    PruningMode::Checkpoints => Self::prune_checkpoints(
                        perpetual_db,
                        checkpoint_store,
                        rest_index,
                        checkpoint_number,
                        checkpoints_to_prune,
                        checkpoint_content_to_prune,
                        &effects_to_prune,
                        metrics.clone(),
                    )?,
                };
                checkpoints_to_prune = vec![];
                checkpoint_content_to_prune = vec![];
                effects_to_prune = vec![];
                // Yield back to the tokio runtime to avoid stalling other tasks.
                tokio::task::yield_now().await;
            }
        }

        if !checkpoints_to_prune.is_empty() {
            match mode {
                PruningMode::Objects => {
                    Self::prune_objects(
                        effects_to_prune,
                        perpetual_db,
                        objects_lock_table,
                        checkpoint_number,
                        metrics.clone(),
                        indirect_objects_threshold,
                    )
                    .await?
                }
                PruningMode::Checkpoints => Self::prune_checkpoints(
                    perpetual_db,
                    checkpoint_store,
                    rest_index,
                    checkpoint_number,
                    checkpoints_to_prune,
                    checkpoint_content_to_prune,
                    &effects_to_prune,
                    metrics.clone(),
                )?,
            };
        }
        Ok(())
    }

    /// Identifies and compacts the next eligible SST file in the
    /// `AuthorityStore` that meets the specified conditions for manual
    /// compaction. This function checks each SST file's metadata, including
    /// modification time and size, against a delay threshold to determine if it
    /// should be compacted. If a suitable file is found, it triggers a
    /// manual compaction and updates the last processed timestamp.
    fn compact_next_sst_file(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        delay_days: usize,
        last_processed: Arc<Mutex<HashMap<String, SystemTime>>>,
    ) -> anyhow::Result<Option<LiveFile>> {
        let db_path = perpetual_db.objects.rocksdb.path();
        let mut state = last_processed
            .lock()
            .expect("failed to obtain a lock for last processed SST files");
        let mut sst_file_for_compaction: Option<LiveFile> = None;
        let time_threshold =
            SystemTime::now() - Duration::from_secs(delay_days as u64 * 24 * 60 * 60);
        for sst_file in perpetual_db.objects.rocksdb.live_files()? {
            let file_path = db_path.join(sst_file.name.clone().trim_matches('/'));
            let last_modified = std::fs::metadata(file_path)?.modified()?;
            if !PERIODIC_PRUNING_TABLES.contains(&sst_file.column_family_name)
                || sst_file.level < 1
                || sst_file.start_key.is_none()
                || sst_file.end_key.is_none()
                || last_modified > time_threshold
                || state.get(&sst_file.name).unwrap_or(&UNIX_EPOCH) > &time_threshold
            {
                continue;
            }
            if let Some(candidate) = &sst_file_for_compaction {
                if candidate.size > sst_file.size {
                    continue;
                }
            }
            sst_file_for_compaction = Some(sst_file);
        }
        let Some(sst_file) = sst_file_for_compaction else {
            return Ok(None);
        };
        info!(
            "Manual compaction of sst file {:?}. Size: {:?}, level: {:?}",
            sst_file.name, sst_file.size, sst_file.level
        );
        perpetual_db.objects.compact_range_raw(
            &sst_file.column_family_name,
            sst_file.start_key.clone().unwrap(),
            sst_file.end_key.clone().unwrap(),
        )?;
        state.insert(sst_file.name.clone(), SystemTime::now());
        Ok(Some(sst_file))
    }

    /// Calculates the duration in milliseconds of a pruning tick based on the
    /// provided epoch duration: the lesser of half the epoch duration and
    /// `MIN_PRUNING_TICK_DURATION_MS` (10 seconds). For a 24-hour epoch this
    /// yields a 10-second tick.
    fn pruning_tick_duration_ms(epoch_duration_ms: u64) -> u64 {
        min(epoch_duration_ms / 2, MIN_PRUNING_TICK_DURATION_MS)
    }

    /// Calculates a smoothed maximum eligible checkpoint number for pruning,
    /// balancing the pruning operation over the epoch's duration.
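    ///
    /// # Example
    ///
    /// An illustrative sketch of the smoothing arithmetic (variable names are
    /// local to this example), assuming a 24-hour epoch and the 10-second tick
    /// from `pruning_tick_duration_ms`:
    ///
    /// ```ignore
    /// let num_intervals = 86_400_000 / 10_000; // 8640 ticks per epoch
    /// // Advance the watermark one small step per tick instead of all at once:
    /// let delta = (max_eligible_checkpoint - pruned_checkpoint) / num_intervals;
    /// let smoothed = pruned_checkpoint + delta;
    /// ```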
    fn smoothed_max_eligible_checkpoint_number(
        checkpoint_store: &Arc<CheckpointStore>,
        mut max_eligible_checkpoint: CheckpointSequenceNumber,
        pruned_checkpoint: CheckpointSequenceNumber,
        epoch_id: EpochId,
        epoch_duration_ms: u64,
        num_epochs_to_retain: u64,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        if epoch_id < num_epochs_to_retain {
            return Ok(0);
        }
        let last_checkpoint_in_epoch = checkpoint_store
            .get_epoch_last_checkpoint(epoch_id - num_epochs_to_retain)?
            .map(|checkpoint| checkpoint.sequence_number)
            .unwrap_or_default();
        max_eligible_checkpoint = max_eligible_checkpoint.min(last_checkpoint_in_epoch);
        if max_eligible_checkpoint == 0 {
            return Ok(max_eligible_checkpoint);
        }
        let num_intervals = epoch_duration_ms
            .checked_div(Self::pruning_tick_duration_ms(epoch_duration_ms))
            .unwrap_or(1);
        let delta = max_eligible_checkpoint
            .checked_sub(pruned_checkpoint)
            .unwrap_or_default()
            .checked_div(num_intervals)
            .unwrap_or(1);
        Ok(pruned_checkpoint + delta)
    }

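    /// Spawns the background pruning loops (objects and checkpoints) and, when
    /// `periodic_compaction_threshold_days` is configured, a periodic SST
    /// compaction task. Returns the oneshot sender used to cancel the loops;
    /// dropping it has the same effect.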
    fn setup_pruning(
        config: AuthorityStorePruningConfig,
        epoch_duration_ms: u64,
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rest_index: Option<Arc<RestIndexStore>>,
        objects_lock_table: Arc<RwLockTable<ObjectContentDigest>>,
        metrics: Arc<AuthorityStorePruningMetrics>,
        indirect_objects_threshold: usize,
        archive_readers: ArchiveReaderBalancer,
    ) -> Sender<()> {
        let (sender, mut recv) = tokio::sync::oneshot::channel();
        debug!(
            "Starting object pruning service with num_epochs_to_retain={}",
            config.num_epochs_to_retain
        );

        let tick_duration =
            Duration::from_millis(Self::pruning_tick_duration_ms(epoch_duration_ms));
        let pruning_initial_delay = if cfg!(msim) {
            Duration::from_millis(1)
        } else {
            Duration::from_secs(config.pruning_run_delay_seconds.unwrap_or(60 * 60))
        };
        let mut objects_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);
        let mut checkpoints_prune_interval =
            tokio::time::interval_at(Instant::now() + pruning_initial_delay, tick_duration);

        let perpetual_db_for_compaction = perpetual_db.clone();
        if let Some(delay_days) = config.periodic_compaction_threshold_days {
            spawn_monitored_task!(async move {
                let last_processed = Arc::new(Mutex::new(HashMap::new()));
                loop {
                    let db = perpetual_db_for_compaction.clone();
                    let state = Arc::clone(&last_processed);
                    let result = tokio::task::spawn_blocking(move || {
                        Self::compact_next_sst_file(db, delay_days, state)
                    })
                    .await;
                    let mut sleep_interval_secs = 1;
                    match result {
                        Err(err) => error!("Failed to compact sst file: {:?}", err),
                        Ok(Err(err)) => error!("Failed to compact sst file: {:?}", err),
                        Ok(Ok(None)) => {
                            sleep_interval_secs = 3600;
                        }
                        _ => {}
                    }
                    tokio::time::sleep(Duration::from_secs(sleep_interval_secs)).await;
                }
            });
        }

        metrics
            .num_epochs_to_retain_for_objects
            .set(config.num_epochs_to_retain as i64);
        metrics.num_epochs_to_retain_for_checkpoints.set(
            config
                .num_epochs_to_retain_for_checkpoints
                .unwrap_or_default() as i64,
        );

        tokio::task::spawn(async move {
            loop {
                tokio::select! {
                    _ = objects_prune_interval.tick(), if config.num_epochs_to_retain != u64::MAX => {
                        if let Err(err) = Self::prune_objects_for_eligible_epochs(&perpetual_db, &checkpoint_store, rest_index.as_deref(), &objects_lock_table, config.clone(), metrics.clone(), indirect_objects_threshold, epoch_duration_ms).await {
                            error!("Failed to prune objects: {:?}", err);
                        }
                    },
                    _ = checkpoints_prune_interval.tick(), if !matches!(config.num_epochs_to_retain_for_checkpoints(), None | Some(u64::MAX) | Some(0)) => {
                        if let Err(err) = Self::prune_checkpoints_for_eligible_epochs(&perpetual_db, &checkpoint_store, rest_index.as_deref(), &objects_lock_table, config.clone(), metrics.clone(), indirect_objects_threshold, archive_readers.clone(), epoch_duration_ms).await {
                            error!("Failed to prune checkpoints: {:?}", err);
                        }
                    },
                    _ = &mut recv => break,
                }
            }
        });
        sender
    }

    /// Initializes a new instance of `AuthorityStorePruner` with the provided
    /// configuration, database connections, and metrics registry.
    pub fn new(
        perpetual_db: Arc<AuthorityPerpetualTables>,
        checkpoint_store: Arc<CheckpointStore>,
        rest_index: Option<Arc<RestIndexStore>>,
        objects_lock_table: Arc<RwLockTable<ObjectContentDigest>>,
        mut pruning_config: AuthorityStorePruningConfig,
        is_validator: bool,
        epoch_duration_ms: u64,
        registry: &Registry,
        indirect_objects_threshold: usize,
        archive_readers: ArchiveReaderBalancer,
    ) -> Self {
        if pruning_config.num_epochs_to_retain > 0 && pruning_config.num_epochs_to_retain < u64::MAX
        {
            warn!(
                "Using objects pruner with num_epochs_to_retain = {} can lead to performance issues",
                pruning_config.num_epochs_to_retain
            );
            if is_validator {
                warn!("Resetting to aggressive pruner.");
                pruning_config.num_epochs_to_retain = 0;
            } else {
                warn!("Consider using an aggressive pruner (num_epochs_to_retain = 0)");
            }
        }
        AuthorityStorePruner {
            _objects_pruner_cancel_handle: Self::setup_pruning(
                pruning_config,
                epoch_duration_ms,
                perpetual_db,
                checkpoint_store,
                rest_index,
                objects_lock_table,
                AuthorityStorePruningMetrics::new(registry),
                indirect_objects_threshold,
                archive_readers,
            ),
        }
    }

    /// Compacts the entire range of objects stored in the `AuthorityStore` by
    /// invoking a range compaction on the database.
    pub fn compact(perpetual_db: &Arc<AuthorityPerpetualTables>) -> Result<(), TypedStoreError> {
        perpetual_db.objects.compact_range(
            &ObjectKey(ObjectID::ZERO, SequenceNumber::MIN),
            &ObjectKey(ObjectID::MAX, SequenceNumber::MAX),
        )
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::HashSet, path::Path, sync::Arc, time::Duration};

    use iota_storage::mutex_table::RwLockTable;
    use iota_types::{
        base_types::{ObjectDigest, ObjectID, SequenceNumber},
        effects::{TransactionEffects, TransactionEffectsAPI},
        object::Object,
        storage::ObjectKey,
    };
    use more_asserts as ma;
    use prometheus::Registry;
    use tracing::log::info;
    use typed_store::{
        Map,
        rocks::{DBMap, MetricConf, ReadWriteOptions, util::reference_count_merge_operator},
    };

    use super::AuthorityStorePruner;
    use crate::authority::{
        authority_store_pruner::AuthorityStorePruningMetrics,
        authority_store_tables::AuthorityPerpetualTables,
        authority_store_types::{
            ObjectContentDigest, StoreData, StoreObject, StoreObjectPair, StoreObjectWrapper,
            get_store_object_pair,
        },
    };

    fn get_keys_after_pruning(path: &Path) -> anyhow::Result<HashSet<ObjectKey>> {
        let perpetual_db_path = path.join(Path::new("perpetual"));
        let cf_names = AuthorityPerpetualTables::describe_tables();
        let cfs: Vec<&str> = cf_names.keys().map(|x| x.as_str()).collect();
        let mut db_options = typed_store::rocksdb::Options::default();
        db_options.set_merge_operator(
            "refcount operator",
            reference_count_merge_operator,
            reference_count_merge_operator,
        );
        let perpetual_db = typed_store::rocks::open_cf(
            perpetual_db_path,
            Some(db_options),
            MetricConf::new("perpetual_pruning"),
            &cfs,
        );

        let mut after_pruning = HashSet::new();
        let objects = DBMap::<ObjectKey, StoreObjectWrapper>::reopen(
            &perpetual_db?,
            Some("objects"),
            // Open the db bypassing the default options, which ignore range
            // tombstones, so we can read the accurate number of retained versions.
            &ReadWriteOptions::default(),
            false,
        )?;
        let iter = objects.unbounded_iter();
        for (k, _v) in iter {
            after_pruning.insert(k);
        }
        Ok(after_pruning)
    }

    type GenerateTestDataResult = (Vec<ObjectKey>, Vec<ObjectKey>, Vec<ObjectKey>);

    fn generate_test_data(
        db: Arc<AuthorityPerpetualTables>,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
        indirect_object_threshold: usize,
    ) -> Result<GenerateTestDataResult, anyhow::Error> {
        assert!(num_versions_per_object >= num_object_versions_to_retain);

        let (mut to_keep, mut to_delete, mut tombstones) = (vec![], vec![], vec![]);
        let mut batch = db.objects.batch();

        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids.into())?;
        for id in ids {
            for (counter, seq) in (0..num_versions_per_object).rev().enumerate() {
                let object_key = ObjectKey(id, SequenceNumber::from_u64(seq));
                if counter < num_object_versions_to_retain.try_into().unwrap() {
                    // latest `num_object_versions_to_retain` should not have been pruned
                    to_keep.push(object_key);
                } else {
                    to_delete.push(object_key);
                }
                let StoreObjectPair(obj, indirect_obj) = get_store_object_pair(
                    Object::immutable_with_id_for_testing(id),
                    indirect_object_threshold,
                );
                batch.insert_batch(
                    &db.objects,
                    [(ObjectKey(id, SequenceNumber::from(seq)), obj.clone())],
                )?;
                if let StoreObject::Value(o) = obj.into_inner() {
                    if let StoreData::IndirectObject(metadata) = o.data {
                        batch.merge_batch(
                            &db.indirect_move_objects,
                            [(metadata.digest, indirect_obj.unwrap())],
                        )?;
                    }
                }
            }

            // Add a tombstone for the deleted object.
            if num_object_versions_to_retain == 0 {
                let tombstone_key = ObjectKey(id, SequenceNumber::from(num_versions_per_object));
                println!("Adding tombstone object {:?}", tombstone_key);
                batch.insert_batch(
                    &db.objects,
                    [(tombstone_key, StoreObjectWrapper::V1(StoreObject::Deleted))],
                )?;
                tombstones.push(tombstone_key);
            }
        }
        batch.write().unwrap();
        assert_eq!(
            to_keep.len() as u64,
            std::cmp::min(num_object_versions_to_retain, num_versions_per_object)
                * total_unique_object_ids as u64
        );
        assert_eq!(
            tombstones.len() as u64,
            if num_object_versions_to_retain == 0 {
                total_unique_object_ids as u64
            } else {
                0
            }
        );
        Ok((to_keep, to_delete, tombstones))
    }

    pub(crate) fn lock_table() -> Arc<RwLockTable<ObjectContentDigest>> {
        Arc::new(RwLockTable::new(1))
    }

    async fn run_pruner(
        path: &Path,
        num_versions_per_object: u64,
        num_object_versions_to_retain: u64,
        total_unique_object_ids: u32,
        indirect_object_threshold: usize,
    ) -> Vec<ObjectKey> {
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let to_keep = {
            let db = Arc::new(AuthorityPerpetualTables::open(path, None));
            let (to_keep, to_delete, tombstones) = generate_test_data(
                db.clone(),
                num_versions_per_object,
                num_object_versions_to_retain,
                total_unique_object_ids,
                indirect_object_threshold,
            )
            .unwrap();
            let mut effects = TransactionEffects::default();
            for object in to_delete {
                effects.unsafe_add_deleted_live_object_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            for object in tombstones {
                effects.unsafe_add_object_tombstone_for_testing((
                    object.0,
                    object.1,
                    ObjectDigest::MIN,
                ));
            }
            AuthorityStorePruner::prune_objects(
                vec![effects],
                &db,
                &lock_table(),
                0,
                metrics,
                indirect_object_threshold,
            )
            .await
            .unwrap();
            to_keep
        };
        tokio::time::sleep(Duration::from_secs(3)).await;
        to_keep
    }

    // Tests pruning old versions of live objects.
    #[tokio::test]
    async fn test_pruning_objects() {
        let path = tempfile::tempdir().unwrap().into_path();
        let to_keep = run_pruner(&path, 3, 2, 1000, 0).await;
        assert_eq!(
            HashSet::from_iter(to_keep),
            get_keys_after_pruning(&path).unwrap()
        );
        run_pruner(&tempfile::tempdir().unwrap().into_path(), 3, 2, 1000, 0).await;
    }

    // Tests pruning deleted objects (object tombstones).
    #[tokio::test]
    async fn test_pruning_tombstones() {
        let path = tempfile::tempdir().unwrap().into_path();
        let to_keep = run_pruner(&path, 0, 0, 1000, 0).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);

        let path = tempfile::tempdir().unwrap().into_path();
        let to_keep = run_pruner(&path, 3, 0, 1000, 0).await;
        assert_eq!(to_keep.len(), 0);
        assert_eq!(get_keys_after_pruning(&path).unwrap().len(), 0);
    }

    #[tokio::test]
    async fn test_ref_count_pruning() {
        let path = tempfile::tempdir().unwrap().into_path();
        run_pruner(&path, 3, 2, 1000, 1).await;
        {
            let perpetual_db = AuthorityPerpetualTables::open(&path, None);
            let count = perpetual_db.indirect_move_objects.keys().count();
            // references are not reset, expected to have 1000 unique objects
            assert_eq!(count, 1000);
        }

        let path = tempfile::tempdir().unwrap().into_path();
        run_pruner(&path, 3, 0, 1000, 1).await;
        {
            let perpetual_db = AuthorityPerpetualTables::open(&path, None);
            perpetual_db.indirect_move_objects.flush().unwrap();
            perpetual_db
                .indirect_move_objects
                .compact_range(&ObjectDigest::MIN, &ObjectDigest::MAX)
                .unwrap();
            perpetual_db
                .indirect_move_objects
                .compact_range(&ObjectDigest::MIN, &ObjectDigest::MAX)
                .unwrap();
            let count = perpetual_db.indirect_move_objects.keys().count();
            assert_eq!(count, 0);
        }
    }

    #[cfg(not(target_env = "msvc"))]
    #[tokio::test]
    async fn test_db_size_after_compaction() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.into_path();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None));
        let total_unique_object_ids = 10_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        let mut to_delete = vec![];
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                if i < num_versions_per_object - 2 {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
                let obj = get_store_object_pair(Object::immutable_with_id_for_testing(id), 0).0;
                perpetual_db
                    .objects
                    .insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
            }
        }

        fn get_sst_size(path: &Path) -> u64 {
            let mut size = 0;
            for entry in std::fs::read_dir(path).unwrap() {
                let entry = entry.unwrap();
                let path = entry.path();
                if let Some(ext) = path.extension() {
                    if ext != "sst" {
                        continue;
                    }
                    size += std::fs::metadata(path).unwrap().len();
                }
            }
            size
        }

        let db_path = primary_path.clone().join("perpetual");
        let start = ObjectKey(ObjectID::ZERO, SequenceNumber::MIN);
        let end = ObjectKey(ObjectID::MAX, SequenceNumber::MAX);

        perpetual_db.objects.rocksdb.flush()?;
        perpetual_db.objects.compact_range_to_bottom(&start, &end)?;
        let before_compaction_size = get_sst_size(&db_path);

        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let total_pruned = AuthorityStorePruner::prune_objects(
            vec![effects],
            &perpetual_db,
            &lock_table(),
            0,
            metrics,
            0,
        )
        .await;
        info!("Total pruned keys = {:?}", total_pruned);

        perpetual_db.objects.rocksdb.flush()?;
        perpetual_db.objects.compact_range_to_bottom(&start, &end)?;
        let after_compaction_size = get_sst_size(&db_path);

        info!(
            "Before compaction disk size = {:?}, after compaction disk size = {:?}",
            before_compaction_size, after_compaction_size
        );
        ma::assert_le!(after_compaction_size, before_compaction_size);
        Ok(())
    }
}

#[cfg(test)]
#[cfg(not(target_os = "macos"))]
#[cfg(not(target_env = "msvc"))]
mod pprof_tests {
    use std::sync::Arc;

    use iota_types::{
        base_types::{ObjectDigest, ObjectID, SequenceNumber, VersionNumber},
        effects::{TransactionEffects, TransactionEffectsAPI},
        object::Object,
        storage::ObjectKey,
    };
    use pprof::Symbol;
    use prometheus::Registry;
    use tracing::log::{error, info};
    use typed_store::{Map, rocks::DBMap};

    use super::AuthorityStorePruner;
    use crate::authority::{
        authority_store_pruner::{AuthorityStorePruningMetrics, tests, tests::lock_table},
        authority_store_tables::AuthorityPerpetualTables,
        authority_store_types::{StoreObjectWrapper, get_store_object_pair},
    };

    fn insert_keys(
        objects: &DBMap<ObjectKey, StoreObjectWrapper>,
    ) -> Result<TransactionEffects, anyhow::Error> {
        let mut to_delete = vec![];
        let num_versions_to_keep = 2;
        let total_unique_object_ids = 100_000;
        let num_versions_per_object = 10;
        let ids = ObjectID::in_range(ObjectID::ZERO, total_unique_object_ids)?;
        for id in ids {
            for i in (0..num_versions_per_object).rev() {
                let obj = get_store_object_pair(Object::immutable_with_id_for_testing(id), 0).0;
                objects.insert(&ObjectKey(id, SequenceNumber::from(i)), &obj)?;
                if i < num_versions_per_object - num_versions_to_keep {
                    to_delete.push((id, SequenceNumber::from(i)));
                }
            }
        }

        let mut effects = TransactionEffects::default();
        for object in to_delete {
            effects.unsafe_add_deleted_live_object_for_testing((
                object.0,
                object.1,
                ObjectDigest::MIN,
            ));
        }
        Ok(effects)
    }

    fn read_keys(
        objects: &DBMap<ObjectKey, StoreObjectWrapper>,
        num_reads: u32,
    ) -> Result<(), anyhow::Error> {
        let mut i = 0;
        while i < num_reads {
            let _res = objects.get(&ObjectKey(ObjectID::random(), VersionNumber::MAX))?;
            i += 1;
        }
        Ok(())
    }

    fn is_rocksdb_range_tombstone_frame(vs: &[Symbol]) -> bool {
        for symbol in vs.iter() {
            if symbol
                .name()
                .contains("rocksdb::FragmentedRangeTombstoneList")
            {
                return true;
            }
        }
        false
    }

    #[tokio::test]
    async fn ensure_no_tombstone_fragmentation_in_stack_frame_with_ignore_tombstones()
    -> Result<(), anyhow::Error> {
        // This test writes a bunch of objects to the objects table, invokes
        // pruning on it, and then performs a number of get() calls. We open the
        // db with `ignore_range_delete` set to true (the default mode), record
        // a CPU profile of the get() calls, and verify that it contains no
        // range-fragmentation stack frames.
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        let primary_path = tempfile::tempdir()?.into_path();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None));
        let effects = insert_keys(&perpetual_db.objects)?;
        AuthorityStorePruner::prune_objects(
            vec![effects],
            &perpetual_db,
            &tests::lock_table(),
            0,
            metrics,
            1,
        )
        .await?;
        let guard = pprof::ProfilerGuardBuilder::default()
            .frequency(1000)
            .build()
            .unwrap();
        read_keys(&perpetual_db.objects, 1000)?;
        if let Ok(report) = guard.report().build() {
            assert!(!report.data.keys().any(|f| {
                f.frames
                    .iter()
                    .any(|vs| is_rocksdb_range_tombstone_frame(vs))
            }));
        }
        Ok(())
    }

    #[tokio::test]
    async fn ensure_no_tombstone_fragmentation_in_stack_frame_after_flush()
    -> Result<(), anyhow::Error> {
        // Same as the test above, except the objects table is explicitly
        // flushed after pruning: write a bunch of objects, prune, flush, then
        // record a CPU profile of the get() calls (with `ignore_range_delete`
        // set to true, the default mode) and verify that it contains no
        // range-fragmentation stack frames.
        let primary_path = tempfile::tempdir()?.into_path();
        let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&primary_path, None));
        let effects = insert_keys(&perpetual_db.objects)?;
        let registry = Registry::default();
        let metrics = AuthorityStorePruningMetrics::new(&registry);
        AuthorityStorePruner::prune_objects(
            vec![effects],
            &perpetual_db,
            &lock_table(),
            0,
            metrics,
            1,
        )
        .await?;
        if let Ok(()) = perpetual_db.objects.flush() {
            info!("Completed flushing objects table");
        } else {
            error!("Failed to flush objects table");
        }
        let guard = pprof::ProfilerGuardBuilder::default()
            .frequency(1000)
            .build()
            .unwrap();
        read_keys(&perpetual_db.objects, 1000)?;
        if let Ok(report) = guard.report().build() {
            assert!(!report.data.keys().any(|f| {
                f.frames
                    .iter()
                    .any(|vs| is_rocksdb_range_tombstone_frame(vs))
            }));
        }
        Ok(())
    }
}