// iota_core/epoch/consensus_store_pruner.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
use std::{
    fs,
    path::{Path, PathBuf},
    time::Duration,
};

use consensus_config::Epoch;
use iota_metrics::spawn_logged_monitored_task;
use prometheus::{
    IntCounter, IntCounterVec, IntGauge, Registry, register_int_counter_vec_with_registry,
    register_int_counter_with_registry, register_int_gauge_with_registry,
};
use tokio::{sync::mpsc, time::Instant};
use tracing::{error, info, warn};
use typed_store::rocks::safe_drop_db;
16
/// Prometheus metrics reported by the consensus store pruner.
struct Metrics {
    /// Highest epoch whose consensus DB directory has been pruned so far.
    last_pruned_consensus_db_epoch: IntGauge,
    /// Total number of consensus epoch DB directories pruned successfully.
    successfully_pruned_consensus_dbs: IntCounter,
    /// Pruning errors, labelled by deletion mode ("safe" or "force").
    error_pruning_consensus_dbs: IntCounterVec,
}
22
23impl Metrics {
24    fn new(registry: &Registry) -> Self {
25        Self {
26            last_pruned_consensus_db_epoch: register_int_gauge_with_registry!(
27                "last_pruned_consensus_db_epoch",
28                "The last epoch for which the consensus store was pruned",
29                registry
30            )
31            .unwrap(),
32            successfully_pruned_consensus_dbs: register_int_counter_with_registry!(
33                "successfully_pruned_consensus_dbs",
34                "The number of consensus dbs successfully pruned",
35                registry
36            )
37            .unwrap(),
38            error_pruning_consensus_dbs: register_int_counter_vec_with_registry!(
39                "error_pruning_consensus_dbs",
40                "The number of errors encountered while pruning consensus dbs",
41                &["mode"],
42                registry
43            )
44            .unwrap(),
45        }
46    }
47}
48
/// Prunes old per-epoch consensus DB directories in a background task.
pub struct ConsensusStorePruner {
    // Channel used to notify the background task of the latest epoch.
    tx_remove: mpsc::Sender<Epoch>,
    // Handle of the spawned background task; held so it stays associated
    // with this struct's lifetime.
    _handle: tokio::task::JoinHandle<()>,
}
53
54impl ConsensusStorePruner {
55    pub fn new(
56        base_path: PathBuf,
57        epoch_retention: u64,
58        epoch_prune_period: Duration,
59        registry: &Registry,
60    ) -> Self {
61        let (tx_remove, mut rx_remove) = mpsc::channel(1);
62        let metrics = Metrics::new(registry);
63
64        let _handle = spawn_logged_monitored_task!(async {
65            info!(
66                "Starting consensus store pruner with epoch retention {epoch_retention} and prune period {epoch_prune_period:?}"
67            );
68
69            let mut timeout = tokio::time::interval_at(
70                Instant::now() + Duration::from_secs(60), /* allow some time for the node to boot etc before attempting to prune */
71                epoch_prune_period,
72            );
73
74            let mut latest_epoch = 0;
75            loop {
76                tokio::select! {
77                    _ = timeout.tick() => {
78                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
79                    }
80                    result = rx_remove.recv() => {
81                        if result.is_none() {
82                            info!("Closing consensus store pruner");
83                            break;
84                        }
85                        latest_epoch = result.unwrap();
86                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
87                    }
88                }
89            }
90        });
91
92        Self { tx_remove, _handle }
93    }
94
95    /// This method will remove all epoch data stores and directories that are
96    /// older than the current epoch minus the epoch retention. The method
97    /// ensures that always the `current_epoch` data is retained.
98    pub async fn prune(&self, current_epoch: Epoch) {
99        let result = self.tx_remove.send(current_epoch).await;
100        if result.is_err() {
101            error!(
102                "Error sending message to data removal task for epoch {:?}",
103                current_epoch,
104            );
105        }
106    }
107
108    async fn prune_old_epoch_data(
109        storage_base_path: &PathBuf,
110        current_epoch: Epoch,
111        epoch_retention: u64,
112        metrics: &Metrics,
113    ) {
114        let drop_boundary = current_epoch.saturating_sub(epoch_retention);
115
116        info!(
117            "Consensus store prunning for current epoch {}. Will remove epochs < {:?}",
118            current_epoch, drop_boundary
119        );
120
121        // Get all the epoch stores in the base path directory
122        let files = match fs::read_dir(storage_base_path) {
123            Ok(f) => f,
124            Err(e) => {
125                error!(
126                    "Can not read the files in the storage path directory for epoch cleanup: {:?}",
127                    e
128                );
129                return;
130            }
131        };
132
133        // Look for any that are less than the drop boundary and drop
134        for file_res in files {
135            let f = match file_res {
136                Ok(f) => f,
137                Err(e) => {
138                    error!(
139                        "Error while cleaning up storage of previous epochs: {:?}",
140                        e
141                    );
142                    continue;
143                }
144            };
145
146            let name = f.file_name();
147            let file_epoch_string = match name.to_str() {
148                Some(f) => f,
149                None => continue,
150            };
151
152            let file_epoch = match file_epoch_string.to_owned().parse::<u64>() {
153                Ok(f) => f,
154                Err(e) => {
155                    error!(
156                        "Could not parse file \"{file_epoch_string}\" in storage path into epoch for cleanup: {:?}",
157                        e
158                    );
159                    continue;
160                }
161            };
162
163            if file_epoch < drop_boundary {
164                const WAIT_BEFORE_FORCE_DELETE: Duration = Duration::from_secs(5);
165                if let Err(e) = safe_drop_db(f.path(), WAIT_BEFORE_FORCE_DELETE).await {
166                    warn!(
167                        "Could not prune old consensus storage \"{:?}\" directory with safe approach. Will fallback to force delete: {:?}",
168                        f.path(),
169                        e
170                    );
171
172                    metrics
173                        .error_pruning_consensus_dbs
174                        .with_label_values(&["safe"])
175                        .inc();
176
177                    if let Err(err) = fs::remove_dir_all(f.path()) {
178                        error!(
179                            "Could not prune old consensus storage \"{:?}\" directory with force delete: {:?}",
180                            f.path(),
181                            err
182                        );
183                        metrics
184                            .error_pruning_consensus_dbs
185                            .with_label_values(&["force"])
186                            .inc();
187                    } else {
188                        info!(
189                            "Successfully pruned consensus epoch storage directory with force delete: {:?}",
190                            f.path()
191                        );
192                        let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
193                        metrics
194                            .last_pruned_consensus_db_epoch
195                            .set(last_epoch.max(file_epoch as i64));
196                        metrics.successfully_pruned_consensus_dbs.inc();
197                    }
198                } else {
199                    info!(
200                        "Successfully pruned consensus epoch storage directory: {:?}",
201                        f.path()
202                    );
203                    let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
204                    metrics
205                        .last_pruned_consensus_db_epoch
206                        .set(last_epoch.max(file_epoch as i64));
207                    metrics.successfully_pruned_consensus_dbs.inc();
208                }
209            }
210        }
211
212        info!(
213            "Completed old epoch data removal process for epoch {:?}",
214            current_epoch
215        );
216    }
217}
218
#[cfg(test)]
mod tests {
    use std::fs;

