// iota_tool/lib.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    collections::BTreeMap,
8    fmt::Write,
9    fs, io,
10    num::NonZeroUsize,
11    ops::Range,
12    path::{Path, PathBuf},
13    sync::{
14        Arc,
15        atomic::{AtomicU64, AtomicUsize, Ordering},
16    },
17    time::Duration,
18};
19
20use anyhow::{Result, anyhow};
21use clap::ValueEnum;
22use eyre::ContextCompat;
23use fastcrypto::{hash::MultisetHash, traits::ToFromBytes};
24use futures::{
25    StreamExt, TryStreamExt,
26    future::{AbortHandle, join_all},
27};
28use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
29use iota_archival::{
30    reader::{ArchiveReader, ArchiveReaderMetrics},
31    verify_archive_with_checksums, verify_archive_with_genesis_config,
32};
33use iota_config::{
34    NodeConfig,
35    genesis::Genesis,
36    node::ArchiveReaderConfig,
37    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
38};
39use iota_core::{
40    authority::{AuthorityStore, authority_store_tables::AuthorityPerpetualTables},
41    authority_client::{AuthorityAPI, NetworkAuthorityClient},
42    checkpoints::CheckpointStore,
43    epoch::committee_store::CommitteeStore,
44    execution_cache::build_execution_cache_from_env,
45    storage::RocksDbStore,
46};
47use iota_network::default_iota_network_config;
48use iota_protocol_config::Chain;
49use iota_sdk::{IotaClient, IotaClientBuilder};
50use iota_snapshot::{reader::StateSnapshotReaderV1, setup_db_state};
51use iota_storage::{
52    object_store::{
53        ObjectStoreGetExt,
54        http::HttpDownloaderBuilder,
55        util::{MANIFEST_FILENAME, Manifest, PerEpochManifest, copy_file, exists, get_path},
56    },
57    verify_checkpoint_range,
58};
59use iota_types::{
60    accumulator::Accumulator,
61    base_types::*,
62    committee::QUORUM_THRESHOLD,
63    crypto::AuthorityPublicKeyBytes,
64    messages_checkpoint::{CheckpointCommitment, ECMHLiveObjectSetDigest},
65    messages_grpc::{
66        LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
67        TransactionInfoRequest, TransactionStatus,
68    },
69    multiaddr::Multiaddr,
70    object::Owner,
71    storage::{ReadStore, SharedInMemoryStore},
72};
73use itertools::Itertools;
74use prometheus::Registry;
75use serde::{Deserialize, Serialize};
76use tokio::{sync::mpsc, task::JoinHandle, time::Instant};
77use tracing::info;
78use typed_store::rocks::MetricConf;
79
80pub mod commands;
81pub mod db_tool;
82
/// How much verification to perform when restoring from a formal snapshot.
/// Exposed as a `clap` value enum (e.g. for a `--verify` CLI flag).
#[derive(
    Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd, Ord, Eq, ValueEnum, Default,
)]
pub enum SnapshotVerifyMode {
    /// verification of both db state and downloaded checkpoints are skipped.
    /// This is the fastest mode, but is unsafe, and thus should only be used
    /// if you fully trust the source for both the snapshot and the checkpoint
    /// archive.
    None,
    /// verify snapshot state during download, but no post-restore db
    /// verification. Checkpoint verification is performed.
    #[default]
    Normal,
    /// In ADDITION to the behavior of `--verify normal`, verify db state
    /// post-restore against the end of epoch state root commitment.
    Strict,
}
100
101// Make clients for fetching relevant data (objects, transactions, checkpoints)
102// from the current committee members.
103async fn make_clients(
104    iota_client: &Arc<IotaClient>,
105) -> Result<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>> {
106    let mut net_config = default_iota_network_config();
107    net_config.connect_timeout = Some(Duration::from_secs(5));
108    let mut authority_clients = BTreeMap::new();
109
110    let state = iota_client
111        .governance_api()
112        .get_latest_iota_system_state()
113        .await?;
114
115    for committee_member in state.iter_committee_members() {
116        let net_addr = Multiaddr::try_from(committee_member.net_address.clone())?;
117        let channel = net_config
118            .connect_lazy(&net_addr)
119            .map_err(|err| anyhow!(err.to_string()))?;
120        let client = NetworkAuthorityClient::new(channel);
121        let public_key_bytes =
122            AuthorityPublicKeyBytes::from_bytes(&committee_member.authority_pubkey_bytes)?;
123        authority_clients.insert(public_key_bytes, (net_addr.clone(), client));
124    }
125
126    Ok(authority_clients)
127}
128
// Per-validator answer to an object query: the version the validator reported
// (if the request succeeded), the full response or error, and the elapsed
// request time in seconds.
type ObjectVersionResponses = (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64);
/// Raw per-validator responses collected for a single object query.
pub struct ObjectData {
    // The object id that was requested from each validator.
    requested_id: ObjectID,
    // One entry per queried validator: name, network address, and its answer.
    responses: Vec<(AuthorityName, Multiaddr, ObjectVersionResponses)>,
}
134
/// Renders an `Option<T>` through its `Debug` impl, substituting a caller
/// supplied placeholder string when the value is absent.
trait OptionDebug<T> {
    fn opt_debug(&self, def_str: &str) -> String;
}

