iota_indexer/handlers/pruner.rs

use std::{collections::HashMap, time::Duration};

use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use super::checkpoint_handler::CheckpointHandler;
use crate::{
    errors::IndexerError,
    metrics::IndexerMetrics,
    store::{IndexerStore, PgIndexerStore, pg_partition_manager::PgPartitionManager},
    types::IndexerResult,
};
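
/// Background task that enforces the indexer's retention window: it drops
/// expired epoch partitions and prunes epoch data so that only the most
/// recent `epochs_to_keep` epochs are retained.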
pub struct Pruner {
    pub store: PgIndexerStore,
    pub partition_manager: PgPartitionManager,
    pub epochs_to_keep: u64,
    pub metrics: IndexerMetrics,
}

impl Pruner {
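    /// Creates a pruner for `store`, building a partition manager on the
    /// blocking connection pool obtained via `CheckpointHandler::pg_blocking_cp`.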
    pub fn new(
        store: PgIndexerStore,
        epochs_to_keep: u64,
        metrics: IndexerMetrics,
    ) -> Result<Self, IndexerError> {
        let blocking_cp = CheckpointHandler::pg_blocking_cp(store.clone()).unwrap();
        let partition_manager = PgPartitionManager::new(blocking_cp.clone())?;
        Ok(Self {
            store,
            partition_manager,
            epochs_to_keep,
            metrics,
        })
    }
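
    /// Runs the pruning loop until `cancel` is triggered: polls the store for
    /// the available epoch range and, whenever a new max epoch appears, drops
    /// out-of-window table partitions and prunes epoch data up to the
    /// retention boundary.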
    pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
        let mut last_seen_max_epoch = 0;
        let mut next_prune_epoch = None;
        while !cancel.is_cancelled() {
            let (min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
            if max_epoch == last_seen_max_epoch {
                // No new epoch since the last pass; poll again shortly.
                tokio::time::sleep(Duration::from_secs(5)).await;
                continue;
            }
            last_seen_max_epoch = max_epoch;
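
            // Current (min_partition, max_partition) range for every
            // epoch-partitioned table.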
            let table_partitions: HashMap<_, _> = self
                .partition_manager
                .get_table_partitions()?
                .into_iter()
                .filter(|(table_name, _)| {
                    self.partition_manager
                        .get_strategy(table_name)
                        .is_epoch_partitioned()
                })
                .collect();
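
            // Everything strictly below `prune_to_epoch` falls outside the
            // retention window of `epochs_to_keep` epochs (assumes
            // `epochs_to_keep >= 1`).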
            let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.epochs_to_keep - 1);
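
            // Drop expired partitions table by table; each partition covers a
            // single epoch of data.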
            for (table_name, (min_partition, max_partition)) in &table_partitions {
                // Partitions are expected to track the latest epoch; log if a
                // table has fallen behind.
                if last_seen_max_epoch != *max_partition {
                    error!(
                        "Epochs are out of sync for table {table_name}: max_epoch={last_seen_max_epoch}, max_partition={max_partition}",
                    );
                }
                for epoch in *min_partition..prune_to_epoch {
                    if cancel.is_cancelled() {
                        info!("Pruner task cancelled.");
                        return Ok(());
                    }
                    self.partition_manager
                        .drop_table_partition(table_name.clone(), epoch)?;
                    info!(
                        "Batch dropped table partition {} epoch {}",
                        table_name, epoch
                    );
                }
            }
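
            // Resume pruning from the epoch after the last successfully pruned
            // one, or from the oldest available epoch on the first pass.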
            let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch);
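
            // Prune epoch data from the store; `next_prune_epoch` only advances
            // after a successful prune, so a failed epoch is retried on a later
            // iteration of the outer loop.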
            for epoch in prune_start_epoch..prune_to_epoch {
                if cancel.is_cancelled() {
                    info!("Pruner task cancelled.");
                    return Ok(());
                }
                info!("Pruning epoch {}", epoch);
                if let Err(err) = self.store.prune_epoch(epoch).await {
                    error!("Failed to prune epoch {epoch}: {err}");
                    break;
                };
                self.metrics.last_pruned_epoch.set(epoch as i64);
                info!("Pruned epoch {}", epoch);
                next_prune_epoch = Some(epoch + 1);
            }
        }
        info!("Pruner task cancelled.");
        Ok(())
    }
}