iota_core/epoch/consensus_store_pruner.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{fs, path::PathBuf, time::Duration};

use consensus_config::Epoch;
use iota_metrics::spawn_logged_monitored_task;
use prometheus::{
    IntCounter, IntCounterVec, IntGauge, Registry, register_int_counter_vec_with_registry,
    register_int_counter_with_registry, register_int_gauge_with_registry,
};
use tokio::{
    sync::mpsc,
    time::{Instant, sleep},
};
use tracing::{error, info};
use typed_store::rocks::safe_drop_db;

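/// Prometheus metrics for the pruner: the highest epoch whose store has been
/// pruned, the number of successfully pruned consensus dbs, and pruning
/// errors labelled by deletion mode ("safe" vs "force").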
struct Metrics {
    last_pruned_consensus_db_epoch: IntGauge,
    successfully_pruned_consensus_dbs: IntCounter,
    error_pruning_consensus_dbs: IntCounterVec,
}

impl Metrics {
    fn new(registry: &Registry) -> Self {
        Self {
            last_pruned_consensus_db_epoch: register_int_gauge_with_registry!(
                "last_pruned_consensus_db_epoch",
                "The last epoch for which the consensus store was pruned",
                registry
            )
            .unwrap(),
            successfully_pruned_consensus_dbs: register_int_counter_with_registry!(
                "successfully_pruned_consensus_dbs",
                "The number of consensus dbs successfully pruned",
                registry
            )
            .unwrap(),
            error_pruning_consensus_dbs: register_int_counter_vec_with_registry!(
                "error_pruning_consensus_dbs",
                "The number of errors encountered while pruning consensus dbs",
                &["mode"],
                registry
            )
            .unwrap(),
        }
    }
}

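/// Prunes old per-epoch consensus databases. A background task removes epoch
/// directories that fall outside the retention window, both on a periodic
/// timer and whenever a new epoch is reported via
/// [`ConsensusStorePruner::prune`].
///
/// A minimal usage sketch (the path and values below are illustrative):
///
/// ```ignore
/// let pruner = ConsensusStorePruner::new(
///     "/path/to/consensus/dbs".into(), // base path holding per-epoch dirs
///     1,                               // keep the current and previous epoch
///     std::time::Duration::from_secs(3600),
///     &prometheus::Registry::new(),
/// );
/// // Notify the pruner whenever the node enters a new epoch.
/// pruner.prune(42).await;
/// ```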
pub struct ConsensusStorePruner {
    tx_remove: mpsc::Sender<Epoch>,
    _handle: tokio::task::JoinHandle<()>,
}

impl ConsensusStorePruner {
    pub fn new(
        base_path: PathBuf,
        epoch_retention: u64,
        epoch_prune_period: Duration,
        registry: &Registry,
    ) -> Self {
        let (tx_remove, mut rx_remove) = mpsc::channel(1);
        let metrics = Metrics::new(registry);

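        // Background task: prune on a fixed timer and whenever a newer epoch
        // arrives on the channel; exit once all senders have been dropped.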
        let _handle = spawn_logged_monitored_task!(async {
            info!(
                "Starting consensus store pruner with epoch retention {epoch_retention} and prune period {epoch_prune_period:?}"
            );

            let mut timeout = tokio::time::interval_at(
                Instant::now() + Duration::from_secs(60), /* allow some time for the node to boot etc. before attempting to prune */
                epoch_prune_period,
            );

            let mut latest_epoch = 0;
            loop {
                tokio::select! {
                    _ = timeout.tick() => {
                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
                    }
                    result = rx_remove.recv() => {
                        if result.is_none() {
                            info!("Closing consensus store pruner");
                            break;
                        }
                        latest_epoch = result.unwrap();
                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
                    }
                }
            }
        });

        Self { tx_remove, _handle }
    }

    /// Removes all epoch data stores and directories that are older than the
    /// current epoch minus the epoch retention. The `current_epoch` data is
    /// always retained.
    pub async fn prune(&self, current_epoch: Epoch) {
        let result = self.tx_remove.send(current_epoch).await;
        if result.is_err() {
            error!(
                "Error sending message to data removal task for epoch {:?}",
                current_epoch,
            );
        }
    }

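    /// Scans `storage_base_path` for directories named after epoch numbers and
    /// removes every directory whose epoch is below `current_epoch` minus
    /// `epoch_retention`. Entries that don't parse as an epoch are skipped.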
    async fn prune_old_epoch_data(
        storage_base_path: &PathBuf,
        current_epoch: Epoch,
        epoch_retention: u64,
        metrics: &Metrics,
    ) {
        let drop_boundary = current_epoch.saturating_sub(epoch_retention);

        info!(
            "Consensus store pruning for current epoch {}. Will remove epochs < {:?}",
            current_epoch, drop_boundary
        );

        // Get all the epoch stores in the base path directory
        let files = match fs::read_dir(storage_base_path) {
            Ok(f) => f,
            Err(e) => {
                error!(
                    "Cannot read the files in the storage path directory for epoch cleanup: {:?}",
                    e
                );
                return;
            }
        };

        // Look for any that are below the drop boundary and drop them
        for file_res in files {
            let f = match file_res {
                Ok(f) => f,
                Err(e) => {
                    error!(
                        "Error while cleaning up storage of previous epochs: {:?}",
                        e
                    );
                    continue;
                }
            };

            let name = f.file_name();
            let file_epoch_string = match name.to_str() {
                Some(f) => f,
                None => continue,
            };

            let file_epoch = match file_epoch_string.parse::<u64>() {
                Ok(f) => f,
                Err(e) => {
                    error!(
                        "Could not parse file \"{file_epoch_string}\" in storage path into epoch for cleanup: {:?}",
                        e
                    );
                    continue;
                }
            };

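            // Directories below the boundary are first dropped via the safe
            // RocksDB path; if that fails, fall back to force-deleting the
            // directory after a short grace period.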
            if file_epoch < drop_boundary {
                if let Err(e) = safe_drop_db(f.path()) {
                    error!(
                        "Could not prune old consensus storage \"{:?}\" directory with safe approach. Will fall back to force delete: {:?}",
                        f.path(),
                        e
                    );

                    metrics
                        .error_pruning_consensus_dbs
                        .with_label_values(&["safe"])
                        .inc();

                    const WAIT_BEFORE_FORCE_DELETE: Duration = Duration::from_secs(5);
                    sleep(WAIT_BEFORE_FORCE_DELETE).await;

                    if let Err(err) = fs::remove_dir_all(f.path()) {
                        error!(
                            "Could not prune old consensus storage \"{:?}\" directory with force delete: {:?}",
                            f.path(),
                            err
                        );
                        metrics
                            .error_pruning_consensus_dbs
                            .with_label_values(&["force"])
                            .inc();
                    } else {
                        info!(
                            "Successfully pruned consensus epoch storage directory with force delete: {:?}",
                            f.path()
                        );
                        let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
                        metrics
                            .last_pruned_consensus_db_epoch
                            .set(last_epoch.max(file_epoch as i64));
                        metrics.successfully_pruned_consensus_dbs.inc();
                    }
                } else {
                    info!(
                        "Successfully pruned consensus epoch storage directory: {:?}",
                        f.path()
                    );
                    let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
                    metrics
                        .last_pruned_consensus_db_epoch
                        .set(last_epoch.max(file_epoch as i64));
                    metrics.successfully_pruned_consensus_dbs.inc();
                }
            }
        }

        info!(
            "Completed old epoch data removal process for epoch {:?}",
            current_epoch
        );
    }
}

#[cfg(test)]
mod tests {
    use std::fs;

    use prometheus::Registry;
    use tokio::time::sleep;

    use crate::epoch::consensus_store_pruner::{ConsensusStorePruner, Metrics};

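    // Exercises the pruning helper directly: epoch 0 as the current epoch, a
    // one-epoch retention window, and zero retention.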
    #[tokio::test]
    async fn test_remove_old_epoch_data() {
        telemetry_subscribers::init_for_testing();
        let metrics = Metrics::new(&Registry::new());

        {
            // Epoch 0 should not be removed when it's the current epoch.
            let epoch_retention = 0;
            let current_epoch = 0;

            let base_directory = tempfile::tempdir().unwrap().into_path();

            create_epoch_directories(&base_directory, vec!["0", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 0);
        }

        {
            // Every directory should be retained only for 1 epoch. We expect any epoch
            // directories < 99 to be removed.
            let epoch_retention = 1;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().into_path();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 2);
            assert_eq!(epochs_left[0], 99);
            assert_eq!(epochs_left[1], 100);
        }

        {
            // Every directory should be retained only for 0 epochs. That means only the
            // current epoch directory should be retained and everything else
            // deleted.
            let epoch_retention = 0;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().into_path();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 100);
        }
    }

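    // End-to-end check of the background pruner task: directories are removed
    // only after the pruner is notified of a newer epoch.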
    #[tokio::test(flavor = "current_thread")]
    async fn test_consensus_store_pruner() {
        let epoch_retention = 1;
        let epoch_prune_period = std::time::Duration::from_millis(500);

        let base_directory = tempfile::tempdir().unwrap().into_path();

        // We create some directories up to epoch 100
        create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

        let pruner = ConsensusStorePruner::new(
            base_directory.clone(),
            epoch_retention,
            epoch_prune_period,
            &Registry::new(),
        );

        // We let the pruner run a couple of times to prune the old directories.
        // Since the default epoch of 0 is used, no dirs should be pruned.
        sleep(3 * epoch_prune_period).await;

        // We expect the directories to be the same as before
        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 4);

        // Then we update the epoch and instruct the pruner to prune for current epoch = 100
        pruner.prune(100).await;

        // We let the pruner run and check the directories again - no directories for
        // epochs < 99 should be left
        sleep(2 * epoch_prune_period).await;

        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 2);
        assert_eq!(epoch_dirs[0], 99);
        assert_eq!(epoch_dirs[1], 100);
    }

    fn create_epoch_directories(base_directory: &std::path::Path, epochs: Vec<&str>) {
        for epoch in epochs {
            let mut path = base_directory.to_path_buf();
            path.push(epoch);
            fs::create_dir(path).unwrap();
        }
    }

    fn read_epoch_directories(base_directory: &std::path::Path) -> Vec<u64> {
        let files = fs::read_dir(base_directory).unwrap();

        let mut epochs = Vec::new();
        for file_res in files {
            let file_epoch_string = file_res.unwrap().file_name().to_str().unwrap().to_owned();
            if let Ok(file_epoch) = file_epoch_string.parse::<u64>() {
                epochs.push(file_epoch);
            }
        }

        epochs.sort();
        epochs
    }
}