iota_tool/db_tool/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::{Path, PathBuf};
6
7use anyhow::{anyhow, bail};
8use clap::Parser;
9use iota_core::{
10    authority::{
11        authority_per_epoch_store::AuthorityEpochTables,
12        authority_store_tables::AuthorityPerpetualTables,
13    },
14    checkpoints::CheckpointStore,
15};
16use iota_types::{
17    base_types::{EpochId, ObjectID},
18    digests::{CheckpointContentsDigest, TransactionDigest},
19    effects::TransactionEffectsAPI,
20    messages_checkpoint::{CheckpointDigest, CheckpointSequenceNumber},
21    storage::ObjectStore,
22};
23use typed_store::rocks::MetricConf;
24
25use self::{
26    db_dump::{StoreName, dump_table, duplicate_objects_summary, list_tables, table_summary},
27    index_search::{SearchRange, search_index},
28};
29use crate::db_tool::db_dump::{compact, print_table_metadata, prune_checkpoints, prune_objects};
30pub mod db_dump;
31mod index_search;
32
// Subcommands accepted by the db tool; `execute_db_tool_command` maps each
// variant to its handler. clap derives the subcommand names from the variant
// names (the usage notes in this file show `reset-db` and
// `rewind-checkpoint-execution`), so renaming or reordering variants changes
// the command-line interface.
#[derive(Parser)]
pub enum DbToolCommand {
    // Print every table name returned by `list_tables`.
    ListTables,
    // Print one page of a table's entries (`dump_table` with page size/number).
    Dump(Options),
    // Search an index table from `start`, with the end key passed to
    // `search_index` as `SearchRange::ExclusiveLastKey`.
    IndexSearchKeyRange(IndexSearchKeyRangeOptions),
    // Search an index table from `start` for `SearchRange::Count(count)` entries.
    IndexSearchCount(IndexSearchCountOptions),
    // Print key/value size totals and quantiles for one table.
    // NOTE(review): `Options` makes `--page-size`/`--page-num` mandatory even
    // though this command ignores them.
    TableSummary(Options),
    // Print the totals returned by `duplicate_objects_summary`.
    DuplicatesSummary,
    // Print table metadata; only the store, epoch and table name are used.
    // NOTE(review): page arguments are mandatory here too but unused.
    ListDBMetadata(Options),
    // Print the last consensus index from the epoch tables.
    PrintLastConsensusIndex,
    // Print the checkpoint location and effect dependencies of a transaction.
    PrintTransaction(PrintTransactionOptions),
    // Print an object, optionally at a specific version.
    PrintObject(PrintObjectOptions),
    // Print a checkpoint (looked up by digest) followed by its contents.
    PrintCheckpoint(PrintCheckpointOptions),
    // Print checkpoint contents looked up by contents digest.
    PrintCheckpointContent(PrintCheckpointContentOptions),
    // Modifies the database: resets perpetual, checkpoint and epoch tables
    // for re-execution since genesis.
    ResetDB,
    // Force-set the highest executed checkpoint (no re-execution).
    RewindCheckpointExecution(RewindCheckpointExecutionOptions),
    // Delegates to `db_dump::compact`.
    Compact,
    // Delegates to `db_dump::prune_objects` (async).
    PruneObjects,
    // Delegates to `db_dump::prune_checkpoints` (async).
    PruneCheckpoints,
    // Force-set the state sync checkpoint watermarks.
    SetCheckpointWatermark(SetCheckpointWatermarkOptions),
}
54
// Arguments for `DbToolCommand::IndexSearchKeyRange`. The values are forwarded
// to `search_index` as table name, start key and
// `SearchRange::ExclusiveLastKey(end_key)`. How the key strings are parsed is
// decided by `search_index` (not visible here).
#[derive(Parser)]
pub struct IndexSearchKeyRangeOptions {
    #[arg(long, short = 't')]
    table_name: String,
    #[arg(long, short = 's')]
    start: String,
    // The field is `end_key`, but the flag is spelled `--end` / `-e`.
    #[arg(long = "end", short = 'e')]
    end_key: String,
}
64
// Arguments for `DbToolCommand::IndexSearchCount`. The table name and start key
// are forwarded to `search_index` together with `SearchRange::Count(count)`.
#[derive(Parser)]
pub struct IndexSearchCountOptions {
    #[arg(long, short = 't')]
    table_name: String,
    #[arg(long, short = 's')]
    start: String,
    #[arg(long, short = 'c')]
    count: u64,
}
74
// Shared arguments for `Dump`, `TableSummary` and `ListDBMetadata`.
// The `///` lines below are clap help text shown to users.
// NOTE(review): `page_size` and `page_number` are mandatory flags, but only
// `Dump` reads them; `TableSummary` and `ListDBMetadata` ignore them.
#[derive(Parser)]
pub struct Options {
    /// The type of store to dump
    #[arg(long = "store", short = 's', value_enum)]
    store_name: StoreName,
    /// The name of the table to dump
    #[arg(long, short = 't')]
    table_name: String,
    /// The size of page to dump. This is a u16
    #[arg(long, short = 'p')]
    page_size: u16,
    /// The page number to dump
    #[arg(long = "page-num", short = 'n')]
    page_number: usize,

