iota_tool/db_tool/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::{Path, PathBuf};
6
7use anyhow::{anyhow, bail};
8use clap::Parser;
9use iota_core::{
10    authority::{
11        authority_per_epoch_store::AuthorityEpochTables,
12        authority_store_tables::AuthorityPerpetualTables,
13    },
14    checkpoints::CheckpointStore,
15};
16use iota_types::{
17    base_types::{EpochId, ObjectID},
18    digests::{CheckpointContentsDigest, TransactionDigest},
19    effects::TransactionEffectsAPI,
20    messages_checkpoint::{CheckpointDigest, CheckpointSequenceNumber},
21    storage::ObjectStore,
22};
23use typed_store::rocks::MetricConf;
24
25use self::{
26    db_dump::{StoreName, dump_table, duplicate_objects_summary, list_tables, table_summary},
27    index_search::{SearchRange, search_index},
28};
29use crate::db_tool::db_dump::{compact, print_table_metadata, prune_checkpoints, prune_objects};
30pub mod db_dump;
31mod index_search;
32
33#[derive(Parser)]
34pub enum DbToolCommand {
35    ListTables,
36    Dump(Options),
37    IndexSearchKeyRange(IndexSearchKeyRangeOptions),
38    IndexSearchCount(IndexSearchCountOptions),
39    TableSummary(Options),
40    DuplicatesSummary,
41    ListDBMetadata(Options),
42    PrintLastConsensusIndex,
43    PrintTransaction(PrintTransactionOptions),
44    PrintObject(PrintObjectOptions),
45    PrintCheckpoint(PrintCheckpointOptions),
46    PrintCheckpointContent(PrintCheckpointContentOptions),
47    ResetDB,
48    RewindCheckpointExecution(RewindCheckpointExecutionOptions),
49    Compact,
50    PruneObjects,
51    PruneCheckpoints,
52    SetCheckpointWatermark(SetCheckpointWatermarkOptions),
53}
54
// Arguments for `IndexSearchKeyRange`. The three values are passed to
// `search_index` as the table name, the start key and
// `SearchRange::ExclusiveLastKey(end_key)`.
#[derive(Parser)]
pub struct IndexSearchKeyRangeOptions {
    // Name of the table to search.
    #[arg(long, short = 't')]
    table_name: String,
    // Key the search starts from.
    #[arg(long, short = 's')]
    start: String,
    // Upper bound of the search; exposed on the CLI as `--end` / `-e`.
    // Presumably excluded from the results, per the `ExclusiveLastKey` name.
    #[arg(long = "end", short = 'e')]
    end_key: String,
}
64
// Arguments for `IndexSearchCount`. The values are passed to `search_index` as
// the table name, the start key and `SearchRange::Count(count)`.
#[derive(Parser)]
pub struct IndexSearchCountOptions {
    // Name of the table to search.
    #[arg(long, short = 't')]
    table_name: String,
    // Key the search starts from.
    #[arg(long, short = 's')]
    start: String,
    // Number of entries requested from the start key.
    #[arg(long, short = 'c')]
    count: u64,
}
74
// Arguments shared by the `Dump`, `TableSummary` and `ListDBMetadata`
// subcommands.
// NOTE(review): only `Dump` reads `page_size` / `page_number`; the other two
// subcommands ignore them, yet clap still requires them because they are
// non-`Option` fields without defaults.
#[derive(Parser)]
pub struct Options {
    /// The type of store to dump
    #[arg(long = "store", short = 's', value_enum)]
    store_name: StoreName,
    /// The name of the table to dump
    #[arg(long, short = 't')]
    table_name: String,
    /// The size of page to dump. This is a u16
    #[arg(long, short = 'p')]
    page_size: u16,
    /// The page number to dump
    #[arg(long = "page-num", short = 'n')]
    page_number: usize,

