iota_indexer/handlers/
pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, time::Duration};
6
7use tokio_util::sync::CancellationToken;
8use tracing::{error, info};
9
10use super::checkpoint_handler::CheckpointHandler;
11use crate::{
12    errors::IndexerError,
13    metrics::IndexerMetrics,
14    store::{IndexerStore, PgIndexerStore, pg_partition_manager::PgPartitionManager},
15    types::IndexerResult,
16};
17
/// State needed by the background task that prunes old epochs from the
/// indexer database.
pub struct Pruner {
    /// Store that reports the available epoch range and prunes the data of
    /// individual epochs.
    pub store: PgIndexerStore,
    /// Lists the partitions of partitioned tables and drops them per epoch.
    pub partition_manager: PgPartitionManager,
    /// Number of most recent epochs, the latest one included, that pruning
    /// leaves in place.
    pub epochs_to_keep: u64,
    /// Indexer metrics; `last_pruned_epoch` is set after each pruned epoch.
    pub metrics: IndexerMetrics,
}
24
25impl Pruner {
26    pub fn new(
27        store: PgIndexerStore,
28        epochs_to_keep: u64,
29        metrics: IndexerMetrics,
30    ) -> Result<Self, IndexerError> {
31        let blocking_cp = CheckpointHandler::pg_blocking_cp(store.clone()).unwrap();
32        let partition_manager = PgPartitionManager::new(blocking_cp.clone())?;
33        Ok(Self {
34            store,
35            partition_manager,
36            epochs_to_keep,
37            metrics,
38        })
39    }
40
41    pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
42        let mut last_seen_max_epoch = 0;
43        // The first epoch that has not yet been pruned.
44        let mut next_prune_epoch = None;
45        while !cancel.is_cancelled() {
46            let (min_epoch, max_epoch) = self.store.get_available_epoch_range().await?;
47            if max_epoch == last_seen_max_epoch {
48                tokio::time::sleep(Duration::from_secs(5)).await;
49                continue;
50            }
51            last_seen_max_epoch = max_epoch;
52
53            // Not all partitioned tables are epoch-partitioned, so we need to filter them
54            // out.
55            let table_partitions: HashMap<_, _> = self
56                .partition_manager
57                .get_table_partitions()?
58                .into_iter()
59                .filter(|(table_name, _)| {
60                    self.partition_manager
61                        .get_strategy(table_name)
62                        .is_epoch_partitioned()
63                })
64                .collect();
65
66            let prune_to_epoch = last_seen_max_epoch.saturating_sub(self.epochs_to_keep - 1);
67
68            for (table_name, (min_partition, max_partition)) in &table_partitions {
69                if last_seen_max_epoch != *max_partition {
70                    error!(
71                        "Epochs are out of sync for table {table_name}: max_epoch={last_seen_max_epoch}, max_partition={max_partition}",
72                    );
73                }
74                for epoch in *min_partition..prune_to_epoch {
75                    if cancel.is_cancelled() {
76                        info!("Pruner task cancelled.");
77                        return Ok(());
78                    }
79                    self.partition_manager
80                        .drop_table_partition(table_name.clone(), epoch)?;
81                    info!(
82                        "Batch dropped table partition {} epoch {}",
83                        table_name, epoch
84                    );
85                }
86            }
87
88            let prune_start_epoch = next_prune_epoch.unwrap_or(min_epoch);
89
90            for epoch in prune_start_epoch..prune_to_epoch {
91                if cancel.is_cancelled() {
92                    info!("Pruner task cancelled.");
93                    return Ok(());
94                }
95                info!("Pruning epoch {}", epoch);
96                if let Err(err) = self.store.prune_epoch(epoch).await {
97                    error!("Failed to prune epoch {epoch}: {err}");
98                    break;
99                };
100                self.metrics.last_pruned_epoch.set(epoch as i64);
101                info!("Pruned epoch {}", epoch);
102                next_prune_epoch = Some(epoch + 1);
103            }
104        }
105        info!("Pruner task cancelled.");
106        Ok(())
107    }
108}