use std::{
    fs,
    path::{Path, PathBuf},
    time::Duration,
};

use consensus_config::Epoch;
use iota_metrics::spawn_logged_monitored_task;
use prometheus::{
    IntCounter, IntCounterVec, IntGauge, Registry, register_int_counter_vec_with_registry,
    register_int_counter_with_registry, register_int_gauge_with_registry,
};
use tokio::{sync::mpsc, time::Instant};
use tracing::{error, info, warn};
use typed_store::rocks::safe_drop_db;
16
/// Prometheus metrics reported by the consensus store pruner.
struct Metrics {
    // Highest epoch whose consensus DB directory has been pruned so far.
    last_pruned_consensus_db_epoch: IntGauge,
    // Total number of epoch DB directories removed successfully.
    successfully_pruned_consensus_dbs: IntCounter,
    // Pruning failures, labeled by deletion mode ("safe" or "force").
    error_pruning_consensus_dbs: IntCounterVec,
}
22
impl Metrics {
    /// Registers the pruner metrics on `registry` and returns them.
    ///
    /// The `unwrap()`s assume registration succeeds; NOTE(review): registering
    /// the same metric name twice on one registry would make these panic —
    /// confirm callers use a fresh or uniquely-named registry.
    fn new(registry: &Registry) -> Self {
        Self {
            last_pruned_consensus_db_epoch: register_int_gauge_with_registry!(
                "last_pruned_consensus_db_epoch",
                "The last epoch for which the consensus store was pruned",
                registry
            )
            .unwrap(),
            successfully_pruned_consensus_dbs: register_int_counter_with_registry!(
                "successfully_pruned_consensus_dbs",
                "The number of consensus dbs successfully pruned",
                registry
            )
            .unwrap(),
            error_pruning_consensus_dbs: register_int_counter_vec_with_registry!(
                "error_pruning_consensus_dbs",
                "The number of errors encountered while pruning consensus dbs",
                &["mode"],
                registry
            )
            .unwrap(),
        }
    }
}
48
/// Prunes old per-epoch consensus DB directories in a background task.
pub struct ConsensusStorePruner {
    // Sends the latest epoch to the background task to trigger pruning.
    // Dropping this struct drops the sender, which closes the channel and
    // lets the background task break out of its loop.
    tx_remove: mpsc::Sender<Epoch>,
    // Handle of the spawned pruning task; held for lifetime bookkeeping.
    _handle: tokio::task::JoinHandle<()>,
}
53
54impl ConsensusStorePruner {
55 pub fn new(
56 base_path: PathBuf,
57 epoch_retention: u64,
58 epoch_prune_period: Duration,
59 registry: &Registry,
60 ) -> Self {
61 let (tx_remove, mut rx_remove) = mpsc::channel(1);
62 let metrics = Metrics::new(registry);
63
64 let _handle = spawn_logged_monitored_task!(async {
65 info!(
66 "Starting consensus store pruner with epoch retention {epoch_retention} and prune period {epoch_prune_period:?}"
67 );
68
69 let mut timeout = tokio::time::interval_at(
70 Instant::now() + Duration::from_secs(60), epoch_prune_period,
72 );
73
74 let mut latest_epoch = 0;
75 loop {
76 tokio::select! {
77 _ = timeout.tick() => {
78 Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
79 }
80 result = rx_remove.recv() => {
81 if result.is_none() {
82 info!("Closing consensus store pruner");
83 break;
84 }
85 latest_epoch = result.unwrap();
86 Self::prune_old_epoch_data(&base_path, latest_epoch, epoch_retention, &metrics).await;
87 }
88 }
89 }
90 });
91
92 Self { tx_remove, _handle }
93 }
94
95 pub async fn prune(&self, current_epoch: Epoch) {
99 let result = self.tx_remove.send(current_epoch).await;
100 if result.is_err() {
101 error!(
102 "Error sending message to data removal task for epoch {:?}",
103 current_epoch,
104 );
105 }
106 }
107
108 async fn prune_old_epoch_data(
109 storage_base_path: &PathBuf,
110 current_epoch: Epoch,
111 epoch_retention: u64,
112 metrics: &Metrics,
113 ) {
114 let drop_boundary = current_epoch.saturating_sub(epoch_retention);
115
116 info!(
117 "Consensus store prunning for current epoch {}. Will remove epochs < {:?}",
118 current_epoch, drop_boundary
119 );
120
121 let files = match fs::read_dir(storage_base_path) {
123 Ok(f) => f,
124 Err(e) => {
125 error!(
126 "Can not read the files in the storage path directory for epoch cleanup: {:?}",
127 e
128 );
129 return;
130 }
131 };
132
133 for file_res in files {
135 let f = match file_res {
136 Ok(f) => f,
137 Err(e) => {
138 error!(
139 "Error while cleaning up storage of previous epochs: {:?}",
140 e
141 );
142 continue;
143 }
144 };
145
146 let name = f.file_name();
147 let file_epoch_string = match name.to_str() {
148 Some(f) => f,
149 None => continue,
150 };
151
152 let file_epoch = match file_epoch_string.to_owned().parse::<u64>() {
153 Ok(f) => f,
154 Err(e) => {
155 error!(
156 "Could not parse file \"{file_epoch_string}\" in storage path into epoch for cleanup: {:?}",
157 e
158 );
159 continue;
160 }
161 };
162
163 if file_epoch < drop_boundary {
164 const WAIT_BEFORE_FORCE_DELETE: Duration = Duration::from_secs(5);
165 if let Err(e) = safe_drop_db(f.path(), WAIT_BEFORE_FORCE_DELETE).await {
166 warn!(
167 "Could not prune old consensus storage \"{:?}\" directory with safe approach. Will fallback to force delete: {:?}",
168 f.path(),
169 e
170 );
171
172 metrics
173 .error_pruning_consensus_dbs
174 .with_label_values(&["safe"])
175 .inc();
176
177 if let Err(err) = fs::remove_dir_all(f.path()) {
178 error!(
179 "Could not prune old consensus storage \"{:?}\" directory with force delete: {:?}",
180 f.path(),
181 err
182 );
183 metrics
184 .error_pruning_consensus_dbs
185 .with_label_values(&["force"])
186 .inc();
187 } else {
188 info!(
189 "Successfully pruned consensus epoch storage directory with force delete: {:?}",
190 f.path()
191 );
192 let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
193 metrics
194 .last_pruned_consensus_db_epoch
195 .set(last_epoch.max(file_epoch as i64));
196 metrics.successfully_pruned_consensus_dbs.inc();
197 }
198 } else {
199 info!(
200 "Successfully pruned consensus epoch storage directory: {:?}",
201 f.path()
202 );
203 let last_epoch = metrics.last_pruned_consensus_db_epoch.get();
204 metrics
205 .last_pruned_consensus_db_epoch
206 .set(last_epoch.max(file_epoch as i64));
207 metrics.successfully_pruned_consensus_dbs.inc();
208 }
209 }
210 }
211
212 info!(
213 "Completed old epoch data removal process for epoch {:?}",
214 current_epoch
215 );
216 }
217}
218
#[cfg(test)]
mod tests {
    use std::fs;