    // TODO: We should load this automatically from the system object in AuthorityPerpetualTables.
    // This is very difficult to do right now because you can't share code between
    // AuthorityPerpetualTables and AuthorityEpochTablesReadonly.
    /// The epoch to use when loading AuthorityEpochTables.
    #[arg(long, short = 'e')]
    epoch: Option<EpochId>,
}
97
// Arguments for `PrintTransaction`. The digest is looked up in the perpetual
// tables for the checkpoint that executed it and for its effects' dependencies.
#[derive(Parser)]
pub struct PrintTransactionOptions {
    #[arg(long, help = "The transaction digest to print")]
    digest: TransactionDigest,
}
103
// Arguments for `PrintObject`.
#[derive(Parser)]
pub struct PrintObjectOptions {
    #[arg(long, help = "The object id to print")]
    id: ObjectID,
    // When set, the object is read at exactly this version via
    // `get_object_by_key`. Otherwise `get_object` is used (presumably the
    // latest version; confirm against `ObjectStore`).
    #[arg(long, help = "The object version to print")]
    version: Option<u64>,
}
111
// Arguments for `PrintCheckpoint`. The checkpoint is looked up by this digest,
// and its contents are then printed using the checkpoint's content digest.
#[derive(Parser)]
pub struct PrintCheckpointOptions {
    #[arg(long, help = "The checkpoint digest to print")]
    digest: CheckpointDigest,
}
117
// Arguments for `PrintCheckpointContent`. This takes the checkpoint *contents*
// digest, which is different from the checkpoint digest that
// `PrintCheckpoint` accepts.
#[derive(Parser)]
pub struct PrintCheckpointContentOptions {
    #[arg(
        long,
        help = "The checkpoint content digest (NOT the checkpoint digest)"
    )]
    digest: CheckpointContentsDigest,
}
126
// NOTE(review): no `DbToolCommand` variant in this file takes this struct, so
// `execute_db_tool_command` cannot reach it. Confirm whether it is used
// elsewhere or is left over from a removed subcommand.
#[derive(Parser)]
pub struct RemoveTransactionOptions {
    #[arg(long, help = "The transaction digest to remove")]
    digest: TransactionDigest,

    // Boolean flag; presumably an explicit opt-in before a destructive removal.
    // Not read anywhere in this file.
    #[arg(long)]
    confirm: bool,

    /// The epoch to use when loading AuthorityEpochTables.
    /// Defaults to the current epoch.
    #[arg(long, short = 'e')]
    epoch: Option<EpochId>,
}
140
// NOTE(review): no `DbToolCommand` variant in this file takes this struct, so
// `execute_db_tool_command` cannot reach it. Confirm whether it is used
// elsewhere or is dead code.
#[derive(Parser)]
pub struct RemoveObjectLockOptions {
    #[arg(long, help = "The object ID to remove")]
    id: ObjectID,

    #[arg(long, help = "The object version to remove")]
    version: u64,

    // Boolean flag; presumably an explicit opt-in before a destructive removal.
    // Not read anywhere in this file.
    #[arg(long)]
    confirm: bool,
}
152
// Arguments for `RewindCheckpointExecution`.
#[derive(Parser)]
pub struct RewindCheckpointExecutionOptions {
    // Expected epoch of the target checkpoint. `rewind_checkpoint_execution`
    // rejects the rewind if the checkpoint belongs to a different epoch.
    #[arg(long)]
    epoch: EpochId,

    // Sequence number to record as the highest executed checkpoint. It must
    // not be later than the current highest executed checkpoint.
    #[arg(long)]
    checkpoint_sequence_number: u64,
}
161
// Arguments for `SetCheckpointWatermark`. Each provided value must name a
// checkpoint already present in the checkpoint store. An omitted value leaves
// the corresponding watermark untouched.
#[derive(Parser)]
pub struct SetCheckpointWatermarkOptions {
    // New highest verified checkpoint watermark.
    #[arg(long)]
    highest_verified: Option<CheckpointSequenceNumber>,

