iota_indexer/handlers/pruner.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
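
//! Background task that prunes old epoch data from the indexer database:
//! epoch-partitioned tables have their stale partitions dropped and the
//! store's per-epoch rows are deleted, retaining only the most recent
//! `epochs_to_keep` epochs.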

use std::{collections::HashMap, time::Duration};

use tokio_util::sync::CancellationToken;
use tracing::{error, info};

use super::checkpoint_handler::CheckpointHandler;
use crate::{
    errors::IndexerError,
    metrics::IndexerMetrics,
    store::{IndexerStore, PgIndexerStore, pg_partition_manager::PgPartitionManager},
    types::IndexerResult,
};
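
/// Prunes committed epoch data from the indexer database, dropping
/// epoch-based table partitions and per-epoch rows once they fall outside
/// the `epochs_to_keep` retention window.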
pub struct Pruner {
    pub store: PgIndexerStore,
    pub partition_manager: PgPartitionManager,
    pub epochs_to_keep: u64,
    pub metrics: IndexerMetrics,
}

impl Pruner {
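    /// Creates a new `Pruner`, deriving a blocking connection pool and a
    /// `PgPartitionManager` from the given store.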
    pub fn new(
        store: PgIndexerStore,
        epochs_to_keep: u64,
        metrics: IndexerMetrics,
    ) -> Result<Self, IndexerError> {
        let blocking_cp = CheckpointHandler::pg_blocking_cp(store.clone())?;
        let partition_manager = PgPartitionManager::new(blocking_cp.clone())?;
        Ok(Self {
            store,
            partition_manager,
            epochs_to_keep,
            metrics,
        })
    }
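
    /// Runs the pruning loop until `cancel` is triggered.
    ///
    /// A typical caller spawns this on its own task (a sketch, assuming an
    /// already-configured `store` and `metrics`):
    ///
    /// ```ignore
    /// let pruner = Pruner::new(store, epochs_to_keep, metrics)?;
    /// tokio::spawn(async move { pruner.start(CancellationToken::new()).await });
    /// ```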
    pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
        loop {
            if cancel.is_cancelled() {
                info!("Pruner task cancelled.");
                return Ok(());
            }
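
            // Wait until the store holds more epochs than we are asked to
            // keep; only then is there anything to prune.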
            let (mut min_epoch, mut max_epoch) = self.store.get_available_epoch_range().await?;
            while min_epoch + self.epochs_to_keep > max_epoch {
                if cancel.is_cancelled() {
                    info!("Pruner task cancelled.");
                    return Ok(());
                }
                tokio::time::sleep(Duration::from_secs(5)).await;
                (min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
            }

            // Not all partitioned tables are epoch-partitioned, so filter
            // out those that are not.
            let table_partitions: HashMap<_, _> = self
                .partition_manager
                .get_table_partitions()?
                .into_iter()
                .filter(|(table_name, _)| {
                    self.partition_manager
                        .get_strategy(table_name)
                        .is_epoch_partitioned()
                })
                .collect();
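
            // Each entry maps a table to its (oldest, newest) partition epoch.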
            for (table_name, (min_partition, max_partition)) in &table_partitions {
                if max_epoch != *max_partition {
                    error!(
                        "Epochs are out of sync for table {}: max_epoch={}, max_partition={}",
                        table_name, max_epoch, max_partition
                    );
                }
                // Drop any partitions below `min_epoch`. Had the pruner been
                // running continuously they would already be gone; they can
                // remain when pruning is enabled only after the fact.
                for epoch in *min_partition..min_epoch {
                    self.partition_manager
                        .drop_table_partition(table_name.clone(), epoch)?;
                    info!(
                        "Batch dropped table partition {} epoch {}",
                        table_name, epoch
                    );
                }
            }
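
            // Prune everything below the retention window; the exclusive
            // upper bound keeps the newest `epochs_to_keep` epochs.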
            for epoch in min_epoch..max_epoch.saturating_sub(self.epochs_to_keep - 1) {
                if cancel.is_cancelled() {
                    info!("Pruner task cancelled.");
                    return Ok(());
                }
                info!("Pruning epoch {}", epoch);
                for table_name in table_partitions.keys() {
                    self.partition_manager
                        .drop_table_partition(table_name.clone(), epoch)?;
                    info!("Dropped table partition {} epoch {}", table_name, epoch);
                }
                self.store.prune_epoch(epoch).await.unwrap_or_else(|e| {
                    error!("Failed to prune epoch {}: {}", epoch, e);
                });
                self.metrics.last_pruned_epoch.set(epoch as i64);
                info!("Pruned epoch {}", epoch);
            }
        }
    }
}