    // TODO: We should load this automatically from the system object in AuthorityPerpetualTables.
    // This is very difficult to do right now because you can't share code between
    // AuthorityPerpetualTables and AuthorityEpochTablesReadonly.
    /// The epoch to use when loading AuthorityEpochTables.
    #[arg(long, short = 'e')]
    epoch: Option<EpochId>,
}
97
// Arguments for `DbToolCommand::PrintTransaction`, consumed by
// `print_transaction`.
#[derive(Parser)]
pub struct PrintTransactionOptions {
    #[arg(long, help = "The transaction digest to print")]
    digest: TransactionDigest,
}
103
// Arguments for `DbToolCommand::PrintObject`. With `--version`, `print_object`
// looks the object up by (id, version). Without it, `get_object` is called
// (presumably the latest version; confirm against `ObjectStore`).
#[derive(Parser)]
pub struct PrintObjectOptions {
    #[arg(long, help = "The object id to print")]
    id: ObjectID,
    #[arg(long, help = "The object version to print")]
    version: Option<u64>,
}
111
// Arguments for `DbToolCommand::PrintCheckpoint`. Takes the checkpoint digest;
// `print_checkpoint` also prints the contents the checkpoint refers to.
#[derive(Parser)]
pub struct PrintCheckpointOptions {
    #[arg(long, help = "The checkpoint digest to print")]
    digest: CheckpointDigest,
}
117
// Arguments for `DbToolCommand::PrintCheckpointContent`. Takes a checkpoint
// *contents* digest, which is a different value from the checkpoint digest
// taken by `PrintCheckpointOptions`.
#[derive(Parser)]
pub struct PrintCheckpointContentOptions {
    #[arg(
        long,
        help = "The checkpoint content digest (NOT the checkpoint digest)"
    )]
    digest: CheckpointContentsDigest,
}
126
// NOTE(review): no `DbToolCommand` variant in this file uses this struct, so
// it is not reachable through `execute_db_tool_command`. Confirm whether it is
// still needed.
#[derive(Parser)]
pub struct RemoveTransactionOptions {
    #[arg(long, help = "The transaction digest to remove")]
    digest: TransactionDigest,

    #[arg(long)]
    confirm: bool,

    /// The epoch to use when loading AuthorityEpochTables.
    /// Defaults to the current epoch.
    #[arg(long, short = 'e')]
    epoch: Option<EpochId>,
}
140
// NOTE(review): no `DbToolCommand` variant in this file uses this struct, so
// it is not reachable through `execute_db_tool_command`. Confirm whether it is
// still needed.
#[derive(Parser)]
pub struct RemoveObjectLockOptions {
    #[arg(long, help = "The object ID to remove")]
    id: ObjectID,

    #[arg(long, help = "The object version to remove")]
    version: u64,

    #[arg(long)]
    confirm: bool,
}
152
// Arguments for `DbToolCommand::RewindCheckpointExecution`. `epoch` acts as a
// safety check: `rewind_checkpoint_execution` bails unless the checkpoint at
// `checkpoint_sequence_number` belongs to that epoch.
#[derive(Parser)]
pub struct RewindCheckpointExecutionOptions {
    #[arg(long)]
    epoch: EpochId,

    #[arg(long)]
    checkpoint_sequence_number: u64,
}
161
// Arguments for `DbToolCommand::SetCheckpointWatermark`. Both flags are
// optional; `set_checkpoint_watermark` only writes the watermarks provided.
#[derive(Parser)]
pub struct SetCheckpointWatermarkOptions {
    #[arg(long)]
    highest_verified: Option<CheckpointSequenceNumber>,