    // New highest synced checkpoint watermark.
    #[arg(long)]
    highest_synced: Option<CheckpointSequenceNumber>,
}
170
171pub async fn execute_db_tool_command(db_path: PathBuf, cmd: DbToolCommand) -> anyhow::Result<()> {
172    match cmd {
173        DbToolCommand::ListTables => print_db_all_tables(db_path),
174        DbToolCommand::Dump(d) => print_all_entries(
175            d.store_name,
176            d.epoch,
177            db_path,
178            &d.table_name,
179            d.page_size,
180            d.page_number,
181        ),
182        DbToolCommand::TableSummary(d) => {
183            print_db_table_summary(d.store_name, d.epoch, db_path, &d.table_name)
184        }
185        DbToolCommand::DuplicatesSummary => print_db_duplicates_summary(db_path),
186        DbToolCommand::ListDBMetadata(d) => {
187            print_table_metadata(d.store_name, d.epoch, db_path, &d.table_name)
188        }
189        DbToolCommand::PrintLastConsensusIndex => print_last_consensus_index(&db_path),
190        DbToolCommand::PrintTransaction(d) => print_transaction(&db_path, d),
191        DbToolCommand::PrintObject(o) => print_object(&db_path, o),
192        DbToolCommand::PrintCheckpoint(d) => print_checkpoint(&db_path, d),
193        DbToolCommand::PrintCheckpointContent(d) => print_checkpoint_content(&db_path, d),
194        DbToolCommand::ResetDB => reset_db_to_genesis(&db_path),
195        DbToolCommand::RewindCheckpointExecution(d) => {
196            rewind_checkpoint_execution(&db_path, d.epoch, d.checkpoint_sequence_number)
197        }
198        DbToolCommand::Compact => compact(db_path),
199        DbToolCommand::PruneObjects => prune_objects(db_path).await,
200        DbToolCommand::PruneCheckpoints => prune_checkpoints(db_path).await,
201        DbToolCommand::IndexSearchKeyRange(rg) => {
202            let res = search_index(
203                db_path,
204                rg.table_name,
205                rg.start,
206                SearchRange::ExclusiveLastKey(rg.end_key),
207            )?;
208            for (k, v) in res {
209                println!("{k}: {v}");
210            }
211            Ok(())
212        }
213        DbToolCommand::IndexSearchCount(sc) => {
214            let res = search_index(
215                db_path,
216                sc.table_name,
217                sc.start,
218                SearchRange::Count(sc.count),
219            )?;
220            for (k, v) in res {
221                println!("{k}: {v}");
222            }
223            Ok(())
224        }
225        DbToolCommand::SetCheckpointWatermark(d) => set_checkpoint_watermark(&db_path, d),
226    }
227}
228
229pub fn print_db_all_tables(db_path: PathBuf) -> anyhow::Result<()> {
230    list_tables(db_path)?.iter().for_each(|t| println!("{t}"));
231    Ok(())
232}
233
234pub fn print_db_duplicates_summary(db_path: PathBuf) -> anyhow::Result<()> {
235    let (total_count, duplicate_count, total_bytes, duplicated_bytes) =
236        duplicate_objects_summary(db_path);
237    println!(
238        "Total objects = {total_count}, duplicated objects = {duplicate_count}, total bytes = {total_bytes}, duplicated bytes = {duplicated_bytes}"
239    );
240    Ok(())
241}
242
243pub fn print_last_consensus_index(path: &Path) -> anyhow::Result<()> {
244    let epoch_tables = AuthorityEpochTables::open_tables_read_write(
245        path.to_path_buf(),
246        MetricConf::default(),
247        None,
248        None,
249    );
250    let last_index = epoch_tables.get_last_consensus_index()?;
251    println!("Last consensus index is {last_index:?}");
252    Ok(())
253}
254
255pub fn print_transaction(path: &Path, opt: PrintTransactionOptions) -> anyhow::Result<()> {
256    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None);
257    if let Some((epoch, checkpoint_seq_num)) =
258        perpetual_db.get_checkpoint_sequence_number(&opt.digest)?
259    {
260        println!(
261            "Transaction {:?} executed in epoch {} checkpoint {}",
262            opt.digest, epoch, checkpoint_seq_num
263        );
264    };
265    if let Some(effects) = perpetual_db.get_effects(&opt.digest)? {
266        println!(
267            "Transaction {:?} dependencies: {:#?}",
268            opt.digest,
269            effects.dependencies(),
270        );
271    };
272    Ok(())
273}
274
275pub fn print_object(path: &Path, opt: PrintObjectOptions) -> anyhow::Result<()> {
276    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None);
277
278    let obj = if let Some(version) = opt.version {
279        perpetual_db.get_object_by_key(&opt.id, version.into())?
280    } else {
281        perpetual_db.get_object(&opt.id)?
282    };
283
284    if let Some(obj) = obj {
285        println!("Object {:?}:\n{:#?}", opt.id, obj);
286    } else {
287        println!("Object {:?} not found", opt.id);
288    }
289
290    Ok(())
291}
292
293pub fn print_checkpoint(path: &Path, opt: PrintCheckpointOptions) -> anyhow::Result<()> {
294    let checkpoint_store = CheckpointStore::new(&path.join("checkpoints"));
295    let checkpoint = checkpoint_store
296        .get_checkpoint_by_digest(&opt.digest)?
297        .ok_or(anyhow!(
298            "Checkpoint digest {:?} not found in checkpoint store",
299            opt.digest
300        ))?;
301    println!("Checkpoint: {checkpoint:?}");
302    drop(checkpoint_store);
303    print_checkpoint_content(
304        path,
305        PrintCheckpointContentOptions {
306            digest: checkpoint.content_digest,
307        },
308    )
309}
310
311pub fn print_checkpoint_content(
312    path: &Path,
313    opt: PrintCheckpointContentOptions,
314) -> anyhow::Result<()> {
315    let checkpoint_store = CheckpointStore::new(&path.join("checkpoints"));
316    let contents = checkpoint_store
317        .get_checkpoint_contents(&opt.digest)?
318        .ok_or(anyhow!(
319            "Checkpoint content digest {:?} not found in checkpoint store",
320            opt.digest
321        ))?;
322    println!("Checkpoint content: {contents:?}");
323    Ok(())
324}
325
326pub fn reset_db_to_genesis(path: &Path) -> anyhow::Result<()> {
327    // Follow the below steps to test:
328    //
329    // Get a db snapshot. Either generate one by running stress locally and enabling
330    // db checkpoints or download one from S3 bucket (pretty big in size though).
331    // Download the snapshot for the epoch you want to restore to the local disk.
332    // You will find one snapshot per epoch in the S3 bucket. We need to place the
333    // snapshot in the dir where config is pointing to. If db-config in
334    // fullnode.yaml is /opt/iota/db/authorities_db and we want to restore from
335    // epoch 10, we want to copy the snapshot to /opt/iota/db/authorities_dblike
336    // this: aws s3 cp s3://myBucket/dir /opt/iota/db/authorities_db/
337    // --recursive —exclude “*” —include “epoch_10*” Mark downloaded snapshot as
338    // live: mv /opt/iota/db/authorities_db/epoch_10
339    // /opt/iota/db/authorities_db/live Reset the downloaded db to execute from
340    // genesis with: cargo run --package iota-tool -- db-tool --db-path
341    // /opt/iota/db/authorities_db/live reset-db Start the iota full node: cargo
342    // run --release --bin iota-node -- --config-path ~/db_checkpoints/fullnode.yaml
343    // A sample fullnode.yaml config would be:
344    // ---
345    // db-path:  /opt/iota/db/authorities_db
346    // network-address: /ip4/0.0.0.0/tcp/8080/http
347    // json-rpc-address: "0.0.0.0:9000"
348    // websocket-address: "0.0.0.0:9001"
349    // metrics-address: "0.0.0.0:9184"
350    // admin-interface-address: "127.0.0.1:1337"
351    // grpc-load-shed: ~
352    // grpc-concurrency-limit: ~
353    // p2p-config:
354    //   listen-address: "0.0.0.0:8084"
355    // genesis:
356    //   genesis-file-location:  <path to genesis blob for the network>
357    // authority-store-pruning-config:
358    //   num-latest-epoch-dbs-to-retain: 3
359    //   epoch-db-pruning-period-secs: 3600
360    //   num-epochs-to-retain: 18446744073709551615
361    //   max-checkpoints-in-batch: 10
362    //   max-transactions-in-batch: 1000
363    let perpetual_db = AuthorityPerpetualTables::open_tables_read_write(
364        path.join("store").join("perpetual"),
365        MetricConf::default(),
366        None,
367        None,
368    );
369    perpetual_db.reset_db_for_execution_since_genesis()?;
370
371    let checkpoint_db = CheckpointStore::open_tables_read_write(
372        path.join("checkpoints"),
373        MetricConf::default(),
374        None,
375        None,
376    );
377    checkpoint_db.reset_db_for_execution_since_genesis()?;
378
379    let epoch_db = AuthorityEpochTables::open_tables_read_write(
380        path.join("store"),
381        MetricConf::default(),
382        None,
383        None,
384    );
385    epoch_db.reset_db_for_execution_since_genesis()?;
386
387    Ok(())
388}
389
390/// Force sets the highest executed checkpoint.
391/// NOTE: Does not force re-execution of transactions.
392/// Run with: cargo run --package iota-tool -- db-tool --db-path
393/// /opt/iota/db/authorities_db/live rewind-checkpoint-execution --epoch 3
394/// --checkpoint-sequence-number 300000
395pub fn rewind_checkpoint_execution(
396    path: &Path,
397    epoch: EpochId,
398    checkpoint_sequence_number: u64,
399) -> anyhow::Result<()> {
400    let checkpoint_db = CheckpointStore::open_tables_read_write(
401        path.join("checkpoints"),
402        MetricConf::default(),
403        None,
404        None,
405    );
406    let Some(checkpoint) =
407        checkpoint_db.get_checkpoint_by_sequence_number(checkpoint_sequence_number)?
408    else {
409        bail!("Checkpoint {checkpoint_sequence_number} not found!");
410    };
411    if epoch != checkpoint.epoch() {
412        bail!(
413            "Checkpoint {checkpoint_sequence_number} is in epoch {} not {epoch}!",
414            checkpoint.epoch()
415        );
416    }
417    let highest_executed_sequence_number = checkpoint_db
418        .get_highest_executed_checkpoint_seq_number()?
419        .unwrap_or_default();
420    if checkpoint_sequence_number > highest_executed_sequence_number {
421        bail!(
422            "Must rewind checkpoint execution to be not later than highest executed ({} > {})!",
423            checkpoint_sequence_number,
424            highest_executed_sequence_number
425        );
426    }
427    checkpoint_db.set_highest_executed_checkpoint_subtle(&checkpoint)?;
428    Ok(())
429}
430
431pub fn print_db_table_summary(
432    store: StoreName,
433    epoch: Option<EpochId>,
434    path: PathBuf,
435    table_name: &str,
436) -> anyhow::Result<()> {
437    let summary = table_summary(store, epoch, path, table_name)?;
438    let quantiles = [25, 50, 75, 90, 99];
439    println!(
440        "Total num keys = {}, total key bytes = {}, total value bytes = {}",
441        summary.num_keys, summary.key_bytes_total, summary.value_bytes_total
442    );
443    println!("Key size distribution:\n");
444    quantiles.iter().for_each(|q| {
445        println!(
446            "p{:?} -> {:?} bytes\n",
447            q,
448            summary.key_hist.value_at_quantile(*q as f64 / 100.0)
449        );
450    });
451    println!("Value size distribution:\n");
452    quantiles.iter().for_each(|q| {
453        println!(
454            "p{:?} -> {:?} bytes\n",
455            q,
456            summary.value_hist.value_at_quantile(*q as f64 / 100.0)
457        );
458    });
459    Ok(())
460}
461
462pub fn print_all_entries(
463    store: StoreName,
464    epoch: Option<EpochId>,
465    path: PathBuf,
466    table_name: &str,
467    page_size: u16,
468    page_number: usize,
469) -> anyhow::Result<()> {
470    for (k, v) in dump_table(store, epoch, path, table_name, page_size, page_number)? {
471        println!("{k:>100?}: {v:?}");
472    }
473    Ok(())
474}
475
476/// Force sets state sync checkpoint watermarks.
477/// Run with (for example):
478/// cargo run --package iota-tool -- db-tool --db-path
479/// /opt/iota/db/authorities_db/live set_checkpoint_watermark --highest-synced
480/// 300000
481pub fn set_checkpoint_watermark(
482    path: &Path,
483    options: SetCheckpointWatermarkOptions,
484) -> anyhow::Result<()> {
485    let checkpoint_db = CheckpointStore::open_tables_read_write(
486        path.join("checkpoints"),
487        MetricConf::default(),
488        None,
489        None,
490    );
491
492    if let Some(highest_verified) = options.highest_verified {
493        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_verified)?
494        else {
495            bail!("Checkpoint {highest_verified} not found");
496        };
497        checkpoint_db.update_highest_verified_checkpoint(&checkpoint)?;
498    }
499    if let Some(highest_synced) = options.highest_synced {
500        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_synced)?
501        else {
502            bail!("Checkpoint {highest_synced} not found");
503        };
504        checkpoint_db.update_highest_synced_checkpoint(&checkpoint)?;
505    }
506    Ok(())
507}