impl<T> OptionDebug<T> for Option<T>
where
    T: std::fmt::Debug,
{
    fn opt_debug(&self, def_str: &str) -> String {
        self.as_ref()
            .map(|t| format!("{:?}", t))
            .unwrap_or_else(|| def_str.to_string())
    }
}
150
/// Object query responses grouped by the distinct (version, digest, parent
/// transaction, owner, lock) state each validator reported, together with the
/// stake behind each distinct state.
#[expect(clippy::type_complexity)]
pub struct GroupedObjectOutput {
    /// Validators grouped by the object state they reported. The `None` key
    /// collects validators whose fetch failed.
    pub grouped_results: BTreeMap<
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        Vec<AuthorityName>,
    >,
    /// Same keys as `grouped_results`, paired with the total stake reporting
    /// that state, sorted by stake in descending order.
    pub voting_power: Vec<(
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        u64,
    )>,
    /// Total stake of validators that reported no debug lock on the object.
    pub available_voting_power: u64,
    /// True when even the best-supported state plus all unlocked stake cannot
    /// reach `QUORUM_THRESHOLD`.
    pub fully_locked: bool,
}
176
177impl GroupedObjectOutput {
178    pub fn new(
179        object_data: ObjectData,
180        committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
181    ) -> Self {
182        let mut grouped_results = BTreeMap::new();
183        let mut voting_power = BTreeMap::new();
184        let mut available_voting_power = 0;
185        for (name, _, (version, resp, _elapsed)) in &object_data.responses {
186            let stake = committee.get(name).unwrap();
187            let key = match resp {
188                Ok(r) => {
189                    let obj_digest = r.object.compute_object_reference().2;
190                    let parent_tx_digest = r.object.previous_transaction;
191                    let owner = r.object.owner;
192                    let lock = r.lock_for_debugging.as_ref().map(|lock| *lock.digest());
193                    if lock.is_none() {
194                        available_voting_power += stake;
195                    }
196                    Some((*version, obj_digest, parent_tx_digest, owner, lock))
197                }
198                Err(_) => None,
199            };
200            let entry = grouped_results.entry(key).or_insert_with(Vec::new);
201            entry.push(*name);
202            let entry: &mut u64 = voting_power.entry(key).or_default();
203            *entry += stake;
204        }
205        let voting_power = voting_power
206            .into_iter()
207            .sorted_by(|(_, v1), (_, v2)| Ord::cmp(v2, v1))
208            .collect::<Vec<_>>();
209        let mut fully_locked = false;
210        if !voting_power.is_empty()
211            && voting_power.first().unwrap().1 + available_voting_power < QUORUM_THRESHOLD
212        {
213            fully_locked = true;
214        }
215        Self {
216            grouped_results,
217            voting_power,
218            available_voting_power,
219            fully_locked,
220        }
221    }
222}
223
224impl std::fmt::Display for GroupedObjectOutput {
225    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
226        writeln!(f, "available stake: {}", self.available_voting_power)?;
227        writeln!(f, "fully locked: {}", self.fully_locked)?;
228        writeln!(f, "{:<100}\n", "-".repeat(100))?;
229        for (key, stake) in &self.voting_power {
230            let val = self.grouped_results.get(key).unwrap();
231            writeln!(f, "total stake: {stake}")?;
232            match key {
233                Some((_version, obj_digest, parent_tx_digest, owner, lock)) => {
234                    let lock = lock.opt_debug("no-known-lock");
235                    writeln!(f, "obj ref: {obj_digest}")?;
236                    writeln!(f, "parent tx: {parent_tx_digest}")?;
237                    writeln!(f, "owner: {owner}")?;
238                    writeln!(f, "lock: {lock}")?;
239                    for (i, name) in val.iter().enumerate() {
240                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
241                    }
242                }
243                None => {
244                    writeln!(f, "ERROR")?;
245                    for (i, name) in val.iter().enumerate() {
246                        writeln!(f, "        {:<4} {:<20}", i, name.concise(),)?;
247                    }
248                }
249            };
250            writeln!(f, "{:<100}\n", "-".repeat(100))?;
251        }
252        Ok(())
253    }
254}
255
256struct ConciseObjectOutput(ObjectData);
257
258impl ConciseObjectOutput {
259    fn header() -> String {
260        format!(
261            "{:<20} {:<8} {:<66} {:<45} {}",
262            "validator", "version", "digest", "parent_cert", "owner"
263        )
264    }
265}
266
impl std::fmt::Display for ConciseObjectOutput {
    /// Writes one row per queried validator: name and reported version, then
    /// either the object digest / parent tx / owner columns, or fixed
    /// placeholder columns when the fetch failed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (name, _multi_addr, (version, resp, _time_elapsed)) in &self.0.responses {
            write!(
                f,
                "{:<20} {:<8}",
                format!("{:?}", name.concise()),
                version.map(|s| s.value()).opt_debug("-")
            )?;
            match resp {
                // NOTE(review): this arm uses `writeln!`, so together with the
                // unconditional `writeln!(f)` below, failed rows are followed
                // by an extra blank line while successful rows are not —
                // confirm this asymmetry is intended.
                Err(_) => writeln!(
                    f,
                    "{:<66} {:<45} {:<51}",
                    "object-fetch-failed", "no-cert-available", "no-owner-available"
                )?,
                Ok(resp) => {
                    let obj_digest = resp.object.compute_object_reference().2;
                    let parent = resp.object.previous_transaction;
                    let owner = resp.object.owner;
                    write!(f, " {:<66} {:<45} {:<51}", obj_digest, parent, owner)?;
                }
            }
            writeln!(f)?;
        }
        Ok(())
    }
}
294
/// Detailed multi-section rendering of an object query, one section per
/// queried validator.
struct VerboseObjectOutput(ObjectData);

impl std::fmt::Display for VerboseObjectOutput {
    /// Prints, per validator: its address, the reported version and request
    /// latency, then (on success) the object digest, rendered Move contents,
    /// owner, and any debug lock.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Object: {}", self.0.requested_id)?;