    #[arg(long)]
    highest_synced: Option<CheckpointSequenceNumber>,
}
170
171pub async fn execute_db_tool_command(db_path: PathBuf, cmd: DbToolCommand) -> anyhow::Result<()> {
172    match cmd {
173        DbToolCommand::ListTables => print_db_all_tables(db_path),
174        DbToolCommand::Dump(d) => print_all_entries(
175            d.store_name,
176            d.epoch,
177            db_path,
178            &d.table_name,
179            d.page_size,
180            d.page_number,
181        ),
182        DbToolCommand::TableSummary(d) => {
183            print_db_table_summary(d.store_name, d.epoch, db_path, &d.table_name)
184        }
185        DbToolCommand::DuplicatesSummary => print_db_duplicates_summary(db_path),
186        DbToolCommand::ListDBMetadata(d) => {
187            print_table_metadata(d.store_name, d.epoch, db_path, &d.table_name)
188        }
189        DbToolCommand::PrintLastConsensusIndex => print_last_consensus_index(&db_path),
190        DbToolCommand::PrintTransaction(d) => print_transaction(&db_path, d),
191        DbToolCommand::PrintObject(o) => print_object(&db_path, o),
192        DbToolCommand::PrintCheckpoint(d) => print_checkpoint(&db_path, d),
193        DbToolCommand::PrintCheckpointContent(d) => print_checkpoint_content(&db_path, d),
194        DbToolCommand::ResetDB => reset_db_to_genesis(&db_path),
195        DbToolCommand::RewindCheckpointExecution(d) => {
196            rewind_checkpoint_execution(&db_path, d.epoch, d.checkpoint_sequence_number)
197        }
198        DbToolCommand::Compact => compact(db_path),
199        DbToolCommand::PruneObjects => prune_objects(db_path).await,
200        DbToolCommand::PruneCheckpoints => prune_checkpoints(db_path).await,
201        DbToolCommand::IndexSearchKeyRange(rg) => {
202            let res = search_index(
203                db_path,
204                rg.table_name,
205                rg.start,
206                SearchRange::ExclusiveLastKey(rg.end_key),
207            )?;
208            for (k, v) in res {
209                println!("{}: {}", k, v);
210            }
211            Ok(())
212        }
213        DbToolCommand::IndexSearchCount(sc) => {
214            let res = search_index(
215                db_path,
216                sc.table_name,
217                sc.start,
218                SearchRange::Count(sc.count),
219            )?;
220            for (k, v) in res {
221                println!("{}: {}", k, v);
222            }
223            Ok(())
224        }
225        DbToolCommand::SetCheckpointWatermark(d) => set_checkpoint_watermark(&db_path, d),
226    }
227}
228
229pub fn print_db_all_tables(db_path: PathBuf) -> anyhow::Result<()> {
230    list_tables(db_path)?.iter().for_each(|t| println!("{}", t));
231    Ok(())
232}
233
234pub fn print_db_duplicates_summary(db_path: PathBuf) -> anyhow::Result<()> {
235    let (total_count, duplicate_count, total_bytes, duplicated_bytes) =
236        duplicate_objects_summary(db_path);
237    println!(
238        "Total objects = {}, duplicated objects = {}, total bytes = {}, duplicated bytes = {}",
239        total_count, duplicate_count, total_bytes, duplicated_bytes
240    );
241    Ok(())
242}
243
244pub fn print_last_consensus_index(path: &Path) -> anyhow::Result<()> {
245    let epoch_tables = AuthorityEpochTables::open_tables_read_write(
246        path.to_path_buf(),
247        MetricConf::default(),
248        None,
249        None,
250    );
251    let last_index = epoch_tables.get_last_consensus_index()?;
252    println!("Last consensus index is {:?}", last_index);
253    Ok(())
254}
255
256pub fn print_transaction(path: &Path, opt: PrintTransactionOptions) -> anyhow::Result<()> {
257    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None);
258    if let Some((epoch, checkpoint_seq_num)) =
259        perpetual_db.get_checkpoint_sequence_number(&opt.digest)?
260    {
261        println!(
262            "Transaction {:?} executed in epoch {} checkpoint {}",
263            opt.digest, epoch, checkpoint_seq_num
264        );
265    };
266    if let Some(effects) = perpetual_db.get_effects(&opt.digest)? {
267        println!(
268            "Transaction {:?} dependencies: {:#?}",
269            opt.digest,
270            effects.dependencies(),
271        );
272    };
273    Ok(())
274}
275
276pub fn print_object(path: &Path, opt: PrintObjectOptions) -> anyhow::Result<()> {
277    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None);
278
279    let obj = if let Some(version) = opt.version {
280        perpetual_db.get_object_by_key(&opt.id, version.into())?
281    } else {
282        perpetual_db.get_object(&opt.id)?
283    };
284
285    if let Some(obj) = obj {
286        println!("Object {:?}:\n{:#?}", opt.id, obj);
287    } else {
288        println!("Object {:?} not found", opt.id);
289    }
290
291    Ok(())
292}
293
294pub fn print_checkpoint(path: &Path, opt: PrintCheckpointOptions) -> anyhow::Result<()> {
295    let checkpoint_store = CheckpointStore::new(&path.join("checkpoints"));
296    let checkpoint = checkpoint_store
297        .get_checkpoint_by_digest(&opt.digest)?
298        .ok_or(anyhow!(
299            "Checkpoint digest {:?} not found in checkpoint store",
300            opt.digest
301        ))?;
302    println!("Checkpoint: {:?}", checkpoint);
303    drop(checkpoint_store);
304    print_checkpoint_content(
305        path,
306        PrintCheckpointContentOptions {
307            digest: checkpoint.content_digest,
308        },
309    )
310}
311
312pub fn print_checkpoint_content(
313    path: &Path,
314    opt: PrintCheckpointContentOptions,
315) -> anyhow::Result<()> {
316    let checkpoint_store = CheckpointStore::new(&path.join("checkpoints"));
317    let contents = checkpoint_store
318        .get_checkpoint_contents(&opt.digest)?
319        .ok_or(anyhow!(
320            "Checkpoint content digest {:?} not found in checkpoint store",
321            opt.digest
322        ))?;
323    println!("Checkpoint content: {:?}", contents);
324    Ok(())
325}
326
/// Resets the database under `path` so that execution restarts from genesis.
/// This modifies the database in place.
///
/// Three stores are opened read-write, and `reset_db_for_execution_since_genesis`
/// is called on each, in this order:
/// 1. the perpetual tables at `store/perpetual`;
/// 2. the checkpoint store at `checkpoints`;
/// 3. the epoch tables at `store`.
///
/// The first failure returns immediately. Resets that already ran are not
/// undone.
///
/// # Testing procedure
///
/// 1. Get a db snapshot. Either generate one by running stress locally with
///    db checkpoints enabled, or download one from the S3 bucket (they are
///    large). There is one snapshot per epoch in the bucket.
/// 2. Place the snapshot in the directory the node config points to. If
///    `db-path` in `fullnode.yaml` is `/opt/iota/db/authorities_db` and you
///    want to restore from epoch 10, copy the snapshot into
///    `/opt/iota/db/authorities_db` like this:
///    `aws s3 cp s3://myBucket/dir /opt/iota/db/authorities_db/ --recursive --exclude "*" --include "epoch_10*"`
/// 3. Mark the downloaded snapshot as live:
///    `mv /opt/iota/db/authorities_db/epoch_10 /opt/iota/db/authorities_db/live`
/// 4. Reset the downloaded db to execute from genesis:
///    `cargo run --package iota-tool -- db-tool --db-path /opt/iota/db/authorities_db/live reset-db`
/// 5. Start the iota full node:
///    `cargo run --release --bin iota-node -- --config-path ~/db_checkpoints/fullnode.yaml`
///
/// A sample `fullnode.yaml`:
///
/// ```yaml
/// ---
/// db-path:  /opt/iota/db/authorities_db
/// network-address: /ip4/0.0.0.0/tcp/8080/http
/// json-rpc-address: "0.0.0.0:9000"
/// websocket-address: "0.0.0.0:9001"
/// metrics-address: "0.0.0.0:9184"
/// admin-interface-address: "127.0.0.1:1337"
/// grpc-load-shed: ~
/// grpc-concurrency-limit: ~
/// p2p-config:
///   listen-address: "0.0.0.0:8084"
/// genesis:
///   genesis-file-location:  <path to genesis blob for the network>
/// authority-store-pruning-config:
///   num-latest-epoch-dbs-to-retain: 3
///   epoch-db-pruning-period-secs: 3600
///   num-epochs-to-retain: 18446744073709551615
///   max-checkpoints-in-batch: 10
///   max-transactions-in-batch: 1000
/// ```
pub fn reset_db_to_genesis(path: &Path) -> anyhow::Result<()> {
    let perpetual_db = AuthorityPerpetualTables::open_tables_read_write(
        path.join("store").join("perpetual"),
        MetricConf::default(),
        None,
        None,
    );
    perpetual_db.reset_db_for_execution_since_genesis()?;

    let checkpoint_db = CheckpointStore::open_tables_read_write(
        path.join("checkpoints"),
        MetricConf::default(),
        None,
        None,
    );
    checkpoint_db.reset_db_for_execution_since_genesis()?;

    // NOTE(review): the epoch tables are opened at `store` itself, not at an
    // epoch-specific subdirectory; confirm this is the intended location.
    let epoch_db = AuthorityEpochTables::open_tables_read_write(
        path.join("store"),
        MetricConf::default(),
        None,
        None,
    );
    epoch_db.reset_db_for_execution_since_genesis()?;

    Ok(())
}
390
391/// Force sets the highest executed checkpoint.
392/// NOTE: Does not force re-execution of transactions.
393/// Run with: cargo run --package iota-tool -- db-tool --db-path
394/// /opt/iota/db/authorities_db/live rewind-checkpoint-execution --epoch 3
395/// --checkpoint-sequence-number 300000
396pub fn rewind_checkpoint_execution(
397    path: &Path,
398    epoch: EpochId,
399    checkpoint_sequence_number: u64,
400) -> anyhow::Result<()> {
401    let checkpoint_db = CheckpointStore::open_tables_read_write(
402        path.join("checkpoints"),
403        MetricConf::default(),
404        None,
405        None,
406    );
407    let Some(checkpoint) =
408        checkpoint_db.get_checkpoint_by_sequence_number(checkpoint_sequence_number)?
409    else {
410        bail!("Checkpoint {checkpoint_sequence_number} not found!");
411    };
412    if epoch != checkpoint.epoch() {
413        bail!(
414            "Checkpoint {checkpoint_sequence_number} is in epoch {} not {epoch}!",
415            checkpoint.epoch()
416        );
417    }
418    let highest_executed_sequence_number = checkpoint_db
419        .get_highest_executed_checkpoint_seq_number()?
420        .unwrap_or_default();
421    if checkpoint_sequence_number > highest_executed_sequence_number {
422        bail!(
423            "Must rewind checkpoint execution to be not later than highest executed ({} > {})!",
424            checkpoint_sequence_number,
425            highest_executed_sequence_number
426        );
427    }
428    checkpoint_db.set_highest_executed_checkpoint_subtle(&checkpoint)?;
429    Ok(())
430}
431
432pub fn print_db_table_summary(
433    store: StoreName,
434    epoch: Option<EpochId>,
435    path: PathBuf,
436    table_name: &str,
437) -> anyhow::Result<()> {
438    let summary = table_summary(store, epoch, path, table_name)?;
439    let quantiles = [25, 50, 75, 90, 99];
440    println!(
441        "Total num keys = {}, total key bytes = {}, total value bytes = {}",
442        summary.num_keys, summary.key_bytes_total, summary.value_bytes_total
443    );
444    println!("Key size distribution:\n");
445    quantiles.iter().for_each(|q| {
446        println!(
447            "p{:?} -> {:?} bytes\n",
448            q,
449            summary.key_hist.value_at_quantile(*q as f64 / 100.0)
450        );
451    });
452    println!("Value size distribution:\n");
453    quantiles.iter().for_each(|q| {
454        println!(
455            "p{:?} -> {:?} bytes\n",
456            q,
457            summary.value_hist.value_at_quantile(*q as f64 / 100.0)
458        );
459    });
460    Ok(())
461}
462
463pub fn print_all_entries(
464    store: StoreName,
465    epoch: Option<EpochId>,
466    path: PathBuf,
467    table_name: &str,
468    page_size: u16,
469    page_number: usize,
470) -> anyhow::Result<()> {
471    for (k, v) in dump_table(store, epoch, path, table_name, page_size, page_number)? {
472        println!("{:>100?}: {:?}", k, v);
473    }
474    Ok(())
475}
476
477/// Force sets state sync checkpoint watermarks.
478/// Run with (for example):
479/// cargo run --package iota-tool -- db-tool --db-path
480/// /opt/iota/db/authorities_db/live set_checkpoint_watermark --highest-synced
481/// 300000
482pub fn set_checkpoint_watermark(
483    path: &Path,
484    options: SetCheckpointWatermarkOptions,
485) -> anyhow::Result<()> {
486    let checkpoint_db = CheckpointStore::open_tables_read_write(
487        path.join("checkpoints"),
488        MetricConf::default(),
489        None,
490        None,
491    );
492
493    if let Some(highest_verified) = options.highest_verified {
494        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_verified)?
495        else {
496            bail!("Checkpoint {highest_verified} not found");
497        };
498        checkpoint_db.update_highest_verified_checkpoint(&checkpoint)?;
499    }
500    if let Some(highest_synced) = options.highest_synced {
501        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_synced)?
502        else {
503            bail!("Checkpoint {highest_synced} not found");
504        };
505        checkpoint_db.update_highest_synced_checkpoint(&checkpoint)?;
506    }
507    Ok(())
508}