1use std::{
6 collections::{BTreeMap, HashMap},
7 path::PathBuf,
8 str,
9 sync::Arc,
10};
11
12use anyhow::{Ok, anyhow};
13use clap::{Parser, ValueEnum};
14use comfy_table::{Cell, ContentArrangement, Row, Table};
15use iota_archival::reader::ArchiveReaderBalancer;
16use iota_config::node::AuthorityStorePruningConfig;
17use iota_core::{
18 authority::{
19 authority_per_epoch_store::AuthorityEpochTables,
20 authority_store_pruner::{
21 AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
22 },
23 authority_store_tables::AuthorityPerpetualTables,
24 authority_store_types::{StoreData, StoreObject},
25 },
26 checkpoints::CheckpointStore,
27 epoch::committee_store::CommitteeStoreTables,
28 rest_index::RestIndexStore,
29};
30use iota_storage::{IndexStoreTables, mutex_table::RwLockTable};
31use iota_types::base_types::{EpochId, ObjectID};
32use prometheus::Registry;
33use strum_macros::EnumString;
34use tracing::info;
35use typed_store::{
36 rocks::{MetricConf, default_db_options},
37 rocksdb::MultiThreaded,
38 traits::{Map, TableSummary},
39};
40
41#[derive(EnumString, Clone, Parser, Debug, ValueEnum)]
42pub enum StoreName {
43 Validator,
44 Index,
45 Epoch,
46}
47impl std::fmt::Display for StoreName {
48 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
49 write!(f, "{:?}", self)
50 }
51}
52
53pub fn list_tables(path: PathBuf) -> anyhow::Result<Vec<String>> {
54 typed_store::rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(
55 &default_db_options().options,
56 path,
57 )
58 .map_err(|e| e.into())
59 .map(|q| {
60 q.iter()
61 .filter_map(|s| {
62 if s != "default" {
64 Some(s.clone())
65 } else {
66 None
67 }
68 })
69 .collect()
70 })
71}
72
73pub fn table_summary(
74 store_name: StoreName,
75 epoch: Option<EpochId>,
76 db_path: PathBuf,
77 table_name: &str,
78) -> anyhow::Result<TableSummary> {
79 match store_name {
80 StoreName::Validator => {
81 let epoch_tables = AuthorityEpochTables::describe_tables();
82 if epoch_tables.contains_key(table_name) {
83 let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
84 AuthorityEpochTables::open_readonly(epoch, &db_path).table_summary(table_name)
85 } else {
86 AuthorityPerpetualTables::open_readonly(&db_path).table_summary(table_name)
87 }
88 }
89 StoreName::Index => {
90 IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
91 .table_summary(table_name)
92 }
93 StoreName::Epoch => {
94 CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
95 .table_summary(table_name)
96 }
97 }
98 .map_err(|err| anyhow!(err.to_string()))
99}
100
101pub fn print_table_metadata(
102 store_name: StoreName,
103 epoch: Option<EpochId>,
104 db_path: PathBuf,
105 table_name: &str,
106) -> anyhow::Result<()> {
107 let db = match store_name {
108 StoreName::Validator => {
109 let epoch_tables = AuthorityEpochTables::describe_tables();
110 if epoch_tables.contains_key(table_name) {
111 let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
112 AuthorityEpochTables::open_readonly(epoch, &db_path)
113 .next_shared_object_versions
114 .rocksdb
115 } else {
116 AuthorityPerpetualTables::open_readonly(&db_path)
117 .objects
118 .rocksdb
119 }
120 }
121 StoreName::Index => {
122 IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
123 .event_by_move_module
124 .rocksdb
125 }
126 StoreName::Epoch => {
127 CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
128 .committee_map
129 .rocksdb
130 }
131 };
132
133 let mut table = Table::new();
134 table
135 .set_content_arrangement(ContentArrangement::Dynamic)
136 .set_width(200)
137 .set_header(vec![
138 "name",
139 "level",
140 "num_entries",
141 "start_key",
142 "end_key",
143 "num_deletions",
144 "file_size",
145 ]);
146
147 for file in db.live_files()?.iter() {
148 if file.column_family_name != table_name {
149 continue;
150 }
151 let mut row = Row::new();
152 row.add_cell(Cell::new(&file.name));
153 row.add_cell(Cell::new(file.level));
154 row.add_cell(Cell::new(file.num_entries));
155 row.add_cell(Cell::new(hex::encode(
156 file.start_key.as_ref().unwrap_or(&"".as_bytes().to_vec()),
157 )));
158 row.add_cell(Cell::new(hex::encode(
159 file.end_key.as_ref().unwrap_or(&"".as_bytes().to_vec()),
160 )));
161 row.add_cell(Cell::new(file.num_deletions));
162 row.add_cell(Cell::new(file.size));
163 table.add_row(row);
164 }
165
166 eprintln!("{}", table);
167 Ok(())
168}
169
170pub fn duplicate_objects_summary(db_path: PathBuf) -> (usize, usize, usize, usize) {
171 let perpetual_tables = AuthorityPerpetualTables::open_readonly(&db_path);
172 let iter = perpetual_tables.objects.unbounded_iter();
173 let mut total_count = 0;
174 let mut duplicate_count = 0;
175 let mut total_bytes = 0;
176 let mut duplicated_bytes = 0;
177
178 let mut object_id: ObjectID = ObjectID::random();
179 let mut data: HashMap<Vec<u8>, usize> = HashMap::new();
180
181 for (key, value) in iter {
182 if let StoreObject::Value(store_object) = value.migrate().into_inner() {
183 if let StoreData::Move(object) = store_object.data {
184 if object_id != key.0 {
185 for (k, cnt) in data.iter() {
186 total_bytes += k.len() * cnt;
187 duplicated_bytes += k.len() * (cnt - 1);
188 total_count += cnt;
189 duplicate_count += cnt - 1;
190 }
191 object_id = key.0;
192 data.clear();
193 }
194 *data.entry(object.contents().to_vec()).or_default() += 1;
195 }
196 }
197 }
198 (total_count, duplicate_count, total_bytes, duplicated_bytes)
199}
200
201pub fn compact(db_path: PathBuf) -> anyhow::Result<()> {
202 let perpetual = Arc::new(AuthorityPerpetualTables::open(&db_path, None));
203 AuthorityStorePruner::compact(&perpetual)?;
204 Ok(())
205}
206
207pub async fn prune_objects(db_path: PathBuf) -> anyhow::Result<()> {
208 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
209 let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
210 db_path.join("checkpoints"),
211 MetricConf::default(),
212 None,
213 None,
214 ));
215 let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
216 let highest_pruned_checkpoint = checkpoint_store.get_highest_pruned_checkpoint_seq_number()?;
217 let latest_checkpoint = checkpoint_store.get_highest_executed_checkpoint()?;
218 info!(
219 "Latest executed checkpoint sequence num: {}",
220 latest_checkpoint.map(|x| x.sequence_number).unwrap_or(0)
221 );
222 info!("Highest pruned checkpoint: {}", highest_pruned_checkpoint);
223 let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
224 let lock_table = Arc::new(RwLockTable::new(1));
225 info!("Pruning setup for db at path: {:?}", db_path.display());
226 let pruning_config = AuthorityStorePruningConfig {
227 num_epochs_to_retain: 0,
228 ..Default::default()
229 };
230 info!("Starting object pruning");
231 AuthorityStorePruner::prune_objects_for_eligible_epochs(
232 &perpetual_db,
233 &checkpoint_store,
234 Some(&rest_index),
235 &lock_table,
236 pruning_config,
237 metrics,
238 usize::MAX,
239 EPOCH_DURATION_MS_FOR_TESTING,
240 )
241 .await?;
242 Ok(())
243}
244
245pub async fn prune_checkpoints(db_path: PathBuf) -> anyhow::Result<()> {
246 let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
247 let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
248 db_path.join("checkpoints"),
249 MetricConf::default(),
250 None,
251 None,
252 ));
253 let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
254 let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
255 let lock_table = Arc::new(RwLockTable::new(1));
256 info!("Pruning setup for db at path: {:?}", db_path.display());
257 let pruning_config = AuthorityStorePruningConfig {
258 num_epochs_to_retain_for_checkpoints: Some(1),
259 ..Default::default()
260 };
261 info!("Starting txns and effects pruning");
262 let archive_readers = ArchiveReaderBalancer::default();
263 AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
264 &perpetual_db,
265 &checkpoint_store,
266 Some(&rest_index),
267 &lock_table,
268 pruning_config,
269 metrics,
270 usize::MAX,
271 archive_readers,
272 EPOCH_DURATION_MS_FOR_TESTING,
273 )
274 .await?;
275 Ok(())
276}
277
278pub fn dump_table(
280 store_name: StoreName,
281 epoch: Option<EpochId>,
282 db_path: PathBuf,
283 table_name: &str,
284 page_size: u16,
285 page_number: usize,
286) -> anyhow::Result<BTreeMap<String, String>> {
287 match store_name {
288 StoreName::Validator => {
289 let epoch_tables = AuthorityEpochTables::describe_tables();
290 if epoch_tables.contains_key(table_name) {
291 let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
292 AuthorityEpochTables::open_readonly(epoch, &db_path).dump(
293 table_name,
294 page_size,
295 page_number,
296 )
297 } else {
298 AuthorityPerpetualTables::open_readonly(&db_path).dump(
299 table_name,
300 page_size,
301 page_number,
302 )
303 }
304 }
305 StoreName::Index => {
306 IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default()).dump(
307 table_name,
308 page_size,
309 page_number,
310 )
311 }
312 StoreName::Epoch => {
313 CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
314 .dump(table_name, page_size, page_number)
315 }
316 }
317 .map_err(|err| anyhow!(err.to_string()))
318}
319
#[cfg(test)]
mod test {
    use iota_core::authority::{
        authority_per_epoch_store::AuthorityEpochTables,
        authority_store_tables::AuthorityPerpetualTables,
    };