        for (name, multiaddr, (version, resp, timespent)) in &self.0.responses {
            writeln!(f, "validator: {:?}, addr: {:?}", name.concise(), multiaddr)?;
            writeln!(
                f,
                "-- version: {} ({:.3}s)",
                version.opt_debug("<version not available>"),
                timespent,
            )?;

            match resp {
                Err(e) => writeln!(f, "Error fetching object: {}", e)?,
                Ok(resp) => {
                    writeln!(
                        f,
                        "  -- object digest: {}",
                        resp.object.compute_object_reference().2
                    )?;
                    if resp.object.is_package() {
                        writeln!(f, "  -- object: <Move Package>")?;
                    } else if let Some(layout) = &resp.layout {
                        // NOTE(review): these unwraps panic if a non-package
                        // object has no Move data or cannot be rendered with
                        // the returned layout — likely acceptable for a debug
                        // tool, but worth confirming.
                        writeln!(
                            f,
                            "  -- object: Move Object: {}",
                            resp.object
                                .data
                                .try_as_move()
                                .unwrap()
                                .to_move_struct(layout)
                                .unwrap()
                        )?;
                    }
                    writeln!(f, "  -- owner: {}", resp.object.owner)?;
                    writeln!(
                        f,
                        "  -- locked by: {}",
                        resp.lock_for_debugging.opt_debug("<not locked>")
                    )?;
                }
            }
        }
        Ok(())
    }
}
344
345pub async fn get_object(
346    obj_id: ObjectID,
347    version: Option<u64>,
348    validator: Option<AuthorityName>,
349    clients: Arc<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>>,
350) -> Result<ObjectData> {
351    let responses = join_all(
352        clients
353            .iter()
354            .filter(|(name, _)| {
355                if let Some(v) = validator {
356                    v == **name
357                } else {
358                    true
359                }
360            })
361            .map(|(name, (address, client))| async {
362                let object_version = get_object_impl(client, obj_id, version).await;
363                (*name, address.clone(), object_version)
364            }),
365    )
366    .await;
367
368    Ok(ObjectData {
369        requested_id: obj_id,
370        responses,
371    })
372}
373
/// Fetches `tx_digest` from every committee validator and renders a grouped,
/// human-readable report of their answers. Responses are grouped by the
/// effects they report (or by error), so divergent validators are easy to
/// spot. When `show_input_tx` is set the full input transaction is printed
/// as well.
pub async fn get_transaction_block(
    tx_digest: TransactionDigest,
    show_input_tx: bool,
    fullnode_rpc: String,
) -> Result<String> {
    let iota_client = Arc::new(IotaClientBuilder::default().build(fullnode_rpc).await?);
    let clients = make_clients(&iota_client).await?;
    let timer = Instant::now();
    // Query all validators concurrently, recording per-validator latency.
    let responses = join_all(clients.iter().map(|(name, (address, client))| async {
        let result = client
            .handle_transaction_info_request(TransactionInfoRequest {
                transaction_digest: tx_digest,
            })
            .await;
        (
            *name,
            address.clone(),
            result,
            timer.elapsed().as_secs_f64(),
        )
    }))
    .await;

    // Grab one validator that return Some(TransactionInfoResponse)
    let validator_aware_of_tx = responses.iter().find(|r| r.2.is_ok());

    // Sort responses so identical outcomes are adjacent, then group them by
    // the (transaction, effects, effects digest) they report. `Signed` (not
    // yet executed) responses map to `Ok(None)`, failures to `Err`.
    let responses = responses
        .iter()
        .map(|r| {
            let key =
                r.2.as_ref()
                    .map(|ok_result| match &ok_result.status {
                        TransactionStatus::Signed(_) => None,
                        TransactionStatus::Executed(_, effects, _) => Some(effects.digest()),
                    })
                    .ok();
            let err = r.2.as_ref().err();
            (key, err, r)
        })
        .sorted_by(|(k1, err1, _), (k2, err2, _)| {
            Ord::cmp(k1, k2).then_with(|| Ord::cmp(err1, err2))
        })
        .chunk_by(|(_, _err, r)| {
            r.2.as_ref().map(|ok_result| match &ok_result.status {
                TransactionStatus::Signed(_) => None,
                TransactionStatus::Executed(_, effects, _) => Some((
                    ok_result.transaction.transaction_data(),
                    effects.data(),
                    effects.digest(),
                )),
            })
        });
    let mut s = String::new();
    for (i, (key, group)) in responses.into_iter().enumerate() {
        match key {
            // Executed: print effects (and optionally the input transaction).
            Ok(Some((tx, effects, effects_digest))) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} effects_digest: {:?}",
                    i, tx_digest, effects_digest,
                )?;
                writeln!(&mut s, "{:#?}", effects)?;
                if show_input_tx {
                    writeln!(&mut s, "{:#?}", tx)?;
                }
            }
            // Signed but not executed: no effects exist yet, so re-fetch the
            // transaction body from a validator known to have seen it.
            Ok(None) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} Signed but not executed",
                    i, tx_digest
                )?;
                if show_input_tx {
                    // In this case, we expect at least one validator knows about this tx
                    let validator_aware_of_tx = validator_aware_of_tx.unwrap();
                    let client = &clients.get(&validator_aware_of_tx.0).unwrap().1;
                    let tx_info = client.handle_transaction_info_request(TransactionInfoRequest {
                        transaction_digest: tx_digest,
                    }).await.unwrap_or_else(|e| panic!("Validator {:?} should have known about tx_digest: {:?}, got error: {:?}", validator_aware_of_tx.0, tx_digest, e));
                    writeln!(&mut s, "{:#?}", tx_info)?;
                }
            }
            // Error groups: print the error verbatim.
            other => {
                writeln!(&mut s, "#{:<2} {:#?}", i, other)?;
            }
        }
        // List every validator in this group together with its latency.
        for (j, (_, _, res)) in group.enumerate() {
            writeln!(
                &mut s,
                "        {:<4} {:<20} {:<56} ({:.3}s)",
                j,
                res.0.concise(),
                format!("{}", res.1),
                res.3
            )?;
        }
        writeln!(&mut s, "{:<100}\n", "-".repeat(100))?;
    }
    Ok(s)
}
474
475async fn get_object_impl(
476    client: &NetworkAuthorityClient,
477    id: ObjectID,
478    version: Option<u64>,
479) -> (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64) {
480    let start = Instant::now();
481    let resp = client
482        .handle_object_info_request(ObjectInfoRequest {
483            object_id: id,
484            generate_layout: LayoutGenerationOption::Generate,
485            request_kind: match version {
486                None => ObjectInfoRequestKind::LatestObjectInfo,
487                Some(v) => ObjectInfoRequestKind::PastObjectInfoDebug(SequenceNumber::from_u64(v)),
488            },
489        })
490        .await
491        .map_err(anyhow::Error::from);
492    let elapsed = start.elapsed().as_secs_f64();
493
494    let resp_version = resp.as_ref().ok().map(|r| r.object.version().value());
495    (resp_version.map(SequenceNumber::from), resp, elapsed)
496}
497
/// Builds the anemo CLI routing table for IOTA's p2p services, mapping
/// service/method names to RON-encoded RPC calls for the discovery and
/// state-sync protocols.
pub(crate) fn make_anemo_config() -> anemo_cli::Config {
    use iota_network::{discovery::*, state_sync::*};

    // TODO: implement `ServiceInfo` generation in anemo-build and use here.
    anemo_cli::Config::new()
        // IOTA discovery
        .add_service(
            "Discovery",
            anemo_cli::ServiceInfo::new().add_method(
                "GetKnownPeers",
                anemo_cli::ron_method!(DiscoveryClient, get_known_peers, ()),
            ),
        )
        // IOTA state sync
        .add_service(
            "StateSync",
            anemo_cli::ServiceInfo::new()
                .add_method(
                    "PushCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        push_checkpoint_summary,
                        iota_types::messages_checkpoint::CertifiedCheckpointSummary
                    ),
                )
                .add_method(
                    "GetCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_summary,
                        GetCheckpointSummaryRequest
                    ),
                )
                .add_method(
                    "GetCheckpointContents",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_contents,
                        iota_types::messages_checkpoint::CheckpointContentsDigest
                    ),
                )
                .add_method(
                    "GetCheckpointAvailability",
                    anemo_cli::ron_method!(StateSyncClient, get_checkpoint_availability, ()),
                ),
        )
}
545
/// Recursively copies the directory tree at `src` into `dst`, creating `dst`
/// (and any missing parents) first. Any entry whose full path appears in
/// `skip` is omitted — including its whole subtree when it is a directory.
/// Paths in `skip` must be expressed relative to the same base as `src`.
fn copy_dir_all(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    skip: Vec<PathBuf>,
) -> io::Result<()> {
    // Delegate to a slice-based worker so the recursion does not clone the
    // skip list at every directory level.
    copy_dir_all_inner(src.as_ref(), dst.as_ref(), &skip)
}