    use prometheus::Registry;
    use tokio::time::sleep;

    use crate::epoch::consensus_store_pruner::{ConsensusStorePruner, Metrics};

    #[tokio::test]
    async fn test_remove_old_epoch_data() {
        telemetry_subscribers::init_for_testing();
        let metrics = Metrics::new(&Registry::new());

        {
            // With retention 0 and current epoch 0, the epoch 0 directory must
            // survive pruning.
            let epoch_retention = 0;
            let current_epoch = 0;

            let base_directory = tempfile::tempdir().unwrap().keep();
            create_epoch_directories(&base_directory, vec!["0", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            assert_eq!(read_epoch_directories(&base_directory), vec![0]);
        }

        {
            // With retention 1 and current epoch 100, every epoch directory
            // below 99 must be deleted.
            let epoch_retention = 1;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().keep();
            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            assert_eq!(read_epoch_directories(&base_directory), vec![99, 100]);
        }

        {
            // With retention 0 and current epoch 100, only the current epoch
            // directory survives.
            let epoch_retention = 0;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().keep();
            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            assert_eq!(read_epoch_directories(&base_directory), vec![100]);
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_consensus_store_pruner() {
        let epoch_retention = 1;
        let epoch_prune_period = std::time::Duration::from_millis(500);

        let base_directory = tempfile::tempdir().unwrap().keep();

        // Seed directories up to epoch 100.
        create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

        let pruner = ConsensusStorePruner::new(
            base_directory.clone(),
            epoch_retention,
            epoch_prune_period,
            &Registry::new(),
        );

        // Let several prune periods elapse; since the pruner still holds the
        // default epoch of 0, nothing should be deleted yet.
        sleep(3 * epoch_prune_period).await;
        assert_eq!(read_epoch_directories(&base_directory).len(), 4);

        // Advance the current epoch to 100 and give the pruner time to run;
        // afterwards only epochs >= 99 should remain.
        pruner.prune(100).await;
        sleep(2 * epoch_prune_period).await;

        assert_eq!(read_epoch_directories(&base_directory), vec![99, 100]);
    }

    /// Creates one subdirectory per entry of `epochs` under `base_directory`.
    fn create_epoch_directories(base_directory: &std::path::Path, epochs: Vec<&str>) {
        for epoch in epochs {
            fs::create_dir(base_directory.join(epoch)).unwrap();
        }
    }

    /// Returns the sorted list of directory names under `base_directory` that
    /// parse as epoch numbers; all other entries are ignored.
    fn read_epoch_directories(base_directory: &std::path::Path) -> Vec<u64> {
        let mut epochs: Vec<u64> = fs::read_dir(base_directory)
            .unwrap()
            .filter_map(|entry| {
                entry
                    .unwrap()
                    .file_name()
                    .to_str()
                    .and_then(|name| name.parse::<u64>().ok())
            })
            .collect();
        epochs.sort();
        epochs
    }
}