    use crate::db_tool::db_dump::{StoreName, dump_table, list_tables};

    /// Checks that every table listed for the validator epoch and perpetual
    /// databases can be dumped through `dump_table`.
    #[tokio::test]
    async fn db_dump_population() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.into_path();

        // Open both stores once so the databases exist on disk; the handles
        // are dropped right away.
        let _: AuthorityEpochTables = AuthorityEpochTables::open(0, &primary_path, None);
        let _: AuthorityPerpetualTables = AuthorityPerpetualTables::open(&primary_path, None);

        // Epoch tables first, followed by the perpetual tables.
        let tables: Vec<String> = list_tables(AuthorityEpochTables::path(0, &primary_path))
            .unwrap()
            .into_iter()
            .chain(list_tables(AuthorityPerpetualTables::path(&primary_path)).unwrap())
            .collect();

        // Collect every table that `dump_table` fails on.
        let missing_tables: Vec<String> = tables
            .into_iter()
            .inspect(|table| println!("{}", table))
            .filter(|table| {
                dump_table(
                    StoreName::Validator,
                    Some(0),
                    primary_path.clone(),
                    table,
                    0,
                    0,
                )
                .is_err()
            })
            .collect();

        assert!(
            missing_tables.is_empty(),
            "Missing {} table(s) from DB dump registration function: {:?} \n Update the dump function.",
            missing_tables.len(),
            missing_tables
        );
        Ok(())
    }
}