/// Recursive worker for [`copy_dir_all`]; borrows the skip list.
fn copy_dir_all_inner(src: &Path, dst: &Path, skip: &[PathBuf]) -> io::Result<()> {
    fs::create_dir_all(dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        // Check the skip list before touching the entry any further.
        if skip.contains(&entry.path()) {
            continue;
        }
        let target = dst.join(entry.file_name());
        if entry.file_type()?.is_dir() {
            copy_dir_all_inner(&entry.path(), &target, skip)?;
        } else {
            fs::copy(entry.path(), target)?;
        }
    }
    Ok(())
}
570
571pub async fn restore_from_db_checkpoint(
572    config: &NodeConfig,
573    db_checkpoint_path: &Path,
574) -> Result<(), anyhow::Error> {
575    copy_dir_all(db_checkpoint_path, config.db_path(), vec![])?;
576    Ok(())
577}
578
/// Spawns a background task that downloads checkpoint summaries for epochs
/// `0..=epoch` from the archive store into `checkpoint_store`, optionally
/// verifying them, and finally advances the store's watermarks to the last
/// checkpoint of `epoch`. When `all_checkpoints` is true the full checkpoint
/// range is synced (and, if `verify`, verified pairwise); otherwise only each
/// epoch's final checkpoint is synced (and verified against the matching
/// committee's signatures). Progress is reported on `m`.
fn start_summary_sync(
    perpetual_db: Arc<AuthorityPerpetualTables>,
    committee_store: Arc<CommitteeStore>,
    checkpoint_store: Arc<CheckpointStore>,
    m: MultiProgress,
    genesis: Genesis,
    archive_store_config: ObjectStoreConfig,
    epoch: u64,
    num_parallel_downloads: usize,
    verify: bool,
    all_checkpoints: bool,
) -> JoinHandle<Result<(), anyhow::Error>> {
    tokio::spawn(async move {
        info!("Starting summary sync");
        // Assemble a state-sync store view over the authority, committee and
        // checkpoint tables.
        let store =
            AuthorityStore::open_no_genesis(perpetual_db, usize::MAX, false, &Registry::default())?;
        let cache_traits = build_execution_cache_from_env(&Registry::default(), &store);
        let state_sync_store =
            RocksDbStore::new(cache_traits, committee_store, checkpoint_store.clone());
        // Only insert the genesis checkpoint if the DB is empty and doesn't have it
        // already
        if checkpoint_store
            .get_checkpoint_by_digest(genesis.checkpoint().digest())
            .unwrap()
            .is_none()
        {
            checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
            checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
            checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
        }
        // set up download of checkpoint summaries
        let config = ArchiveReaderConfig {
            remote_store_config: archive_store_config,
            download_concurrency: NonZeroUsize::new(num_parallel_downloads).unwrap(),
            use_for_pruning_watermark: false,
        };
        let metrics = ArchiveReaderMetrics::new(&Registry::default());
        let archive_reader = ArchiveReader::new(config, &metrics)?;
        archive_reader.sync_manifest_once().await?;
        let manifest = archive_reader.get_manifest().await?;

        // The last checkpoint sequence number of every epoch up to and
        // including the target epoch.
        let end_of_epoch_checkpoint_seq_nums = (0..=epoch)
            .map(|e| manifest.next_checkpoint_after_epoch(e) - 1)
            .collect::<Vec<_>>();
        let last_checkpoint = end_of_epoch_checkpoint_seq_nums
            .last()
            .expect("Expected at least one checkpoint");

        // Progress total: the whole range, or just one checkpoint per epoch.
        let num_to_sync = if all_checkpoints {
            *last_checkpoint
        } else {
            end_of_epoch_checkpoint_seq_nums.len() as u64
        };
        let sync_progress_bar = m.add(
            ProgressBar::new(num_to_sync).with_style(
                ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})")
                    .unwrap(),
            ),
        );

        let cloned_progress_bar = sync_progress_bar.clone();
        let sync_checkpoint_counter = Arc::new(AtomicU64::new(0));
        let s_instant = Instant::now();

        let cloned_counter = sync_checkpoint_counter.clone();
        // Resume from the highest checkpoint already synced locally (genesis
        // insertion above guarantees at least checkpoint 0 on a fresh DB).
        let latest_synced = checkpoint_store
            .get_highest_synced_checkpoint()?
            .map(|c| c.sequence_number)
            .unwrap_or(0);
        // NOTE(review): `context` here is eyre's Option adapter; its error is
        // immediately discarded in favor of an anyhow error — the overflow
        // itself can only happen at u64::MAX.
        let s_start = latest_synced
            .checked_add(1)
            .context("Checkpoint overflow")
            .map_err(|_| anyhow!("Failed to increment checkpoint"))?;
        // Background task: refresh the sync progress bar once per second
        // until it is marked finished.
        tokio::spawn(async move {
            loop {
                if cloned_progress_bar.is_finished() {
                    break;
                }
                let num_summaries = cloned_counter.load(Ordering::Relaxed);
                let total_checkpoints_per_sec =
                    num_summaries as f64 / s_instant.elapsed().as_secs_f64();
                cloned_progress_bar.set_position(s_start + num_summaries);
                cloned_progress_bar.set_message(format!(
                    "checkpoints synced per sec: {}",
                    total_checkpoints_per_sec
                ));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        // Download summaries (without verification) for either the full
        // remaining range or just the end-of-epoch checkpoints.
        if all_checkpoints {
            archive_reader
                .read_summaries_for_range_no_verify(
                    state_sync_store.clone(),
                    s_start..last_checkpoint + 1,
                    sync_checkpoint_counter,
                )
                .await?;
        } else {
            archive_reader
                .read_summaries_for_list_no_verify(
                    state_sync_store.clone(),
                    end_of_epoch_checkpoint_seq_nums.clone(),
                    sync_checkpoint_counter,
                )
                .await?;
        }
        sync_progress_bar.finish_with_message("Checkpoint summary sync is complete");

        let checkpoint = checkpoint_store
            .get_checkpoint_by_sequence_number(*last_checkpoint)?
            .ok_or(anyhow!("Failed to read last checkpoint"))?;
        if verify {
            let verify_progress_bar = m.add(
                ProgressBar::new(num_to_sync).with_style(
                    ProgressStyle::with_template(
                        "[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})",
                    )
                    .unwrap(),
                ),
            );
            let cloned_verify_progress_bar = verify_progress_bar.clone();
            let verify_checkpoint_counter = Arc::new(AtomicU64::new(0));
            let cloned_verify_counter = verify_checkpoint_counter.clone();
            let v_instant = Instant::now();

            // Background task: refresh the verification progress bar once per
            // second until it is marked finished.
            tokio::spawn(async move {
                let v_start = if all_checkpoints { s_start } else { 0 };
                loop {
                    if cloned_verify_progress_bar.is_finished() {
                        break;
                    }
                    let num_summaries = cloned_verify_counter.load(Ordering::Relaxed);
                    let total_checkpoints_per_sec =
                        num_summaries as f64 / v_instant.elapsed().as_secs_f64();
                    cloned_verify_progress_bar.set_position(v_start + num_summaries);
                    cloned_verify_progress_bar.set_message(format!(
                        "checkpoints verified per sec: {}",
                        total_checkpoints_per_sec
                    ));
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            });

            if all_checkpoints {
                // in this case we need to verify all the checkpoints in the range pairwise
                let v_start = s_start;
                // update highest verified to be highest synced. We will move back
                // iff parallel verification succeeds
                let latest_verified = checkpoint_store
                    .get_checkpoint_by_sequence_number(latest_synced)
                    .expect("Failed to get checkpoint")
                    .expect("Expected checkpoint to exist after summary sync");
                checkpoint_store
                    .update_highest_verified_checkpoint(&latest_verified)
                    .expect("Failed to update highest verified checkpoint");

                let verify_range = v_start..last_checkpoint + 1;
                verify_checkpoint_range(
                    verify_range,
                    state_sync_store,
                    verify_checkpoint_counter,
                    num_parallel_downloads,
                )
                .await;
            } else {
                // in this case we only need to verify the end of epoch checkpoints by checking
                // signatures against the corresponding epoch committee.
                for (cp_epoch, epoch_last_cp_seq_num) in
                    end_of_epoch_checkpoint_seq_nums.iter().enumerate()
                {
                    let epoch_last_checkpoint = checkpoint_store
                        .get_checkpoint_by_sequence_number(*epoch_last_cp_seq_num)?
                        .ok_or(anyhow!("Failed to read checkpoint"))?;
                    let committee = state_sync_store
                        .get_committee(cp_epoch as u64)
                        .expect("store operation should not fail")
                        .expect(
                            "Expected committee to exist after syncing all end of epoch checkpoints",
                        );
                    epoch_last_checkpoint
                        .verify_authority_signatures(&committee)
                        .expect("Failed to verify checkpoint");
                    verify_checkpoint_counter.fetch_add(1, Ordering::Relaxed);
                }
            }

            verify_progress_bar.finish_with_message("Checkpoint summary verification is complete");
        }

        // Advance all watermarks to the last checkpoint of the target epoch.
        checkpoint_store.update_highest_verified_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_synced_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_executed_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_pruned_checkpoint(&checkpoint)?;
        Ok::<(), anyhow::Error>(())
    })
}
776
777pub async fn get_latest_available_epoch(
778    snapshot_store_config: &ObjectStoreConfig,
779) -> Result<u64, anyhow::Error> {
780    let remote_object_store = if snapshot_store_config.no_sign_request {
781        snapshot_store_config.make_http()?
782    } else {
783        snapshot_store_config.make().map(Arc::new)?
784    };
785    let manifest_contents = remote_object_store
786        .get_bytes(&get_path(MANIFEST_FILENAME))
787        .await?;
788    let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
789        .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
790    let epoch = root_manifest
791        .available_epochs
792        .iter()
793        .max()
794        .ok_or(anyhow!("No snapshot found in manifest"))?;
795    Ok(*epoch)
796}
797
798pub async fn check_completed_snapshot(
799    snapshot_store_config: &ObjectStoreConfig,
800    epoch: EpochId,
801) -> Result<(), anyhow::Error> {
802    let success_marker = format!("epoch_{}/_SUCCESS", epoch);
803    let remote_object_store = if snapshot_store_config.no_sign_request {
804        snapshot_store_config.make_http()?
805    } else {
806        snapshot_store_config.make().map(Arc::new)?
807    };
808    if exists(&remote_object_store, &get_path(success_marker.as_str())).await {
809        Ok(())
810    } else {
811        Err(anyhow!(
812            "missing success marker at {}/{}",
813            snapshot_store_config.bucket.as_ref().unwrap_or(
814                &snapshot_store_config
815                    .clone()
816                    .aws_endpoint
817                    .unwrap_or("unknown_bucket".to_string())
818            ),
819            success_marker
820        ))
821    }
822}
823
/// Restores a node database from the formal (state) snapshot taken at the end
/// of `epoch`, staging all work under `<path>/staging` and promoting it to
/// `<path>/live` only on success.
///
/// Two tasks run concurrently:
///   - checkpoint summary sync from the archive store (`start_summary_sync`),
///     which also verifies summaries unless `verify` is `None`, and
///   - the snapshot download/restore (`StateSnapshotReaderV1::read`), which
///     streams `(partial_accumulator, num_objects)` pairs back over an mpsc
///     channel so the root state hash can be recomputed locally.
///
/// When `verify != None`, the locally computed ECMH live-object-set digest is
/// asserted equal to the end-of-epoch commitment recorded in the highest
/// verified checkpoint; a mismatch panics. With `verify == Strict`, further
/// per-object verification is delegated to `setup_db_state`.
pub async fn download_formal_snapshot(
    path: &Path,
    epoch: EpochId,
    genesis: &Path,
    snapshot_store_config: ObjectStoreConfig,
    archive_store_config: ObjectStoreConfig,
    num_parallel_downloads: usize,
    network: Chain,
    verify: SnapshotVerifyMode,
    all_checkpoints: bool,
) -> Result<(), anyhow::Error> {
    let m = MultiProgress::new();
    m.println(format!(
        "Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
        epoch, network, verify,
    ))?;
    // Work in a staging directory so a partially restored database can never
    // be mistaken for a live one; any leftover staging dir is wiped first.
    let path = path.join("staging").to_path_buf();
    if path.exists() {
        fs::remove_dir_all(path.clone())?;
    }
    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&path.join("store"), None));
    let genesis = Genesis::load(genesis).unwrap();
    let genesis_committee = genesis.committee()?;
    let committee_store = Arc::new(CommitteeStore::new(
        path.join("epochs"),
        &genesis_committee,
        None,
    ));
    let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
        path.join("checkpoints"),
        MetricConf::default(),
        None,
        None,
    ));

    // Kick off checkpoint summary sync; it runs concurrently with the
    // snapshot download spawned below.
    let summaries_handle = start_summary_sync(
        perpetual_db.clone(),
        committee_store.clone(),
        checkpoint_store.clone(),
        m.clone(),
        genesis.clone(),
        archive_store_config.clone(),
        epoch,
        num_parallel_downloads,
        verify != SnapshotVerifyMode::None,
        all_checkpoints,
    );
    // NOTE(review): the abort handle is never triggered here; dropping it
    // appears intentional so the read runs to completion — confirm.
    let (_abort_handle, abort_registration) = AbortHandle::new_pair();
    let perpetual_db_clone = perpetual_db.clone();
    let snapshot_dir = path.parent().unwrap().join("snapshot");
    if snapshot_dir.exists() {
        fs::remove_dir_all(snapshot_dir.clone())?;
    }
    let snapshot_dir_clone = snapshot_dir.clone();

    // TODO if verify is false, we should skip generating these and
    // not pass in a channel to the reader
    let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
    let m_clone = m.clone();

    // Download the snapshot into `snapshot_dir` and replay it into the
    // perpetual tables, sending partial accumulators back via `sender`.
    let snapshot_handle = tokio::spawn(async move {
        let local_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(snapshot_dir_clone.to_path_buf()),
            ..Default::default()
        };
        let mut reader = StateSnapshotReaderV1::new(
            epoch,
            &snapshot_store_config,
            &local_store_config,
            usize::MAX,
            NonZeroUsize::new(num_parallel_downloads).unwrap(),
            m_clone,
        )
        .await
        .unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
        reader
            .read(&perpetual_db_clone, abort_registration, Some(sender))
            .await
            .unwrap_or_else(|err| panic!("Failed during read: {}", err));
        Ok::<(), anyhow::Error>(())
    });
    // Fold the per-bucket accumulators into the root accumulator; the loop
    // ends when the reader task drops its sender.
    let mut root_accumulator = Accumulator::default();
    let mut num_live_objects = 0;
    while let Some((partial_acc, num_objects)) = receiver.recv().await {
        num_live_objects += num_objects;
        root_accumulator.union(&partial_acc);
    }
    summaries_handle
        .await
        .expect("Task join failed")
        .expect("Summaries task failed");

    let last_checkpoint = checkpoint_store
        .get_highest_verified_checkpoint()?
        .expect("Expected nonempty checkpoint store");

    // Perform snapshot state verification
    if verify != SnapshotVerifyMode::None {
        assert_eq!(
            last_checkpoint.epoch(),
            epoch,
            "Expected highest verified checkpoint ({}) to be for epoch {} but was for epoch {}",
            last_checkpoint.sequence_number,
            epoch,
            last_checkpoint.epoch()
        );
        let commitment = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .expect("Expected highest verified checkpoint to have end of epoch data")
            .epoch_commitments
            .last()
            .expect(
                "End of epoch has no commitments. This likely means that the epoch \
                you are attempting to restore from does not support end of epoch state \
                digest commitment. If restoring from mainnet, `--epoch` must be > 20, \
                and for testnet, `--epoch` must be > 12.",
            );
        // Compare the digest computed from downloaded snapshot data against
        // the consensus-committed end-of-epoch digest.
        match commitment {
            CheckpointCommitment::ECMHLiveObjectSetDigest(consensus_digest) => {
                let local_digest: ECMHLiveObjectSetDigest = root_accumulator.digest().into();
                assert_eq!(
                    *consensus_digest, local_digest,
                    "End of epoch {} root state digest {} does not match \
                    local root state hash {} computed from snapshot data",
                    epoch, consensus_digest.digest, local_digest.digest,
                );
                let progress_bar = m.add(
                    ProgressBar::new(1).with_style(
                        ProgressStyle::with_template(
                            "[{elapsed_precise}] {wide_bar} Verifying snapshot contents against root state hash ({msg})",
                        )
                        .unwrap(),
                    ),
                );
                progress_bar.finish_with_message("Verification complete");
            }
        };
    } else {
        m.println(
            "WARNING: Skipping snapshot verification! \
            This is highly discouraged unless you fully trust the source of this snapshot and its contents.
            If this was unintentional, rerun with `--verify` set to `normal` or `strict`.",
        )?;
    }

    snapshot_handle
        .await
        .expect("Task join failed")
        .expect("Snapshot restore task failed");

    // TODO we should ensure this map is being updated for all end of epoch
    // checkpoints during summary sync. This happens in
    // `insert_{verified|certified}_checkpoint` in checkpoint store, but not in
    // the corresponding functions in ObjectStore trait
    checkpoint_store.insert_epoch_last_checkpoint(epoch, &last_checkpoint)?;

    setup_db_state(
        epoch,
        root_accumulator.clone(),
        perpetual_db.clone(),
        checkpoint_store,
        committee_store,
        verify == SnapshotVerifyMode::Strict,
        num_live_objects,
        m,
    )
    .await?;

    // Promote staging to live and clean up the raw snapshot download.
    let new_path = path.parent().unwrap().join("live");
    if new_path.exists() {
        fs::remove_dir_all(new_path.clone())?;
    }
    fs::rename(&path, &new_path)?;
    fs::remove_dir_all(snapshot_dir.clone())?;
    println!(
        "Successfully restored state from snapshot at end of epoch {}",
        epoch
    );

    Ok(())
}
1007
1008pub async fn download_db_snapshot(
1009    path: &Path,
1010    epoch: u64,
1011    snapshot_store_config: ObjectStoreConfig,
1012    skip_indexes: bool,
1013    num_parallel_downloads: usize,
1014) -> Result<(), anyhow::Error> {
1015    let remote_store = if snapshot_store_config.no_sign_request {
1016        snapshot_store_config.make_http()?
1017    } else {
1018        snapshot_store_config.make().map(Arc::new)?
1019    };
1020
1021    // We rely on the top level MANIFEST file which contains all valid epochs
1022    let manifest_contents = remote_store.get_bytes(&get_path(MANIFEST_FILENAME)).await?;
1023    let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
1024        .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
1025
1026    if !root_manifest.epoch_exists(epoch) {
1027        return Err(anyhow!(
1028            "Epoch dir {} doesn't exist on the remote store",
1029            epoch
1030        ));
1031    }
1032
1033    let epoch_path = format!("epoch_{}", epoch);
1034    let epoch_dir = get_path(&epoch_path);
1035
1036    let manifest_file = epoch_dir.child(MANIFEST_FILENAME);
1037    let epoch_manifest_contents =
1038        String::from_utf8(remote_store.get_bytes(&manifest_file).await?.to_vec())
1039            .map_err(|err| anyhow!("Error parsing {}/MANIFEST from bytes: {}", epoch_path, err))?;
1040
1041    let epoch_manifest =
1042        PerEpochManifest::deserialize_from_newline_delimited(&epoch_manifest_contents);
1043
1044    let mut files: Vec<String> = vec![];
1045    files.extend(epoch_manifest.filter_by_prefix("store/perpetual").lines);
1046    files.extend(epoch_manifest.filter_by_prefix("epochs").lines);
1047    files.extend(epoch_manifest.filter_by_prefix("checkpoints").lines);
1048    if !skip_indexes {
1049        files.extend(epoch_manifest.filter_by_prefix("indexes").lines)
1050    }
1051    let local_store = ObjectStoreConfig {
1052        object_store: Some(ObjectStoreType::File),
1053        directory: Some(path.to_path_buf()),
1054        ..Default::default()
1055    }
1056    .make()?;
1057    let m = MultiProgress::new();
1058    let path = path.to_path_buf();
1059    let snapshot_handle = tokio::spawn(async move {
1060        let progress_bar = m.add(
1061            ProgressBar::new(files.len() as u64).with_style(
1062                ProgressStyle::with_template(
1063                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} files done ({msg})",
1064                )
1065                .unwrap(),
1066            ),
1067        );
1068        let cloned_progress_bar = progress_bar.clone();
1069        let file_counter = Arc::new(AtomicUsize::new(0));
1070        futures::stream::iter(files.iter())
1071            .map(|file| {
1072                let local_store = local_store.clone();
1073                let remote_store = remote_store.clone();
1074                let counter_cloned = file_counter.clone();
1075                async move {
1076                    counter_cloned.fetch_add(1, Ordering::Relaxed);
1077                    let file_path = get_path(format!("epoch_{}/{}", epoch, file).as_str());
1078                    copy_file(&file_path, &file_path, &remote_store, &local_store).await?;
1079                    Ok::<::object_store::path::Path, anyhow::Error>(file_path.clone())
1080                }
1081            })
1082            .boxed()
1083            .buffer_unordered(num_parallel_downloads)
1084            .try_for_each(|path| {
1085                file_counter.fetch_sub(1, Ordering::Relaxed);
1086                cloned_progress_bar.inc(1);
1087                cloned_progress_bar.set_message(format!(
1088                    "Downloading file: {}, #downloads_in_progress: {}",
1089                    path,
1090                    file_counter.load(Ordering::Relaxed)
1091                ));
1092                futures::future::ready(Ok(()))
1093            })
1094            .await?;
1095        progress_bar.finish_with_message("Snapshot file download is complete");
1096        Ok::<(), anyhow::Error>(())
1097    });
1098
1099    let tasks: Vec<_> = vec![Box::pin(snapshot_handle)];
1100    join_all(tasks)
1101        .await
1102        .into_iter()
1103        .collect::<Result<Vec<_>, _>>()?
1104        .into_iter()
1105        .for_each(|result| result.expect("Task failed"));
1106
1107    let store_dir = path.join("store");
1108    if store_dir.exists() {
1109        fs::remove_dir_all(&store_dir)?;
1110    }
1111    let epochs_dir = path.join("epochs");
1112    if epochs_dir.exists() {
1113        fs::remove_dir_all(&epochs_dir)?;
1114    }
1115    Ok(())
1116}
1117
1118pub async fn verify_archive(
1119    genesis: &Path,
1120    remote_store_config: ObjectStoreConfig,
1121    concurrency: usize,
1122    interactive: bool,
1123) -> Result<()> {
1124    verify_archive_with_genesis_config(genesis, remote_store_config, concurrency, interactive, 10)
1125        .await
1126}
1127
1128pub async fn dump_checkpoints_from_archive(
1129    remote_store_config: ObjectStoreConfig,
1130    start_checkpoint: u64,
1131    end_checkpoint: u64,
1132    max_content_length: usize,
1133) -> Result<()> {
1134    let metrics = ArchiveReaderMetrics::new(&Registry::default());
1135    let config = ArchiveReaderConfig {
1136        remote_store_config,
1137        download_concurrency: NonZeroUsize::new(1).unwrap(),
1138        use_for_pruning_watermark: false,
1139    };
1140    let store = SharedInMemoryStore::default();
1141    let archive_reader = ArchiveReader::new(config, &metrics)?;
1142    archive_reader.sync_manifest_once().await?;
1143    let checkpoint_counter = Arc::new(AtomicU64::new(0));
1144    let txn_counter = Arc::new(AtomicU64::new(0));
1145    archive_reader
1146        .read(
1147            store.clone(),
1148            Range {
1149                start: start_checkpoint,
1150                end: end_checkpoint,
1151            },
1152            txn_counter,
1153            checkpoint_counter,
1154            false,
1155        )
1156        .await?;
1157    for key in store
1158        .inner()
1159        .checkpoints()
1160        .values()
1161        .sorted_by(|a, b| a.sequence_number().cmp(&b.sequence_number))
1162    {
1163        let mut content = serde_json::to_string(
1164            &store
1165                .get_full_checkpoint_contents_by_sequence_number(key.sequence_number)?
1166                .unwrap(),
1167        )?;
1168        content.truncate(max_content_length);
1169        info!(
1170            "{}:{}:{:?}",
1171            key.sequence_number, key.content_digest, content
1172        );
1173    }
1174    Ok(())
1175}
1176
1177pub async fn verify_archive_by_checksum(
1178    remote_store_config: ObjectStoreConfig,
1179    concurrency: usize,
1180) -> Result<()> {
1181    verify_archive_with_checksums(remote_store_config, concurrency).await
1182}