use std::{fs, path::PathBuf, time::Duration};

use consensus_config::Epoch;
use iota_metrics::spawn_logged_monitored_task;
use prometheus::{
    IntCounter, IntCounterVec, IntGauge, Registry, register_int_counter_vec_with_registry,
    register_int_counter_with_registry, register_int_gauge_with_registry,
};
use tokio::{
    sync::mpsc,
    time::{Instant, sleep},
};
use tracing::{error, info};
use typed_store::rocks::safe_drop_db;

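/// Prometheus metrics tracking the progress of consensus DB pruning and any
/// errors encountered along the way.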
struct Metrics {
    last_pruned_consensus_db_epoch: IntGauge,
    successfully_pruned_consensus_dbs: IntCounter,
    error_pruning_consensus_dbs: IntCounterVec,
}

impl Metrics {
    fn new(registry: &Registry) -> Self {
        Self {
            last_pruned_consensus_db_epoch: register_int_gauge_with_registry!(
                "last_pruned_consensus_db_epoch",
                "The last epoch for which the consensus store was pruned",
                registry
            )
            .unwrap(),
            successfully_pruned_consensus_dbs: register_int_counter_with_registry!(
                "successfully_pruned_consensus_dbs",
                "The number of consensus dbs successfully pruned",
                registry
            )
            .unwrap(),
            error_pruning_consensus_dbs: register_int_counter_vec_with_registry!(
                "error_pruning_consensus_dbs",
                "The number of errors encountered while pruning consensus dbs",
                &["mode"],
                registry
            )
            .unwrap(),
        }
    }
}

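/// Removes the consensus DB directories of old epochs in the background.
/// Directories are dropped once they fall outside the retention window,
/// either on a periodic timer tick or when a new epoch is reported via
/// [`Self::prune`].
///
/// A minimal usage sketch (the path, epoch, and registry values below are
/// illustrative, not part of this module):
///
/// ```ignore
/// let pruner = ConsensusStorePruner::new(
///     consensus_db_path,         // base path holding one directory per epoch
///     1,                         // keep the current and the previous epoch
///     Duration::from_secs(3600), // periodic pruning interval
///     &registry,
/// );
/// // On epoch change:
/// pruner.prune(new_epoch).await;
/// ```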
pub struct ConsensusStorePruner {
    tx_remove: mpsc::Sender<Epoch>,
    _handle: tokio::task::JoinHandle<()>,
}

impl ConsensusStorePruner {
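    /// Spawns the background pruning task. The task prunes on every
    /// `epoch_prune_period` tick (the first tick is delayed to give the node
    /// time to boot) and whenever a newer epoch arrives over the channel.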
    pub fn new(
        base_path: PathBuf,
        epoch_retention: u64,
        epoch_prune_period: Duration,
        registry: &Registry,
    ) -> Self {
        let (tx_remove, mut rx_remove) = mpsc::channel(1);
        let metrics = Metrics::new(registry);

        let _handle = spawn_logged_monitored_task!(async {
            info!(
                "Starting consensus store pruner with epoch retention {epoch_retention} and prune period {epoch_prune_period:?}"
            );

            // Delay the first tick so pruning does not race with node startup.
            let mut timeout = tokio::time::interval_at(
                Instant::now() + Duration::from_secs(60),
                epoch_prune_period,
            );

            let mut latest_epoch = 0;
            loop {
                tokio::select! {
                    _ = timeout.tick() => {
                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
                    }
                    result = rx_remove.recv() => {
                        if result.is_none() {
                            info!("Closing consensus store pruner");
                            break;
                        }
                        latest_epoch = result.unwrap();
                        Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
                    }
                }
            }
        });

        Self { tx_remove, _handle }
    }

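    /// Reports the given epoch to the pruning task. Data of any epoch older
    /// than `current_epoch - epoch_retention` becomes eligible for removal;
    /// the current epoch's data is always retained.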
    pub async fn prune(&self, current_epoch: Epoch) {
        let result = self.tx_remove.send(current_epoch).await;
        if result.is_err() {
            error!(
                "Error sending message to data removal task for epoch {:?}",
                current_epoch,
            );
        }
    }

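    /// Scans `storage_base_path` for directories whose names parse as epoch
    /// numbers and deletes every one below `current_epoch - epoch_retention`.
    /// Entries that do not parse as epochs are skipped. Deletion first tries
    /// `safe_drop_db`; on failure it falls back to a force `remove_dir_all`
    /// after a short wait.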
    async fn prune_old_epoch_data(
        storage_base_path: &PathBuf,
        current_epoch: Epoch,
        epoch_retention: u64,
        metrics: &Metrics,
    ) {
        let drop_boundary = current_epoch.saturating_sub(epoch_retention);

        info!(
            "Consensus store pruning for current epoch {}. Will remove epochs < {:?}",
            current_epoch, drop_boundary
        );

        let files = match fs::read_dir(storage_base_path) {
            Ok(f) => f,
            Err(e) => {
                error!(
                    "Cannot read the files in the storage path directory for epoch cleanup: {:?}",
                    e
                );
                return;
            }
        };

        for file_res in files {
            let f = match file_res {
                Ok(f) => f,
                Err(e) => {
                    error!(
                        "Error while cleaning up storage of previous epochs: {:?}",
                        e
                    );
                    continue;
                }
            };

            let name = f.file_name();
            let file_epoch_string = match name.to_str() {
                Some(f) => f,
                None => continue,
            };

            let file_epoch = match file_epoch_string.parse::<u64>() {
                Ok(f) => f,
                Err(e) => {
                    error!(
                        "Could not parse file \"{file_epoch_string}\" in storage path into epoch for cleanup: {:?}",
                        e
                    );
                    continue;
                }
            };

            if file_epoch < drop_boundary {
                if let Err(e) = safe_drop_db(f.path()) {
                    error!(
                        "Could not prune old consensus storage \"{:?}\" directory with safe approach. Will fall back to force delete: {:?}",
                        f.path(),
                        e
                    );

                    metrics
                        .error_pruning_consensus_dbs
                        .with_label_values(&["safe"])
                        .inc();

                    const WAIT_BEFORE_FORCE_DELETE: Duration = Duration::from_secs(5);
                    sleep(WAIT_BEFORE_FORCE_DELETE).await;

                    if let Err(err) = fs::remove_dir_all(f.path()) {
                        error!(
                            "Could not prune old consensus storage \"{:?}\" directory with force delete: {:?}",
                            f.path(),
                            err
                        );
                        metrics
                            .error_pruning_consensus_dbs
                            .with_label_values(&["force"])
                            .inc();
                    } else {
                        info!(
                            "Successfully pruned consensus epoch storage directory with force delete: {:?}",
                            f.path()
                        );
                        let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
                        metrics
                            .last_pruned_consensus_db_epoch
                            .set(last_epoch.max(file_epoch as i64));
                        metrics.successfully_pruned_consensus_dbs.inc();
                    }
                } else {
                    info!(
                        "Successfully pruned consensus epoch storage directory: {:?}",
                        f.path()
                    );
                    let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
                    metrics
                        .last_pruned_consensus_db_epoch
                        .set(last_epoch.max(file_epoch as i64));
                    metrics.successfully_pruned_consensus_dbs.inc();
                }
            }
        }

        info!(
            "Completed old epoch data removal process for epoch {:?}",
            current_epoch
        );
    }
}

#[cfg(test)]
mod tests {
    use std::fs;

    use prometheus::Registry;
    use tokio::time::sleep;

    use crate::epoch::consensus_store_pruner::{ConsensusStorePruner, Metrics};

    #[tokio::test]
    async fn test_remove_old_epoch_data() {
        telemetry_subscribers::init_for_testing();
        let metrics = Metrics::new(&Registry::new());

        {
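            // Zero epoch retention at epoch 0: nothing falls below the drop
            // boundary, so the epoch 0 directory must survive.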
            let epoch_retention = 0;
            let current_epoch = 0;

            let base_directory = tempfile::tempdir().unwrap().into_path();

            create_epoch_directories(&base_directory, vec!["0", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 0);
        }

        {
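            // Epoch retention of 1 at epoch 100: epochs 97 and 98 should be
            // pruned, while 99 and 100 are retained.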
            let epoch_retention = 1;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().into_path();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 2);
            assert_eq!(epochs_left[0], 99);
            assert_eq!(epochs_left[1], 100);
        }

        {
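            // Zero epoch retention at epoch 100: every epoch below 100 should
            // be pruned and only the current epoch kept.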
            let epoch_retention = 0;
            let current_epoch = 100;

            let base_directory = tempfile::tempdir().unwrap().into_path();

            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            let epochs_left = read_epoch_directories(&base_directory);

            assert_eq!(epochs_left.len(), 1);
            assert_eq!(epochs_left[0], 100);
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_consensus_store_pruner() {
        let epoch_retention = 1;
        let epoch_prune_period = std::time::Duration::from_millis(500);

        let base_directory = tempfile::tempdir().unwrap().into_path();

        create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

        let pruner = ConsensusStorePruner::new(
            base_directory.clone(),
            epoch_retention,
            epoch_prune_period,
            &Registry::new(),
        );

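        // Let a few prune periods elapse. No epoch has been reported yet (and
        // the first timer tick is delayed), so nothing should be pruned.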
        sleep(3 * epoch_prune_period).await;

        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 4);

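        // Report epoch 100: with a retention of 1, epochs 97 and 98 should be
        // removed once the pruning task processes the message.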
        pruner.prune(100).await;

        sleep(2 * epoch_prune_period).await;

        let epoch_dirs = read_epoch_directories(&base_directory);
        assert_eq!(epoch_dirs.len(), 2);
        assert_eq!(epoch_dirs[0], 99);
        assert_eq!(epoch_dirs[1], 100);
    }

    fn create_epoch_directories(base_directory: &std::path::Path, epochs: Vec<&str>) {
        for epoch in epochs {
            let mut path = base_directory.to_path_buf();
            path.push(epoch);
            fs::create_dir(path).unwrap();
        }
    }

    fn read_epoch_directories(base_directory: &std::path::Path) -> Vec<u64> {
        let files = fs::read_dir(base_directory).unwrap();

        let mut epochs = Vec::new();
        for file_res in files {
            let file_epoch_string = file_res.unwrap().file_name().to_str().unwrap().to_owned();
            if let Ok(file_epoch) = file_epoch_string.parse::<u64>() {
                epochs.push(file_epoch);
            }
        }

        epochs.sort();
        epochs
    }
}