iota_tool/db_tool/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::path::{Path, PathBuf};
6
7use anyhow::{anyhow, bail};
8use clap::Parser;
9use iota_core::{
10    authority::{
11        authority_per_epoch_store::AuthorityEpochTables,
12        authority_store_tables::AuthorityPerpetualTables,
13    },
14    checkpoints::CheckpointStore,
15};
16use iota_types::{
17    base_types::{EpochId, ObjectID},
18    digests::{CheckpointContentsDigest, TransactionDigest},
19    effects::TransactionEffectsAPI,
20    messages_checkpoint::{CheckpointDigest, CheckpointSequenceNumber},
21    storage::ObjectStore,
22};
23use typed_store::rocks::{MetricConf, safe_drop_db};
24
25use self::{
26    db_dump::{StoreName, dump_table, duplicate_objects_summary, list_tables, table_summary},
27    index_search::{SearchRange, search_index},
28};
29use crate::db_tool::db_dump::{compact, print_table_metadata, prune_checkpoints, prune_objects};
30pub mod db_dump;
31mod index_search;
32
33#[derive(Parser)]
34pub enum DbToolCommand {
35    ListTables,
36    Dump(Options),
37    IndexSearchKeyRange(IndexSearchKeyRangeOptions),
38    IndexSearchCount(IndexSearchCountOptions),
39    TableSummary(Options),
40    DuplicatesSummary,
41    ListDBMetadata(Options),
42    PrintLastConsensusIndex,
43    PrintTransaction(PrintTransactionOptions),
44    PrintObject(PrintObjectOptions),
45    PrintCheckpoint(PrintCheckpointOptions),
46    PrintCheckpointContent(PrintCheckpointContentOptions),
47    ResetDB,
48    RewindCheckpointExecution(RewindCheckpointExecutionOptions),
49    Compact,
50    PruneObjects,
51    PruneCheckpoints,
52    SetCheckpointWatermark(SetCheckpointWatermarkOptions),
53}
54
55#[derive(Parser)]
56pub struct IndexSearchKeyRangeOptions {
57    #[arg(long, short = 't')]
58    table_name: String,
59    #[arg(long, short = 's')]
60    start: String,
61    #[arg(long = "end", short = 'e')]
62    end_key: String,
63}
64
65#[derive(Parser)]
66pub struct IndexSearchCountOptions {
67    #[arg(long, short = 't')]
68    table_name: String,
69    #[arg(long, short = 's')]
70    start: String,
71    #[arg(long, short = 'c')]
72    count: u64,
73}
74
75#[derive(Parser)]
76pub struct Options {
77    /// The type of store to dump
78    #[arg(long = "store", short = 's', value_enum)]
79    store_name: StoreName,
80    /// The name of the table to dump
81    #[arg(long, short = 't')]
82    table_name: String,
83    /// The size of page to dump. This is a u16
84    #[arg(long, short = 'p')]
85    page_size: u16,
86    /// The page number to dump
87    #[arg(long = "page-num", short = 'n')]
88    page_number: usize,
89
90    // TODO: We should load this automatically from the system object in AuthorityPerpetualTables.
91    // This is very difficult to do right now because you can't share code between
92    // AuthorityPerpetualTables and AuthorityEpochTablesReadonly.
93    /// The epoch to use when loading AuthorityEpochTables.
94    #[arg(long, short = 'e')]
95    epoch: Option<EpochId>,
96}
97
98#[derive(Parser)]
99pub struct PrintTransactionOptions {
100    #[arg(long, help = "The transaction digest to print")]
101    digest: TransactionDigest,
102}
103
104#[derive(Parser)]
105pub struct PrintObjectOptions {
106    #[arg(long, help = "The object id to print")]
107    id: ObjectID,
108    #[arg(long, help = "The object version to print")]
109    version: Option<u64>,
110}
111
112#[derive(Parser)]
113pub struct PrintCheckpointOptions {
114    #[arg(long, help = "The checkpoint digest to print")]
115    digest: CheckpointDigest,
116}
117
118#[derive(Parser)]
119pub struct PrintCheckpointContentOptions {
120    #[arg(
121        long,
122        help = "The checkpoint content digest (NOT the checkpoint digest)"
123    )]
124    digest: CheckpointContentsDigest,
125}
126
127#[derive(Parser)]
128pub struct RemoveTransactionOptions {
129    #[arg(long, help = "The transaction digest to remove")]
130    digest: TransactionDigest,
131
132    #[arg(long)]
133    confirm: bool,
134
135    /// The epoch to use when loading AuthorityEpochTables.
136    /// Defaults to the current epoch.
137    #[arg(long, short = 'e')]
138    epoch: Option<EpochId>,
139}
140
141#[derive(Parser)]
142pub struct RemoveObjectLockOptions {
143    #[arg(long, help = "The object ID to remove")]
144    id: ObjectID,
145
146    #[arg(long, help = "The object version to remove")]
147    version: u64,
148
149    #[arg(long)]
150    confirm: bool,
151}
152
153#[derive(Parser)]
154pub struct RewindCheckpointExecutionOptions {
155    #[arg(long)]
156    epoch: EpochId,
157
158    #[arg(long)]
159    checkpoint_sequence_number: u64,
160}
161
162#[derive(Parser)]
163pub struct SetCheckpointWatermarkOptions {
164    #[arg(long)]
165    highest_verified: Option<CheckpointSequenceNumber>,
166
167    #[arg(long)]
168    highest_synced: Option<CheckpointSequenceNumber>,
169}
170
171pub async fn execute_db_tool_command(db_path: PathBuf, cmd: DbToolCommand) -> anyhow::Result<()> {
172    match cmd {
173        DbToolCommand::ListTables => print_db_all_tables(db_path),
174        DbToolCommand::Dump(d) => print_all_entries(
175            d.store_name,
176            d.epoch,
177            db_path,
178            &d.table_name,
179            d.page_size,
180            d.page_number,
181        ),
182        DbToolCommand::TableSummary(d) => {
183            print_db_table_summary(d.store_name, d.epoch, db_path, &d.table_name)
184        }
185        DbToolCommand::DuplicatesSummary => print_db_duplicates_summary(db_path),
186        DbToolCommand::ListDBMetadata(d) => {
187            print_table_metadata(d.store_name, d.epoch, db_path, &d.table_name)
188        }
189        DbToolCommand::PrintLastConsensusIndex => print_last_consensus_index(&db_path),
190        DbToolCommand::PrintTransaction(d) => print_transaction(&db_path, d),
191        DbToolCommand::PrintObject(o) => print_object(&db_path, o),
192        DbToolCommand::PrintCheckpoint(d) => print_checkpoint(&db_path, d),
193        DbToolCommand::PrintCheckpointContent(d) => print_checkpoint_content(&db_path, d),
194        DbToolCommand::ResetDB => reset_db_to_genesis(&db_path).await,
195        DbToolCommand::RewindCheckpointExecution(d) => {
196            rewind_checkpoint_execution(&db_path, d.epoch, d.checkpoint_sequence_number)
197        }
198        DbToolCommand::Compact => compact(db_path),
199        DbToolCommand::PruneObjects => prune_objects(db_path).await,
200        DbToolCommand::PruneCheckpoints => prune_checkpoints(db_path).await,
201        DbToolCommand::IndexSearchKeyRange(rg) => {
202            let res = search_index(
203                db_path,
204                rg.table_name,
205                rg.start,
206                SearchRange::ExclusiveLastKey(rg.end_key),
207            )?;
208            for (k, v) in res {
209                println!("{k}: {v}");
210            }
211            Ok(())
212        }
213        DbToolCommand::IndexSearchCount(sc) => {
214            let res = search_index(
215                db_path,
216                sc.table_name,
217                sc.start,
218                SearchRange::Count(sc.count),
219            )?;
220            for (k, v) in res {
221                println!("{k}: {v}");
222            }
223            Ok(())
224        }
225        DbToolCommand::SetCheckpointWatermark(d) => set_checkpoint_watermark(&db_path, d),
226    }
227}
228
229pub fn print_db_all_tables(db_path: PathBuf) -> anyhow::Result<()> {
230    list_tables(db_path)?.iter().for_each(|t| println!("{t}"));
231    Ok(())
232}
233
234pub fn print_db_duplicates_summary(db_path: PathBuf) -> anyhow::Result<()> {
235    let (total_count, duplicate_count, total_bytes, duplicated_bytes) =
236        duplicate_objects_summary(db_path)?;
237    println!(
238        "Total objects = {total_count}, duplicated objects = {duplicate_count}, total bytes = {total_bytes}, duplicated bytes = {duplicated_bytes}"
239    );
240    Ok(())
241}
242
243pub fn print_last_consensus_index(path: &Path) -> anyhow::Result<()> {
244    let epoch_tables = AuthorityEpochTables::open_tables_read_write(
245        path.to_path_buf(),
246        MetricConf::default(),
247        None,
248        None,
249    );
250    let last_index = epoch_tables.get_last_consensus_index()?;
251    println!("Last consensus index is {last_index:?}");
252    Ok(())
253}
254
255pub fn print_transaction(path: &Path, opt: PrintTransactionOptions) -> anyhow::Result<()> {
256    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None);
257    if let Some((epoch, checkpoint_seq_num)) =
258        perpetual_db.get_checkpoint_sequence_number(&opt.digest)?
259    {
260        println!(
261            "Transaction {:?} executed in epoch {} checkpoint {}",
262            opt.digest, epoch, checkpoint_seq_num
263        );
264    };
265    if let Some(effects) = perpetual_db.get_effects(&opt.digest)? {
266        println!(
267            "Transaction {:?} dependencies: {:#?}",
268            opt.digest,
269            effects.dependencies(),
270        );
271    };
272    Ok(())
273}
274
275pub fn print_object(path: &Path, opt: PrintObjectOptions) -> anyhow::Result<()> {
276    let perpetual_db = AuthorityPerpetualTables::open(&path.join("store"), None);
277
278    let obj = if let Some(version) = opt.version {
279        perpetual_db.try_get_object_by_key(&opt.id, version.into())?
280    } else {
281        perpetual_db.try_get_object(&opt.id)?
282    };
283
284    if let Some(obj) = obj {
285        println!("Object {:?}:\n{:#?}", opt.id, obj);
286    } else {
287        println!("Object {:?} not found", opt.id);
288    }
289
290    Ok(())
291}
292
293pub fn print_checkpoint(path: &Path, opt: PrintCheckpointOptions) -> anyhow::Result<()> {
294    let checkpoint_store = CheckpointStore::new(&path.join("checkpoints"));
295    let checkpoint = checkpoint_store
296        .get_checkpoint_by_digest(&opt.digest)?
297        .ok_or(anyhow!(
298            "Checkpoint digest {:?} not found in checkpoint store",
299            opt.digest
300        ))?;
301    println!("Checkpoint: {checkpoint:?}");
302    drop(checkpoint_store);
303    print_checkpoint_content(
304        path,
305        PrintCheckpointContentOptions {
306            digest: checkpoint.content_digest,
307        },
308    )
309}
310
311pub fn print_checkpoint_content(
312    path: &Path,
313    opt: PrintCheckpointContentOptions,
314) -> anyhow::Result<()> {
315    let checkpoint_store = CheckpointStore::new(&path.join("checkpoints"));
316    let contents = checkpoint_store
317        .get_checkpoint_contents(&opt.digest)?
318        .ok_or(anyhow!(
319            "Checkpoint content digest {:?} not found in checkpoint store",
320            opt.digest
321        ))?;
322    println!("Checkpoint content: {contents:?}");
323    Ok(())
324}
325
326pub async fn reset_db_to_genesis(path: &Path) -> anyhow::Result<()> {
327    // Follow the below steps to test:
328    //
329    // Get a db snapshot. Either generate one by running stress locally and enabling
330    // db checkpoints or download one from S3 bucket (pretty big in size though).
331    // Download the snapshot for the epoch you want to restore to the local disk.
332    // You will find one snapshot per epoch in the S3 bucket. We need to place the
333    // snapshot in the dir where config is pointing to. If db-config in
334    // fullnode.yaml is /opt/iota/db/authorities_db and we want to restore from
335    // epoch 10, we want to copy the snapshot to /opt/iota/db/authorities_dblike
336    // this: aws s3 cp s3://myBucket/dir /opt/iota/db/authorities_db/
337    // --recursive —exclude “*” —include “epoch_10*” Mark downloaded snapshot as
338    // live: mv /opt/iota/db/authorities_db/epoch_10
339    // /opt/iota/db/authorities_db/live Reset the downloaded db to execute from
340    // genesis with: cargo run --package iota-tool -- db-tool --db-path
341    // /opt/iota/db/authorities_db/live reset-db Start the iota full node: cargo
342    // run --release --bin iota-node -- --config-path ~/db_checkpoints/fullnode.yaml
343    // A sample fullnode.yaml config would be:
344    // ---
345    // db-path:  /opt/iota/db/authorities_db
346    // network-address: /ip4/0.0.0.0/tcp/8080/http
347    // json-rpc-address: "0.0.0.0:9000"
348    // websocket-address: "0.0.0.0:9001"
349    // metrics-address: "0.0.0.0:9184"
350    // admin-interface-address: "127.0.0.1:1337"
351    // grpc-load-shed: ~
352    // grpc-concurrency-limit: ~
353    // p2p-config:
354    //   listen-address: "0.0.0.0:8084"
355    // genesis:
356    //   genesis-file-location:  <path to genesis blob for the network>
357    // authority-store-pruning-config:
358    //   num-latest-epoch-dbs-to-retain: 3
359    //   epoch-db-pruning-period-secs: 3600
360    //   num-epochs-to-retain: 18446744073709551615
361    //   max-checkpoints-in-batch: 10
362    //   max-transactions-in-batch: 1000
363    safe_drop_db(
364        path.join("store").join("perpetual"),
365        std::time::Duration::from_secs(60),
366    )
367    .await?;
368
369    let checkpoint_db = CheckpointStore::new(&path.join("checkpoints"));
370    checkpoint_db.reset_db_for_execution_since_genesis()?;
371
372    Ok(())
373}
374
375/// Force sets the highest executed checkpoint.
376/// NOTE: Does not force re-execution of transactions.
377/// Run with: cargo run --package iota-tool -- db-tool --db-path
378/// /opt/iota/db/authorities_db/live rewind-checkpoint-execution --epoch 3
379/// --checkpoint-sequence-number 300000
380pub fn rewind_checkpoint_execution(
381    path: &Path,
382    epoch: EpochId,
383    checkpoint_sequence_number: u64,
384) -> anyhow::Result<()> {
385    let checkpoint_db = CheckpointStore::new(&path.join("checkpoints"));
386    let Some(checkpoint) =
387        checkpoint_db.get_checkpoint_by_sequence_number(checkpoint_sequence_number)?
388    else {
389        bail!("Checkpoint {checkpoint_sequence_number} not found!");
390    };
391    if epoch != checkpoint.epoch() {
392        bail!(
393            "Checkpoint {checkpoint_sequence_number} is in epoch {} not {epoch}!",
394            checkpoint.epoch()
395        );
396    }
397
398    let highest_executed_sequence_number = checkpoint_db
399        .get_highest_executed_checkpoint_seq_number()?
400        .unwrap_or_default();
401    if checkpoint_sequence_number > highest_executed_sequence_number {
402        bail!(
403            "Must rewind checkpoint execution to be not later than highest executed ({} > {})!",
404            checkpoint_sequence_number,
405            highest_executed_sequence_number
406        );
407    }
408    checkpoint_db.set_highest_executed_checkpoint_subtle(&checkpoint)?;
409    Ok(())
410}
411
412pub fn print_db_table_summary(
413    store: StoreName,
414    epoch: Option<EpochId>,
415    path: PathBuf,
416    table_name: &str,
417) -> anyhow::Result<()> {
418    let summary = table_summary(store, epoch, path, table_name)?;
419    let quantiles = [25, 50, 75, 90, 99];
420    println!(
421        "Total num keys = {}, total key bytes = {}, total value bytes = {}",
422        summary.num_keys, summary.key_bytes_total, summary.value_bytes_total
423    );
424    println!("Key size distribution:\n");
425    quantiles.iter().for_each(|q| {
426        println!(
427            "p{:?} -> {:?} bytes\n",
428            q,
429            summary.key_hist.value_at_quantile(*q as f64 / 100.0)
430        );
431    });
432    println!("Value size distribution:\n");
433    quantiles.iter().for_each(|q| {
434        println!(
435            "p{:?} -> {:?} bytes\n",
436            q,
437            summary.value_hist.value_at_quantile(*q as f64 / 100.0)
438        );
439    });
440    Ok(())
441}
442
443pub fn print_all_entries(
444    store: StoreName,
445    epoch: Option<EpochId>,
446    path: PathBuf,
447    table_name: &str,
448    page_size: u16,
449    page_number: usize,
450) -> anyhow::Result<()> {
451    for (k, v) in dump_table(store, epoch, path, table_name, page_size, page_number)? {
452        println!("{k:>100?}: {v:?}");
453    }
454    Ok(())
455}
456
457/// Force sets state sync checkpoint watermarks.
458/// Run with (for example):
459/// cargo run --package iota-tool -- db-tool --db-path
460/// /opt/iota/db/authorities_db/live set_checkpoint_watermark --highest-synced
461/// 300000
462pub fn set_checkpoint_watermark(
463    path: &Path,
464    options: SetCheckpointWatermarkOptions,
465) -> anyhow::Result<()> {
466    let checkpoint_db = CheckpointStore::new(&path.join("checkpoints"));
467
468    if let Some(highest_verified) = options.highest_verified {
469        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_verified)?
470        else {
471            bail!("Checkpoint {highest_verified} not found");
472        };
473        checkpoint_db.update_highest_verified_checkpoint(&checkpoint)?;
474    }
475    if let Some(highest_synced) = options.highest_synced {
476        let Some(checkpoint) = checkpoint_db.get_checkpoint_by_sequence_number(highest_synced)?
477        else {
478            bail!("Checkpoint {highest_synced} not found");
479        };
480        checkpoint_db.update_highest_synced_checkpoint(&checkpoint)?;
481    }
482    Ok(())
483}