iota_indexer/handlers/
pruner.rs1use std::{collections::HashMap, time::Duration};
6
7use tokio_util::sync::CancellationToken;
8use tracing::{error, info};
9
10use super::checkpoint_handler::CheckpointHandler;
11use crate::{
12 errors::IndexerError,
13 metrics::IndexerMetrics,
14 store::{IndexerStore, PgIndexerStore, pg_partition_manager::PgPartitionManager},
15 types::IndexerResult,
16};
17
18pub struct Pruner {
19 pub store: PgIndexerStore,
20 pub partition_manager: PgPartitionManager,
21 pub epochs_to_keep: u64,
22 pub metrics: IndexerMetrics,
23}
24
25impl Pruner {
26 pub fn new(
27 store: PgIndexerStore,
28 epochs_to_keep: u64,
29 metrics: IndexerMetrics,
30 ) -> Result<Self, IndexerError> {
31 let blocking_cp = CheckpointHandler::pg_blocking_cp(store.clone()).unwrap();
32 let partition_manager = PgPartitionManager::new(blocking_cp.clone())?;
33 Ok(Self {
34 store,
35 partition_manager,
36 epochs_to_keep,
37 metrics,
38 })
39 }
40
41 pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
42 loop {
43 if cancel.is_cancelled() {
44 info!("Pruner task cancelled.");
45 return Ok(());
46 }
47
48 let (mut min_epoch, mut max_epoch) = self.store.get_available_epoch_range().await?;
49 while min_epoch + self.epochs_to_keep > max_epoch {
50 if cancel.is_cancelled() {
51 info!("Pruner task cancelled.");
52 return Ok(());
53 }
54 tokio::time::sleep(Duration::from_secs(5)).await;
55 (min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
56 }
57
58 let table_partitions: HashMap<_, _> = self
61 .partition_manager
62 .get_table_partitions()?
63 .into_iter()
64 .filter(|(table_name, _)| {
65 self.partition_manager
66 .get_strategy(table_name)
67 .is_epoch_partitioned()
68 })
69 .collect();
70
71 for (table_name, (min_partition, max_partition)) in &table_partitions {
72 if max_epoch != *max_partition {
73 error!(
74 "Epochs are out of sync for table {}: max_epoch={}, max_partition={}",
75 table_name, max_epoch, max_partition
76 );
77 }
78 for epoch in *min_partition..min_epoch {
82 self.partition_manager
83 .drop_table_partition(table_name.clone(), epoch)?;
84 info!(
85 "Batch dropped table partition {} epoch {}",
86 table_name, epoch
87 );
88 }
89 }
90
91 for epoch in min_epoch..max_epoch.saturating_sub(self.epochs_to_keep - 1) {
92 if cancel.is_cancelled() {
93 info!("Pruner task cancelled.");
94 return Ok(());
95 }
96 info!("Pruning epoch {}", epoch);
97 for table_name in table_partitions.keys() {
98 self.partition_manager
99 .drop_table_partition(table_name.clone(), epoch)?;
100 info!("Dropped table partition {} epoch {}", table_name, epoch);
101 }
102 self.store.prune_epoch(epoch).await.unwrap_or_else(|e| {
103 error!("Failed to prune epoch {}: {}", epoch, e);
104 });
105 self.metrics.last_pruned_epoch.set(epoch as i64);
106 info!("Pruned epoch {}", epoch);
107 }
108 }
109 }
110}