// iota_tool/db_tool/db_dump.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    path::PathBuf,
8    str,
9    sync::Arc,
10};
11
12use anyhow::{Ok, anyhow};
13use clap::{Parser, ValueEnum};
14use comfy_table::{Cell, ContentArrangement, Row, Table};
15use iota_archival::reader::ArchiveReaderBalancer;
16use iota_config::node::AuthorityStorePruningConfig;
17use iota_core::{
18    authority::{
19        authority_per_epoch_store::AuthorityEpochTables,
20        authority_store_pruner::{
21            AuthorityStorePruner, AuthorityStorePruningMetrics, EPOCH_DURATION_MS_FOR_TESTING,
22        },
23        authority_store_tables::AuthorityPerpetualTables,
24        authority_store_types::{StoreData, StoreObject},
25    },
26    checkpoints::CheckpointStore,
27    epoch::committee_store::CommitteeStoreTables,
28    rest_index::RestIndexStore,
29};
30use iota_storage::{IndexStoreTables, mutex_table::RwLockTable};
31use iota_types::base_types::{EpochId, ObjectID};
32use prometheus::Registry;
33use strum_macros::EnumString;
34use tracing::info;
35use typed_store::{
36    rocks::{MetricConf, default_db_options},
37    rocksdb::MultiThreaded,
38    traits::{Map, TableSummary},
39};
40
/// Selects which RocksDB store a db-tool subcommand operates on.
#[derive(EnumString, Clone, Parser, Debug, ValueEnum)]
pub enum StoreName {
    /// Authority store: per-epoch tables (`AuthorityEpochTables`) plus the
    /// perpetual tables (`AuthorityPerpetualTables`).
    Validator,
    /// Fullnode index store (`IndexStoreTables`).
    Index,
    /// Committee store (`CommitteeStoreTables`).
    Epoch,
}
47impl std::fmt::Display for StoreName {
48    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
49        write!(f, "{:?}", self)
50    }
51}
52
53pub fn list_tables(path: PathBuf) -> anyhow::Result<Vec<String>> {
54    typed_store::rocksdb::DBWithThreadMode::<MultiThreaded>::list_cf(
55        &default_db_options().options,
56        path,
57    )
58    .map_err(|e| e.into())
59    .map(|q| {
60        q.iter()
61            .filter_map(|s| {
62                // The `default` table is not used
63                if s != "default" {
64                    Some(s.clone())
65                } else {
66                    None
67                }
68            })
69            .collect()
70    })
71}
72
/// Returns a [`TableSummary`] (entry counts and key/value size statistics)
/// for a single table (column family) of the selected store.
///
/// For [`StoreName::Validator`] the table name is first looked up among the
/// per-epoch tables; if found there, `epoch` must be supplied, otherwise the
/// perpetual tables are opened instead.
pub fn table_summary(
    store_name: StoreName,
    epoch: Option<EpochId>,
    db_path: PathBuf,
    table_name: &str,
) -> anyhow::Result<TableSummary> {
    match store_name {
        StoreName::Validator => {
            let epoch_tables = AuthorityEpochTables::describe_tables();
            if epoch_tables.contains_key(table_name) {
                // Per-epoch tables live under an epoch-specific subdirectory.
                let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
                AuthorityEpochTables::open_readonly(epoch, &db_path).table_summary(table_name)
            } else {
                AuthorityPerpetualTables::open_readonly(&db_path).table_summary(table_name)
            }
        }
        StoreName::Index => {
            IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
                .table_summary(table_name)
        }
        StoreName::Epoch => {
            CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
                .table_summary(table_name)
        }
    }
    // Normalize the typed_store error type into anyhow via its message.
    .map_err(|err| anyhow!(err.to_string()))
}
100
101pub fn print_table_metadata(
102    store_name: StoreName,
103    epoch: Option<EpochId>,
104    db_path: PathBuf,
105    table_name: &str,
106) -> anyhow::Result<()> {
107    let db = match store_name {
108        StoreName::Validator => {
109            let epoch_tables = AuthorityEpochTables::describe_tables();
110            if epoch_tables.contains_key(table_name) {
111                let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
112                AuthorityEpochTables::open_readonly(epoch, &db_path)
113                    .next_shared_object_versions
114                    .rocksdb
115            } else {
116                AuthorityPerpetualTables::open_readonly(&db_path)
117                    .objects
118                    .rocksdb
119            }
120        }
121        StoreName::Index => {
122            IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
123                .event_by_move_module
124                .rocksdb
125        }
126        StoreName::Epoch => {
127            CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
128                .committee_map
129                .rocksdb
130        }
131    };
132
133    let mut table = Table::new();
134    table
135        .set_content_arrangement(ContentArrangement::Dynamic)
136        .set_width(200)
137        .set_header(vec![
138            "name",
139            "level",
140            "num_entries",
141            "start_key",
142            "end_key",
143            "num_deletions",
144            "file_size",
145        ]);
146
147    for file in db.live_files()?.iter() {
148        if file.column_family_name != table_name {
149            continue;
150        }
151        let mut row = Row::new();
152        row.add_cell(Cell::new(&file.name));
153        row.add_cell(Cell::new(file.level));
154        row.add_cell(Cell::new(file.num_entries));
155        row.add_cell(Cell::new(hex::encode(
156            file.start_key.as_ref().unwrap_or(&"".as_bytes().to_vec()),
157        )));
158        row.add_cell(Cell::new(hex::encode(
159            file.end_key.as_ref().unwrap_or(&"".as_bytes().to_vec()),
160        )));
161        row.add_cell(Cell::new(file.num_deletions));
162        row.add_cell(Cell::new(file.size));
163        table.add_row(row);
164    }
165
166    eprintln!("{}", table);
167    Ok(())
168}
169
170pub fn duplicate_objects_summary(db_path: PathBuf) -> (usize, usize, usize, usize) {
171    let perpetual_tables = AuthorityPerpetualTables::open_readonly(&db_path);
172    let iter = perpetual_tables.objects.unbounded_iter();
173    let mut total_count = 0;
174    let mut duplicate_count = 0;
175    let mut total_bytes = 0;
176    let mut duplicated_bytes = 0;
177
178    let mut object_id: ObjectID = ObjectID::random();
179    let mut data: HashMap<Vec<u8>, usize> = HashMap::new();
180
181    for (key, value) in iter {
182        if let StoreObject::Value(store_object) = value.migrate().into_inner() {
183            if let StoreData::Move(object) = store_object.data {
184                if object_id != key.0 {
185                    for (k, cnt) in data.iter() {
186                        total_bytes += k.len() * cnt;
187                        duplicated_bytes += k.len() * (cnt - 1);
188                        total_count += cnt;
189                        duplicate_count += cnt - 1;
190                    }
191                    object_id = key.0;
192                    data.clear();
193                }
194                *data.entry(object.contents().to_vec()).or_default() += 1;
195            }
196        }
197    }
198    (total_count, duplicate_count, total_bytes, duplicated_bytes)
199}
200
201pub fn compact(db_path: PathBuf) -> anyhow::Result<()> {
202    let perpetual = Arc::new(AuthorityPerpetualTables::open(&db_path, None));
203    AuthorityStorePruner::compact(&perpetual)?;
204    Ok(())
205}
206
207pub async fn prune_objects(db_path: PathBuf) -> anyhow::Result<()> {
208    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
209    let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
210        db_path.join("checkpoints"),
211        MetricConf::default(),
212        None,
213        None,
214    ));
215    let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
216    let highest_pruned_checkpoint = checkpoint_store.get_highest_pruned_checkpoint_seq_number()?;
217    let latest_checkpoint = checkpoint_store.get_highest_executed_checkpoint()?;
218    info!(
219        "Latest executed checkpoint sequence num: {}",
220        latest_checkpoint.map(|x| x.sequence_number).unwrap_or(0)
221    );
222    info!("Highest pruned checkpoint: {}", highest_pruned_checkpoint);
223    let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
224    let lock_table = Arc::new(RwLockTable::new(1));
225    info!("Pruning setup for db at path: {:?}", db_path.display());
226    let pruning_config = AuthorityStorePruningConfig {
227        num_epochs_to_retain: 0,
228        ..Default::default()
229    };
230    info!("Starting object pruning");
231    AuthorityStorePruner::prune_objects_for_eligible_epochs(
232        &perpetual_db,
233        &checkpoint_store,
234        Some(&rest_index),
235        &lock_table,
236        pruning_config,
237        metrics,
238        usize::MAX,
239        EPOCH_DURATION_MS_FOR_TESTING,
240    )
241    .await?;
242    Ok(())
243}
244
245pub async fn prune_checkpoints(db_path: PathBuf) -> anyhow::Result<()> {
246    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&db_path.join("store"), None));
247    let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
248        db_path.join("checkpoints"),
249        MetricConf::default(),
250        None,
251        None,
252    ));
253    let rest_index = RestIndexStore::new_without_init(db_path.join("rest_index"));
254    let metrics = AuthorityStorePruningMetrics::new(&Registry::default());
255    let lock_table = Arc::new(RwLockTable::new(1));
256    info!("Pruning setup for db at path: {:?}", db_path.display());
257    let pruning_config = AuthorityStorePruningConfig {
258        num_epochs_to_retain_for_checkpoints: Some(1),
259        ..Default::default()
260    };
261    info!("Starting txns and effects pruning");
262    let archive_readers = ArchiveReaderBalancer::default();
263    AuthorityStorePruner::prune_checkpoints_for_eligible_epochs(
264        &perpetual_db,
265        &checkpoint_store,
266        Some(&rest_index),
267        &lock_table,
268        pruning_config,
269        metrics,
270        usize::MAX,
271        archive_readers,
272        EPOCH_DURATION_MS_FOR_TESTING,
273    )
274    .await?;
275    Ok(())
276}
277
// TODO: condense this using macro or trait dyn skills
/// Dumps one page of raw key/value pairs (stringified) from a table of the
/// selected store, keyed by the entry's key string.
///
/// `page_size` and `page_number` select which slice of entries is returned.
/// For [`StoreName::Validator`] the table name is first looked up among the
/// per-epoch tables (requiring `epoch`), otherwise the perpetual tables are
/// opened.
pub fn dump_table(
    store_name: StoreName,
    epoch: Option<EpochId>,
    db_path: PathBuf,
    table_name: &str,
    page_size: u16,
    page_number: usize,
) -> anyhow::Result<BTreeMap<String, String>> {
    match store_name {
        StoreName::Validator => {
            let epoch_tables = AuthorityEpochTables::describe_tables();
            if epoch_tables.contains_key(table_name) {
                let epoch = epoch.ok_or_else(|| anyhow!("--epoch is required"))?;
                AuthorityEpochTables::open_readonly(epoch, &db_path).dump(
                    table_name,
                    page_size,
                    page_number,
                )
            } else {
                AuthorityPerpetualTables::open_readonly(&db_path).dump(
                    table_name,
                    page_size,
                    page_number,
                )
            }
        }
        StoreName::Index => {
            IndexStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default()).dump(
                table_name,
                page_size,
                page_number,
            )
        }
        StoreName::Epoch => {
            CommitteeStoreTables::get_read_only_handle(db_path, None, None, MetricConf::default())
                .dump(table_name, page_size, page_number)
        }
    }
    // Normalize the typed_store error type into anyhow via its message.
    .map_err(|err| anyhow!(err.to_string()))
}
319
#[cfg(test)]
mod test {
    use iota_core::authority::{
        authority_per_epoch_store::AuthorityEpochTables,
        authority_store_tables::AuthorityPerpetualTables,
    };