    use prometheus::Registry;
    use tokio::time::sleep;

    use crate::epoch::consensus_store_pruner::{ConsensusStorePruner, Metrics};

    /// Creates one subdirectory under `base_directory` for every name in
    /// `epochs`.
    fn create_epoch_directories(base_directory: &std::path::Path, epochs: Vec<&str>) {
        for epoch in epochs {
            fs::create_dir(base_directory.join(epoch)).unwrap();
        }
    }

    /// Lists the subdirectories of `base_directory` whose names parse as
    /// epoch numbers, sorted ascending. Non-numeric entries are skipped.
    fn read_epoch_directories(base_directory: &std::path::Path) -> Vec<u64> {
        let mut epochs: Vec<u64> = fs::read_dir(base_directory)
            .unwrap()
            .filter_map(|entry| {
                entry
                    .unwrap()
                    .file_name()
                    .to_str()
                    .unwrap()
                    .parse::<u64>()
                    .ok()
            })
            .collect();
        epochs.sort();
        epochs
    }

    #[tokio::test]
    async fn test_remove_old_epoch_data() {
        telemetry_subscribers::init_for_testing();
        let metrics = Metrics::new(&Registry::new());

        // Scenario 1: epoch 0 with zero retention — nothing is below the
        // boundary, so the "0" directory survives ("other" is non-numeric
        // and never listed as an epoch).
        {
            let current_epoch = 0;
            let epoch_retention = 0;

            let base_directory = tempfile::tempdir().unwrap().keep();
            create_epoch_directories(&base_directory, vec!["0", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            assert_eq!(read_epoch_directories(&base_directory), vec![0]);
        }

        // Scenario 2: epoch 100 with retention 1 — everything below 99 is
        // removed.
        {
            let current_epoch = 100;
            let epoch_retention = 1;

            let base_directory = tempfile::tempdir().unwrap().keep();
            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            assert_eq!(read_epoch_directories(&base_directory), vec![99, 100]);
        }

        // Scenario 3: epoch 100 with zero retention — only the current
        // epoch's directory is kept.
        {
            let current_epoch = 100;
            let epoch_retention = 0;

            let base_directory = tempfile::tempdir().unwrap().keep();
            create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

            ConsensusStorePruner::prune_old_epoch_data(
                &base_directory,
                current_epoch,
                epoch_retention,
                &metrics,
            )
            .await;

            assert_eq!(read_epoch_directories(&base_directory), vec![100]);
        }
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_consensus_store_pruner() {
        let epoch_retention = 1;
        let epoch_prune_period = std::time::Duration::from_millis(500);

        let base_directory = tempfile::tempdir().unwrap().keep();
        create_epoch_directories(&base_directory, vec!["97", "98", "99", "100", "other"]);

        let pruner = ConsensusStorePruner::new(
            base_directory.clone(),
            epoch_retention,
            epoch_prune_period,
            &Registry::new(),
        );

        // The first periodic tick is delayed at startup, so nothing should
        // have been pruned yet.
        sleep(3 * epoch_prune_period).await;
        assert_eq!(read_epoch_directories(&base_directory).len(), 4);

        // Reporting epoch 100 should prune everything below
        // 100 - epoch_retention = 99.
        pruner.prune(100).await;
        sleep(2 * epoch_prune_period).await;

        assert_eq!(read_epoch_directories(&base_directory), vec![99, 100]);
    }
}