    use crate::db_tool::db_dump::{StoreName, dump_table, list_tables};

    /// Ensures every column family created by the epoch/perpetual stores is
    /// dumpable via `dump_table`, i.e. the dump dispatch covers all tables.
    #[tokio::test]
    async fn db_dump_population() -> Result<(), anyhow::Error> {
        let primary_path = tempfile::tempdir()?.into_path();

        // Open the DBs for writing once so all column families get created.
        let _: AuthorityEpochTables = AuthorityEpochTables::open(0, &primary_path, None);
        let _: AuthorityPerpetualTables = AuthorityPerpetualTables::open(&primary_path, None);

        // Collect the column families of both stores into one list.
        let tables = {
            let mut epoch_tables =
                list_tables(AuthorityEpochTables::path(0, &primary_path)).unwrap();
            let mut perpetual_tables =
                list_tables(AuthorityPerpetualTables::path(&primary_path)).unwrap();
            epoch_tables.append(&mut perpetual_tables);
            epoch_tables
        };

        let mut missing_tables = vec![];
        for t in tables {
            println!("{}", t);
            if dump_table(
                StoreName::Validator,
                Some(0),
                primary_path.clone(),
                &t,
                0,
                0,
            )
            .is_err()
            {
                missing_tables.push(t);
            }
        }
        // `assert!` with format args replaces the original
        // `if empty { return Ok(()) } ... panic!("{}", format!(...))`,
        // which double-formatted the message for no benefit.
        assert!(
            missing_tables.is_empty(),
            "Missing {} table(s) from DB dump registration function: {:?} \n Update the dump function.",
            missing_tables.len(),
            missing_tables
        );
        Ok(())
    }
}
375}