1use std::{
7 collections::BTreeMap,
8 fmt::Write,
9 fs, io,
10 num::NonZeroUsize,
11 ops::Range,
12 path::{Path, PathBuf},
13 sync::{
14 Arc,
15 atomic::{AtomicU64, AtomicUsize, Ordering},
16 },
17 time::Duration,
18};
19
20use anyhow::{Result, anyhow};
21use clap::ValueEnum;
22use eyre::ContextCompat;
23use fastcrypto::{hash::MultisetHash, traits::ToFromBytes};
24use futures::{
25 StreamExt, TryStreamExt,
26 future::{AbortHandle, join_all},
27};
28use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
29use iota_archival::{
30 reader::{ArchiveReader, ArchiveReaderMetrics},
31 verify_archive_with_checksums, verify_archive_with_genesis_config,
32};
33use iota_config::{
34 NodeConfig,
35 genesis::Genesis,
36 node::ArchiveReaderConfig,
37 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
38};
39use iota_core::{
40 authority::{AuthorityStore, authority_store_tables::AuthorityPerpetualTables},
41 authority_client::{AuthorityAPI, NetworkAuthorityClient},
42 checkpoints::CheckpointStore,
43 epoch::committee_store::CommitteeStore,
44 execution_cache::build_execution_cache_from_env,
45 storage::RocksDbStore,
46};
47use iota_network::default_iota_network_config;
48use iota_protocol_config::Chain;
49use iota_sdk::{IotaClient, IotaClientBuilder};
50use iota_snapshot::{reader::StateSnapshotReaderV1, setup_db_state};
51use iota_storage::{
52 object_store::{
53 ObjectStoreGetExt,
54 http::HttpDownloaderBuilder,
55 util::{MANIFEST_FILENAME, Manifest, PerEpochManifest, copy_file, exists, get_path},
56 },
57 verify_checkpoint_range,
58};
59use iota_types::{
60 accumulator::Accumulator,
61 base_types::*,
62 committee::QUORUM_THRESHOLD,
63 crypto::AuthorityPublicKeyBytes,
64 messages_checkpoint::{CheckpointCommitment, ECMHLiveObjectSetDigest},
65 messages_grpc::{
66 LayoutGenerationOption, ObjectInfoRequest, ObjectInfoRequestKind, ObjectInfoResponse,
67 TransactionInfoRequest, TransactionStatus,
68 },
69 multiaddr::Multiaddr,
70 object::Owner,
71 storage::{ReadStore, SharedInMemoryStore},
72};
73use itertools::Itertools;
74use prometheus::Registry;
75use serde::{Deserialize, Serialize};
76use tokio::{sync::mpsc, task::JoinHandle, time::Instant};
77use tracing::info;
78use typed_store::rocks::MetricConf;
79
80pub mod commands;
81pub mod db_tool;
82
/// How much verification to perform when restoring from a formal snapshot.
#[derive(
    Clone, Serialize, Deserialize, Debug, PartialEq, Copy, PartialOrd, Ord, Eq, ValueEnum, Default,
)]
pub enum SnapshotVerifyMode {
    /// Skip checkpoint-summary and state-digest verification entirely
    /// (a warning is printed during restore).
    None,
    /// Verify summaries and check the accumulated live object set against
    /// the end-of-epoch state digest commitment. This is the default.
    #[default]
    Normal,
    /// Like `Normal`, and additionally enables verification inside
    /// `setup_db_state` (passed through as a boolean flag there).
    Strict,
}
100
101async fn make_clients(
104 iota_client: &Arc<IotaClient>,
105) -> Result<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>> {
106 let mut net_config = default_iota_network_config();
107 net_config.connect_timeout = Some(Duration::from_secs(5));
108 let mut authority_clients = BTreeMap::new();
109
110 let state = iota_client
111 .governance_api()
112 .get_latest_iota_system_state()
113 .await?;
114
115 for committee_member in state.iter_committee_members() {
116 let net_addr = Multiaddr::try_from(committee_member.net_address.clone())?;
117 let channel = net_config
118 .connect_lazy(&net_addr)
119 .map_err(|err| anyhow!(err.to_string()))?;
120 let client = NetworkAuthorityClient::new(channel);
121 let public_key_bytes =
122 AuthorityPublicKeyBytes::from_bytes(&committee_member.authority_pubkey_bytes)?;
123 authority_clients.insert(public_key_bytes, (net_addr.clone(), client));
124 }
125
126 Ok(authority_clients)
127}
128
/// (observed object version, raw RPC result, elapsed seconds) for a single
/// object-info query against one validator.
type ObjectVersionResponses = (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64);
/// All per-validator responses collected for one object query.
pub struct ObjectData {
    // The object ID the query was issued for.
    requested_id: ObjectID,
    // One entry per queried validator: its name, address, and response.
    responses: Vec<(AuthorityName, Multiaddr, ObjectVersionResponses)>,
}
134
/// Renders an `Option<T: Debug>` as the inner value's `Debug` form, or a
/// caller-supplied placeholder string when the value is absent.
trait OptionDebug<T> {
    fn opt_debug(&self, def_str: &str) -> String;
}

impl<T> OptionDebug<T> for Option<T>
where
    T: std::fmt::Debug,
{
    fn opt_debug(&self, def_str: &str) -> String {
        self.as_ref()
            .map_or_else(|| def_str.to_owned(), |value| format!("{value:?}"))
    }
}
150
// The grouping key is everything that must agree for two validators to have
// an identical view of the object:
// (version, object digest, previous tx digest, owner, optional lock digest).
// A `None` key collects validators whose query failed.
#[expect(clippy::type_complexity)]
pub struct GroupedObjectOutput {
    /// Validators grouped by identical object views (see key comment above).
    pub grouped_results: BTreeMap<
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        Vec<AuthorityName>,
    >,
    /// The same keys paired with the total stake behind each view,
    /// sorted by descending stake (see `GroupedObjectOutput::new`).
    pub voting_power: Vec<(
        Option<(
            Option<SequenceNumber>,
            ObjectDigest,
            TransactionDigest,
            Owner,
            Option<TransactionDigest>,
        )>,
        u64,
    )>,
    /// Total stake of validators that reported no lock for the object.
    pub available_voting_power: u64,
    /// True when even the best-supported view plus all unlocked stake
    /// cannot reach the quorum threshold.
    pub fully_locked: bool,
}
176
impl GroupedObjectOutput {
    /// Groups per-validator object responses by their observed object state
    /// and tallies the committee stake behind each distinct view.
    ///
    /// `committee` maps each authority to its voting power; it must contain
    /// every authority that appears in `object_data.responses` (the lookup
    /// unwraps).
    pub fn new(
        object_data: ObjectData,
        committee: Arc<BTreeMap<AuthorityPublicKeyBytes, u64>>,
    ) -> Self {
        let mut grouped_results = BTreeMap::new();
        let mut voting_power = BTreeMap::new();
        let mut available_voting_power = 0;
        for (name, _, (version, resp, _elapsed)) in &object_data.responses {
            let stake = committee.get(name).unwrap();
            let key = match resp {
                Ok(r) => {
                    let obj_digest = r.object.compute_object_reference().2;
                    let parent_tx_digest = r.object.previous_transaction;
                    let owner = r.object.owner;
                    let lock = r.lock_for_debugging.as_ref().map(|lock| *lock.digest());
                    // Stake with no recorded lock counts toward
                    // `available_voting_power`.
                    if lock.is_none() {
                        available_voting_power += stake;
                    }
                    Some((*version, obj_digest, parent_tx_digest, owner, lock))
                }
                // Failed queries are grouped together under `None`.
                Err(_) => None,
            };
            let entry = grouped_results.entry(key).or_insert_with(Vec::new);
            entry.push(*name);
            let entry: &mut u64 = voting_power.entry(key).or_default();
            *entry += stake;
        }
        // Order groups by descending stake so the dominant view comes first.
        let voting_power = voting_power
            .into_iter()
            .sorted_by(|(_, v1), (_, v2)| Ord::cmp(v2, v1))
            .collect::<Vec<_>>();
        let mut fully_locked = false;
        // Fully locked: even the best-supported view plus all unlocked stake
        // falls short of quorum.
        if !voting_power.is_empty()
            && voting_power.first().unwrap().1 + available_voting_power < QUORUM_THRESHOLD
        {
            fully_locked = true;
        }
        Self {
            grouped_results,
            voting_power,
            available_voting_power,
            fully_locked,
        }
    }
}
223
impl std::fmt::Display for GroupedObjectOutput {
    /// Renders each response group with its total stake, the object view it
    /// reported, and the validators in the group, separated by rules.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "available stake: {}", self.available_voting_power)?;
        writeln!(f, "fully locked: {}", self.fully_locked)?;
        writeln!(f, "{:<100}\n", "-".repeat(100))?;
        // `voting_power` is already sorted by descending stake.
        for (key, stake) in &self.voting_power {
            let val = self.grouped_results.get(key).unwrap();
            writeln!(f, "total stake: {stake}")?;
            match key {
                Some((_version, obj_digest, parent_tx_digest, owner, lock)) => {
                    let lock = lock.opt_debug("no-known-lock");
                    writeln!(f, "obj ref: {obj_digest}")?;
                    writeln!(f, "parent tx: {parent_tx_digest}")?;
                    writeln!(f, "owner: {owner}")?;
                    writeln!(f, "lock: {lock}")?;
                    for (i, name) in val.iter().enumerate() {
                        writeln!(f, " {:<4} {:<20}", i, name.concise(),)?;
                    }
                }
                // `None` groups validators whose object query failed.
                None => {
                    writeln!(f, "ERROR")?;
                    for (i, name) in val.iter().enumerate() {
                        writeln!(f, " {:<4} {:<20}", i, name.concise(),)?;
                    }
                }
            };
            writeln!(f, "{:<100}\n", "-".repeat(100))?;
        }
        Ok(())
    }
}
255
/// Compact, one-row-per-validator rendering of an object query.
struct ConciseObjectOutput(ObjectData);

impl ConciseObjectOutput {
    /// Column header matching the row layout produced by the `Display` impl.
    fn header() -> String {
        format!(
            "{:<20} {:<8} {:<66} {:<45} {}",
            "validator", "version", "digest", "parent_cert", "owner"
        )
    }
}
266
impl std::fmt::Display for ConciseObjectOutput {
    /// One row per validator: name and version, then either the object's
    /// digest/parent/owner or fixed placeholders when the fetch failed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        for (name, _multi_addr, (version, resp, _time_elapsed)) in &self.0.responses {
            write!(
                f,
                "{:<20} {:<8}",
                format!("{:?}", name.concise()),
                version.map(|s| s.value()).opt_debug("-")
            )?;
            match resp {
                Err(_) => writeln!(
                    f,
                    "{:<66} {:<45} {:<51}",
                    "object-fetch-failed", "no-cert-available", "no-owner-available"
                )?,
                Ok(resp) => {
                    let obj_digest = resp.object.compute_object_reference().2;
                    let parent = resp.object.previous_transaction;
                    let owner = resp.object.owner;
                    write!(f, " {:<66} {:<45} {:<51}", obj_digest, parent, owner)?;
                }
            }
            writeln!(f)?;
        }
        Ok(())
    }
}
294
/// Multi-line, per-validator rendering of an object query, including the
/// Move struct contents when a layout was returned.
struct VerboseObjectOutput(ObjectData);

impl std::fmt::Display for VerboseObjectOutput {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "Object: {}", self.0.requested_id)?;

        for (name, multiaddr, (version, resp, timespent)) in &self.0.responses {
            writeln!(f, "validator: {:?}, addr: {:?}", name.concise(), multiaddr)?;
            writeln!(
                f,
                "-- version: {} ({:.3}s)",
                version.opt_debug("<version not available>"),
                timespent,
            )?;

            match resp {
                Err(e) => writeln!(f, "Error fetching object: {}", e)?,
                Ok(resp) => {
                    writeln!(
                        f,
                        " -- object digest: {}",
                        resp.object.compute_object_reference().2
                    )?;
                    if resp.object.is_package() {
                        writeln!(f, " -- object: <Move Package>")?;
                    } else if let Some(layout) = &resp.layout {
                        // NOTE(review): these unwraps assume a non-package
                        // object with a layout always converts to a Move
                        // struct -- confirm this invariant upstream.
                        writeln!(
                            f,
                            " -- object: Move Object: {}",
                            resp.object
                                .data
                                .try_as_move()
                                .unwrap()
                                .to_move_struct(layout)
                                .unwrap()
                        )?;
                    }
                    writeln!(f, " -- owner: {}", resp.object.owner)?;
                    writeln!(
                        f,
                        " -- locked by: {}",
                        resp.lock_for_debugging.opt_debug("<not locked>")
                    )?;
                }
            }
        }
        Ok(())
    }
}
344
345pub async fn get_object(
346 obj_id: ObjectID,
347 version: Option<u64>,
348 validator: Option<AuthorityName>,
349 clients: Arc<BTreeMap<AuthorityName, (Multiaddr, NetworkAuthorityClient)>>,
350) -> Result<ObjectData> {
351 let responses = join_all(
352 clients
353 .iter()
354 .filter(|(name, _)| {
355 if let Some(v) = validator {
356 v == **name
357 } else {
358 true
359 }
360 })
361 .map(|(name, (address, client))| async {
362 let object_version = get_object_impl(client, obj_id, version).await;
363 (*name, address.clone(), object_version)
364 }),
365 )
366 .await;
367
368 Ok(ObjectData {
369 requested_id: obj_id,
370 responses,
371 })
372}
373
/// Fetches a transaction's status from every validator and renders a
/// grouped, human-readable report.
///
/// Responses are grouped by outcome (same effects digest, signed-but-not-
/// executed, or error); each group lists the reporting validators with
/// their response times. When `show_input_tx` is set, the transaction data
/// is printed as well.
pub async fn get_transaction_block(
    tx_digest: TransactionDigest,
    show_input_tx: bool,
    fullnode_rpc: String,
) -> Result<String> {
    let iota_client = Arc::new(IotaClientBuilder::default().build(fullnode_rpc).await?);
    let clients = make_clients(&iota_client).await?;
    let timer = Instant::now();
    // Query all validators concurrently, recording per-response latency.
    let responses = join_all(clients.iter().map(|(name, (address, client))| async {
        let result = client
            .handle_transaction_info_request(TransactionInfoRequest {
                transaction_digest: tx_digest,
            })
            .await;
        (
            *name,
            address.clone(),
            result,
            timer.elapsed().as_secs_f64(),
        )
    }))
    .await;

    // Kept to re-fetch the input transaction later from a validator that
    // has actually seen it.
    let validator_aware_of_tx = responses.iter().find(|r| r.2.is_ok());

    // Sort responses so identical outcomes are adjacent, then group them
    // with `chunk_by` for rendering.
    let responses = responses
        .iter()
        .map(|r| {
            let key =
                r.2.as_ref()
                    .map(|ok_result| match &ok_result.status {
                        TransactionStatus::Signed(_) => None,
                        TransactionStatus::Executed(_, effects, _) => Some(effects.digest()),
                    })
                    .ok();
            let err = r.2.as_ref().err();
            (key, err, r)
        })
        .sorted_by(|(k1, err1, _), (k2, err2, _)| {
            Ord::cmp(k1, k2).then_with(|| Ord::cmp(err1, err2))
        })
        .chunk_by(|(_, _err, r)| {
            r.2.as_ref().map(|ok_result| match &ok_result.status {
                TransactionStatus::Signed(_) => None,
                TransactionStatus::Executed(_, effects, _) => Some((
                    ok_result.transaction.transaction_data(),
                    effects.data(),
                    effects.digest(),
                )),
            })
        });
    let mut s = String::new();
    for (i, (key, group)) in responses.into_iter().enumerate() {
        match key {
            // Executed: print the effects (and optionally the input tx).
            Ok(Some((tx, effects, effects_digest))) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} effects_digest: {:?}",
                    i, tx_digest, effects_digest,
                )?;
                writeln!(&mut s, "{:#?}", effects)?;
                if show_input_tx {
                    writeln!(&mut s, "{:#?}", tx)?;
                }
            }
            // Signed but not yet executed.
            Ok(None) => {
                writeln!(
                    &mut s,
                    "#{:<2} tx_digest: {:<68?} Signed but not executed",
                    i, tx_digest
                )?;
                if show_input_tx {
                    // Re-fetch from a validator known to have the tx so we
                    // can display the input transaction.
                    let validator_aware_of_tx = validator_aware_of_tx.unwrap();
                    let client = &clients.get(&validator_aware_of_tx.0).unwrap().1;
                    let tx_info = client.handle_transaction_info_request(TransactionInfoRequest {
                        transaction_digest: tx_digest,
                    }).await.unwrap_or_else(|e| panic!("Validator {:?} should have known about tx_digest: {:?}, got error: {:?}", validator_aware_of_tx.0, tx_digest, e));
                    writeln!(&mut s, "{:#?}", tx_info)?;
                }
            }
            // Error responses are printed verbatim.
            other => {
                writeln!(&mut s, "#{:<2} {:#?}", i, other)?;
            }
        }
        // List each validator in this group with its response time.
        for (j, (_, _, res)) in group.enumerate() {
            writeln!(
                &mut s,
                " {:<4} {:<20} {:<56} ({:.3}s)",
                j,
                res.0.concise(),
                format!("{}", res.1),
                res.3
            )?;
        }
        writeln!(&mut s, "{:<100}\n", "-".repeat(100))?;
    }
    Ok(s)
}
474
475async fn get_object_impl(
476 client: &NetworkAuthorityClient,
477 id: ObjectID,
478 version: Option<u64>,
479) -> (Option<SequenceNumber>, Result<ObjectInfoResponse>, f64) {
480 let start = Instant::now();
481 let resp = client
482 .handle_object_info_request(ObjectInfoRequest {
483 object_id: id,
484 generate_layout: LayoutGenerationOption::Generate,
485 request_kind: match version {
486 None => ObjectInfoRequestKind::LatestObjectInfo,
487 Some(v) => ObjectInfoRequestKind::PastObjectInfoDebug(SequenceNumber::from_u64(v)),
488 },
489 })
490 .await
491 .map_err(anyhow::Error::from);
492 let elapsed = start.elapsed().as_secs_f64();
493
494 let resp_version = resp.as_ref().ok().map(|r| r.object.version().value());
495 (resp_version.map(SequenceNumber::from), resp, elapsed)
496}
497
/// Builds the `anemo-cli` configuration exposing the node's p2p RPC
/// surface: the Discovery service and the StateSync service methods.
pub(crate) fn make_anemo_config() -> anemo_cli::Config {
    use iota_network::{discovery::*, state_sync::*};

    anemo_cli::Config::new()
        // Peer discovery service.
        .add_service(
            "Discovery",
            anemo_cli::ServiceInfo::new().add_method(
                "GetKnownPeers",
                anemo_cli::ron_method!(DiscoveryClient, get_known_peers, ()),
            ),
        )
        // Checkpoint state-sync service.
        .add_service(
            "StateSync",
            anemo_cli::ServiceInfo::new()
                .add_method(
                    "PushCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        push_checkpoint_summary,
                        iota_types::messages_checkpoint::CertifiedCheckpointSummary
                    ),
                )
                .add_method(
                    "GetCheckpointSummary",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_summary,
                        GetCheckpointSummaryRequest
                    ),
                )
                .add_method(
                    "GetCheckpointContents",
                    anemo_cli::ron_method!(
                        StateSyncClient,
                        get_checkpoint_contents,
                        iota_types::messages_checkpoint::CheckpointContentsDigest
                    ),
                )
                .add_method(
                    "GetCheckpointAvailability",
                    anemo_cli::ron_method!(StateSyncClient, get_checkpoint_availability, ()),
                ),
        )
}
545
/// Recursively copies the directory tree at `src` into `dst`, creating
/// `dst` (and intermediate directories) as needed.
///
/// Any entry whose full path appears in `skip` is omitted (subtrees rooted
/// at a skipped directory are not descended into).
///
/// # Errors
/// Returns the first I/O error encountered while reading, creating, or
/// copying entries.
fn copy_dir_all(
    src: impl AsRef<Path>,
    dst: impl AsRef<Path>,
    skip: Vec<PathBuf>,
) -> io::Result<()> {
    // Delegate to a slice-taking helper so recursion does not clone the
    // skip list at every level (the original cloned `skip` per call).
    copy_dir_recursive(src.as_ref(), dst.as_ref(), &skip)
}

/// Recursion worker for `copy_dir_all`; borrows the skip list.
fn copy_dir_recursive(src: &Path, dst: &Path, skip: &[PathBuf]) -> io::Result<()> {
    fs::create_dir_all(dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let path = entry.path();
        if skip.contains(&path) {
            continue;
        }
        if entry.file_type()?.is_dir() {
            copy_dir_recursive(&path, &dst.join(entry.file_name()), skip)?;
        } else {
            fs::copy(&path, dst.join(entry.file_name()))?;
        }
    }
    Ok(())
}
570
571pub async fn restore_from_db_checkpoint(
572 config: &NodeConfig,
573 db_checkpoint_path: &Path,
574) -> Result<(), anyhow::Error> {
575 copy_dir_all(db_checkpoint_path, config.db_path(), vec![])?;
576 Ok(())
577}
578
/// Spawns a task that syncs checkpoint summaries from the archive store up
/// to (and including) the last checkpoint of `epoch`.
///
/// When `all_checkpoints` is set, every summary from the current synced
/// watermark onward is downloaded; otherwise only the end-of-epoch
/// checkpoints are. When `verify` is set, synced summaries are verified
/// (over the whole range, or per end-of-epoch checkpoint against its
/// committee). Progress is reported via the `m` multi-progress handle.
/// On success, all checkpoint watermarks are advanced to the final
/// checkpoint of `epoch`.
fn start_summary_sync(
    perpetual_db: Arc<AuthorityPerpetualTables>,
    committee_store: Arc<CommitteeStore>,
    checkpoint_store: Arc<CheckpointStore>,
    m: MultiProgress,
    genesis: Genesis,
    archive_store_config: ObjectStoreConfig,
    epoch: u64,
    num_parallel_downloads: usize,
    verify: bool,
    all_checkpoints: bool,
) -> JoinHandle<Result<(), anyhow::Error>> {
    tokio::spawn(async move {
        info!("Starting summary sync");
        let store =
            AuthorityStore::open_no_genesis(perpetual_db, usize::MAX, false, &Registry::default())?;
        let cache_traits = build_execution_cache_from_env(&Registry::default(), &store);
        let state_sync_store =
            RocksDbStore::new(cache_traits, committee_store, checkpoint_store.clone());
        // Seed the store with the genesis checkpoint if it is not present.
        if checkpoint_store
            .get_checkpoint_by_digest(genesis.checkpoint().digest())
            .unwrap()
            .is_none()
        {
            checkpoint_store.insert_checkpoint_contents(genesis.checkpoint_contents().clone())?;
            checkpoint_store.insert_verified_checkpoint(&genesis.checkpoint())?;
            checkpoint_store.update_highest_synced_checkpoint(&genesis.checkpoint())?;
        }
        let config = ArchiveReaderConfig {
            remote_store_config: archive_store_config,
            download_concurrency: NonZeroUsize::new(num_parallel_downloads).unwrap(),
            use_for_pruning_watermark: false,
        };
        let metrics = ArchiveReaderMetrics::new(&Registry::default());
        let archive_reader = ArchiveReader::new(config, &metrics)?;
        archive_reader.sync_manifest_once().await?;
        let manifest = archive_reader.get_manifest().await?;

        // Sequence number of the last checkpoint of each epoch in 0..=epoch.
        let end_of_epoch_checkpoint_seq_nums = (0..=epoch)
            .map(|e| manifest.next_checkpoint_after_epoch(e) - 1)
            .collect::<Vec<_>>();
        let last_checkpoint = end_of_epoch_checkpoint_seq_nums
            .last()
            .expect("Expected at least one checkpoint");

        let num_to_sync = if all_checkpoints {
            *last_checkpoint
        } else {
            end_of_epoch_checkpoint_seq_nums.len() as u64
        };
        let sync_progress_bar = m.add(
            ProgressBar::new(num_to_sync).with_style(
                ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})")
                    .unwrap(),
            ),
        );

        let cloned_progress_bar = sync_progress_bar.clone();
        let sync_checkpoint_counter = Arc::new(AtomicU64::new(0));
        let s_instant = Instant::now();

        let cloned_counter = sync_checkpoint_counter.clone();
        // Resume from the highest checkpoint already synced, if any.
        let latest_synced = checkpoint_store
            .get_highest_synced_checkpoint()?
            .map(|c| c.sequence_number)
            .unwrap_or(0);
        let s_start = latest_synced
            .checked_add(1)
            .context("Checkpoint overflow")
            .map_err(|_| anyhow!("Failed to increment checkpoint"))?;
        // Background task: refresh the sync progress bar once a second
        // until it is marked finished.
        tokio::spawn(async move {
            loop {
                if cloned_progress_bar.is_finished() {
                    break;
                }
                let num_summaries = cloned_counter.load(Ordering::Relaxed);
                let total_checkpoints_per_sec =
                    num_summaries as f64 / s_instant.elapsed().as_secs_f64();
                cloned_progress_bar.set_position(s_start + num_summaries);
                cloned_progress_bar.set_message(format!(
                    "checkpoints synced per sec: {}",
                    total_checkpoints_per_sec
                ));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        // Summaries are downloaded without verification here; verification
        // (when requested) happens in the block below.
        if all_checkpoints {
            archive_reader
                .read_summaries_for_range_no_verify(
                    state_sync_store.clone(),
                    s_start..last_checkpoint + 1,
                    sync_checkpoint_counter,
                )
                .await?;
        } else {
            archive_reader
                .read_summaries_for_list_no_verify(
                    state_sync_store.clone(),
                    end_of_epoch_checkpoint_seq_nums.clone(),
                    sync_checkpoint_counter,
                )
                .await?;
        }
        sync_progress_bar.finish_with_message("Checkpoint summary sync is complete");

        let checkpoint = checkpoint_store
            .get_checkpoint_by_sequence_number(*last_checkpoint)?
            .ok_or(anyhow!("Failed to read last checkpoint"))?;
        if verify {
            let verify_progress_bar = m.add(
                ProgressBar::new(num_to_sync).with_style(
                    ProgressStyle::with_template(
                        "[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})",
                    )
                    .unwrap(),
                ),
            );
            let cloned_verify_progress_bar = verify_progress_bar.clone();
            let verify_checkpoint_counter = Arc::new(AtomicU64::new(0));
            let cloned_verify_counter = verify_checkpoint_counter.clone();
            let v_instant = Instant::now();

            // Background task: refresh the verification progress bar.
            tokio::spawn(async move {
                let v_start = if all_checkpoints { s_start } else { 0 };
                loop {
                    if cloned_verify_progress_bar.is_finished() {
                        break;
                    }
                    let num_summaries = cloned_verify_counter.load(Ordering::Relaxed);
                    let total_checkpoints_per_sec =
                        num_summaries as f64 / v_instant.elapsed().as_secs_f64();
                    cloned_verify_progress_bar.set_position(v_start + num_summaries);
                    cloned_verify_progress_bar.set_message(format!(
                        "checkpoints verified per sec: {}",
                        total_checkpoints_per_sec
                    ));
                    tokio::time::sleep(Duration::from_secs(1)).await;
                }
            });

            if all_checkpoints {
                // Verify the full contiguous range from the resume point.
                let v_start = s_start;
                // Anchor the verified watermark at the last already-synced
                // checkpoint before range verification begins.
                let latest_verified = checkpoint_store
                    .get_checkpoint_by_sequence_number(latest_synced)
                    .expect("Failed to get checkpoint")
                    .expect("Expected checkpoint to exist after summary sync");
                checkpoint_store
                    .update_highest_verified_checkpoint(&latest_verified)
                    .expect("Failed to update highest verified checkpoint");

                let verify_range = v_start..last_checkpoint + 1;
                verify_checkpoint_range(
                    verify_range,
                    state_sync_store,
                    verify_checkpoint_counter,
                    num_parallel_downloads,
                )
                .await;
            } else {
                // Only end-of-epoch checkpoints were synced: verify each
                // one's authority signatures against its epoch's committee.
                for (cp_epoch, epoch_last_cp_seq_num) in
                    end_of_epoch_checkpoint_seq_nums.iter().enumerate()
                {
                    let epoch_last_checkpoint = checkpoint_store
                        .get_checkpoint_by_sequence_number(*epoch_last_cp_seq_num)?
                        .ok_or(anyhow!("Failed to read checkpoint"))?;
                    let committee = state_sync_store
                        .get_committee(cp_epoch as u64)
                        .expect("store operation should not fail")
                        .expect(
                            "Expected committee to exist after syncing all end of epoch checkpoints",
                        );
                    epoch_last_checkpoint
                        .verify_authority_signatures(&committee)
                        .expect("Failed to verify checkpoint");
                    verify_checkpoint_counter.fetch_add(1, Ordering::Relaxed);
                }
            }

            verify_progress_bar.finish_with_message("Checkpoint summary verification is complete");
        }

        // Advance all checkpoint watermarks to the final checkpoint.
        checkpoint_store.update_highest_verified_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_synced_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_executed_checkpoint(&checkpoint)?;
        checkpoint_store.update_highest_pruned_checkpoint(&checkpoint)?;
        Ok::<(), anyhow::Error>(())
    })
}
776
777pub async fn get_latest_available_epoch(
778 snapshot_store_config: &ObjectStoreConfig,
779) -> Result<u64, anyhow::Error> {
780 let remote_object_store = if snapshot_store_config.no_sign_request {
781 snapshot_store_config.make_http()?
782 } else {
783 snapshot_store_config.make().map(Arc::new)?
784 };
785 let manifest_contents = remote_object_store
786 .get_bytes(&get_path(MANIFEST_FILENAME))
787 .await?;
788 let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
789 .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;
790 let epoch = root_manifest
791 .available_epochs
792 .iter()
793 .max()
794 .ok_or(anyhow!("No snapshot found in manifest"))?;
795 Ok(*epoch)
796}
797
798pub async fn check_completed_snapshot(
799 snapshot_store_config: &ObjectStoreConfig,
800 epoch: EpochId,
801) -> Result<(), anyhow::Error> {
802 let success_marker = format!("epoch_{}/_SUCCESS", epoch);
803 let remote_object_store = if snapshot_store_config.no_sign_request {
804 snapshot_store_config.make_http()?
805 } else {
806 snapshot_store_config.make().map(Arc::new)?
807 };
808 if exists(&remote_object_store, &get_path(success_marker.as_str())).await {
809 Ok(())
810 } else {
811 Err(anyhow!(
812 "missing success marker at {}/{}",
813 snapshot_store_config.bucket.as_ref().unwrap_or(
814 &snapshot_store_config
815 .clone()
816 .aws_endpoint
817 .unwrap_or("unknown_bucket".to_string())
818 ),
819 success_marker
820 ))
821 }
822}
823
/// Restores node state from a formal (state) snapshot for the end of
/// `epoch`, leaving the result in a `live` directory next to `path`.
///
/// Two tasks run concurrently: checkpoint summary sync from the archive
/// store and object download from the snapshot store. Unless `verify` is
/// `None`, the accumulated live object set digest is checked against the
/// end-of-epoch state commitment before the database is finalized.
pub async fn download_formal_snapshot(
    path: &Path,
    epoch: EpochId,
    genesis: &Path,
    snapshot_store_config: ObjectStoreConfig,
    archive_store_config: ObjectStoreConfig,
    num_parallel_downloads: usize,
    network: Chain,
    verify: SnapshotVerifyMode,
    all_checkpoints: bool,
) -> Result<(), anyhow::Error> {
    let m = MultiProgress::new();
    m.println(format!(
        "Beginning formal snapshot restore to end of epoch {}, network: {:?}, verification mode: {:?}",
        epoch, network, verify,
    ))?;
    // Work in a staging directory; it is renamed to "live" only after a
    // fully successful restore.
    let path = path.join("staging").to_path_buf();
    if path.exists() {
        fs::remove_dir_all(path.clone())?;
    }
    let perpetual_db = Arc::new(AuthorityPerpetualTables::open(&path.join("store"), None));
    let genesis = Genesis::load(genesis).unwrap();
    let genesis_committee = genesis.committee()?;
    let committee_store = Arc::new(CommitteeStore::new(
        path.join("epochs"),
        &genesis_committee,
        None,
    ));
    let checkpoint_store = Arc::new(CheckpointStore::open_tables_read_write(
        path.join("checkpoints"),
        MetricConf::default(),
        None,
        None,
    ));

    // Checkpoint summary sync runs concurrently with the object download.
    let summaries_handle = start_summary_sync(
        perpetual_db.clone(),
        committee_store.clone(),
        checkpoint_store.clone(),
        m.clone(),
        genesis.clone(),
        archive_store_config.clone(),
        epoch,
        num_parallel_downloads,
        verify != SnapshotVerifyMode::None,
        all_checkpoints,
    );
    let (_abort_handle, abort_registration) = AbortHandle::new_pair();
    let perpetual_db_clone = perpetual_db.clone();
    let snapshot_dir = path.parent().unwrap().join("snapshot");
    if snapshot_dir.exists() {
        fs::remove_dir_all(snapshot_dir.clone())?;
    }
    let snapshot_dir_clone = snapshot_dir.clone();

    // Channel over which the snapshot reader streams partial accumulators
    // and live-object counts back to this task.
    let (sender, mut receiver) = mpsc::channel(num_parallel_downloads);
    let m_clone = m.clone();

    let snapshot_handle = tokio::spawn(async move {
        let local_store_config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(snapshot_dir_clone.to_path_buf()),
            ..Default::default()
        };
        let mut reader = StateSnapshotReaderV1::new(
            epoch,
            &snapshot_store_config,
            &local_store_config,
            usize::MAX,
            NonZeroUsize::new(num_parallel_downloads).unwrap(),
            m_clone,
        )
        .await
        .unwrap_or_else(|err| panic!("Failed to create reader: {}", err));
        reader
            .read(&perpetual_db_clone, abort_registration, Some(sender))
            .await
            .unwrap_or_else(|err| panic!("Failed during read: {}", err));
        Ok::<(), anyhow::Error>(())
    });
    // Fold the streamed partial accumulators into the root state
    // accumulator while counting restored live objects.
    let mut root_accumulator = Accumulator::default();
    let mut num_live_objects = 0;
    while let Some((partial_acc, num_objects)) = receiver.recv().await {
        num_live_objects += num_objects;
        root_accumulator.union(&partial_acc);
    }
    summaries_handle
        .await
        .expect("Task join failed")
        .expect("Summaries task failed");

    let last_checkpoint = checkpoint_store
        .get_highest_verified_checkpoint()?
        .expect("Expected nonempty checkpoint store");

    if verify != SnapshotVerifyMode::None {
        assert_eq!(
            last_checkpoint.epoch(),
            epoch,
            "Expected highest verified checkpoint ({}) to be for epoch {} but was for epoch {}",
            last_checkpoint.sequence_number,
            epoch,
            last_checkpoint.epoch()
        );
        let commitment = last_checkpoint
            .end_of_epoch_data
            .as_ref()
            .expect("Expected highest verified checkpoint to have end of epoch data")
            .epoch_commitments
            .last()
            .expect(
                "End of epoch has no commitments. This likely means that the epoch \
                you are attempting to restore from does not support end of epoch state \
                digest commitment. If restoring from mainnet, `--epoch` must be > 20, \
                and for testnet, `--epoch` must be > 12.",
            );
        // Compare the locally accumulated live-object digest with the
        // on-chain end-of-epoch commitment.
        match commitment {
            CheckpointCommitment::ECMHLiveObjectSetDigest(consensus_digest) => {
                let local_digest: ECMHLiveObjectSetDigest = root_accumulator.digest().into();
                assert_eq!(
                    *consensus_digest, local_digest,
                    "End of epoch {} root state digest {} does not match \
                    local root state hash {} computed from snapshot data",
                    epoch, consensus_digest.digest, local_digest.digest,
                );
                let progress_bar = m.add(
                    ProgressBar::new(1).with_style(
                        ProgressStyle::with_template(
                            "[{elapsed_precise}] {wide_bar} Verifying snapshot contents against root state hash ({msg})",
                        )
                        .unwrap(),
                    ),
                );
                progress_bar.finish_with_message("Verification complete");
            }
        };
    } else {
        m.println(
            "WARNING: Skipping snapshot verification! \
            This is highly discouraged unless you fully trust the source of this snapshot and its contents.
            If this was unintentional, rerun with `--verify` set to `normal` or `strict`.",
        )?;
    }

    snapshot_handle
        .await
        .expect("Task join failed")
        .expect("Snapshot restore task failed");

    checkpoint_store.insert_epoch_last_checkpoint(epoch, &last_checkpoint)?;

    setup_db_state(
        epoch,
        root_accumulator.clone(),
        perpetual_db.clone(),
        checkpoint_store,
        committee_store,
        verify == SnapshotVerifyMode::Strict,
        num_live_objects,
        m,
    )
    .await?;

    // Promote the staging directory to "live" and clean up the snapshot
    // download area.
    let new_path = path.parent().unwrap().join("live");
    if new_path.exists() {
        fs::remove_dir_all(new_path.clone())?;
    }
    fs::rename(&path, &new_path)?;
    fs::remove_dir_all(snapshot_dir.clone())?;
    println!(
        "Successfully restored state from snapshot at end of epoch {}",
        epoch
    );

    Ok(())
}
1007
/// Downloads a db (RocksDB) snapshot for `epoch` from the remote snapshot
/// store into `path`, driven by the root and per-epoch MANIFEST files.
///
/// `store/perpetual`, `epochs`, and `checkpoints` file groups are always
/// fetched; `indexes` files are skipped when `skip_indexes` is set.
pub async fn download_db_snapshot(
    path: &Path,
    epoch: u64,
    snapshot_store_config: ObjectStoreConfig,
    skip_indexes: bool,
    num_parallel_downloads: usize,
) -> Result<(), anyhow::Error> {
    let remote_store = if snapshot_store_config.no_sign_request {
        snapshot_store_config.make_http()?
    } else {
        snapshot_store_config.make().map(Arc::new)?
    };

    // The root MANIFEST lists which epochs have snapshots available.
    let manifest_contents = remote_store.get_bytes(&get_path(MANIFEST_FILENAME)).await?;
    let root_manifest: Manifest = serde_json::from_slice(&manifest_contents)
        .map_err(|err| anyhow!("Error parsing MANIFEST from bytes: {}", err))?;

    if !root_manifest.epoch_exists(epoch) {
        return Err(anyhow!(
            "Epoch dir {} doesn't exist on the remote store",
            epoch
        ));
    }

    let epoch_path = format!("epoch_{}", epoch);
    let epoch_dir = get_path(&epoch_path);

    // The per-epoch MANIFEST is newline-delimited and lists the snapshot's
    // files relative to the epoch directory.
    let manifest_file = epoch_dir.child(MANIFEST_FILENAME);
    let epoch_manifest_contents =
        String::from_utf8(remote_store.get_bytes(&manifest_file).await?.to_vec())
            .map_err(|err| anyhow!("Error parsing {}/MANIFEST from bytes: {}", epoch_path, err))?;

    let epoch_manifest =
        PerEpochManifest::deserialize_from_newline_delimited(&epoch_manifest_contents);

    // Select which file groups to download; index files are optional.
    let mut files: Vec<String> = vec![];
    files.extend(epoch_manifest.filter_by_prefix("store/perpetual").lines);
    files.extend(epoch_manifest.filter_by_prefix("epochs").lines);
    files.extend(epoch_manifest.filter_by_prefix("checkpoints").lines);
    if !skip_indexes {
        files.extend(epoch_manifest.filter_by_prefix("indexes").lines)
    }
    let local_store = ObjectStoreConfig {
        object_store: Some(ObjectStoreType::File),
        directory: Some(path.to_path_buf()),
        ..Default::default()
    }
    .make()?;
    let m = MultiProgress::new();
    let path = path.to_path_buf();
    let snapshot_handle = tokio::spawn(async move {
        let progress_bar = m.add(
            ProgressBar::new(files.len() as u64).with_style(
                ProgressStyle::with_template(
                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} files done ({msg})",
                )
                .unwrap(),
            ),
        );
        let cloned_progress_bar = progress_bar.clone();
        // Tracks the number of downloads currently in flight, for display.
        let file_counter = Arc::new(AtomicUsize::new(0));
        futures::stream::iter(files.iter())
            .map(|file| {
                let local_store = local_store.clone();
                let remote_store = remote_store.clone();
                let counter_cloned = file_counter.clone();
                async move {
                    counter_cloned.fetch_add(1, Ordering::Relaxed);
                    // Files are copied under the same epoch_{N}/... key on
                    // both the remote and local stores.
                    let file_path = get_path(format!("epoch_{}/{}", epoch, file).as_str());
                    copy_file(&file_path, &file_path, &remote_store, &local_store).await?;
                    Ok::<::object_store::path::Path, anyhow::Error>(file_path.clone())
                }
            })
            .boxed()
            .buffer_unordered(num_parallel_downloads)
            .try_for_each(|path| {
                file_counter.fetch_sub(1, Ordering::Relaxed);
                cloned_progress_bar.inc(1);
                cloned_progress_bar.set_message(format!(
                    "Downloading file: {}, #downloads_in_progress: {}",
                    path,
                    file_counter.load(Ordering::Relaxed)
                ));
                futures::future::ready(Ok(()))
            })
            .await?;
        progress_bar.finish_with_message("Snapshot file download is complete");
        Ok::<(), anyhow::Error>(())
    });

    let tasks: Vec<_> = vec![Box::pin(snapshot_handle)];
    join_all(tasks)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .for_each(|result| result.expect("Task failed"));

    // NOTE(review): downloads land under epoch_{N}/...; these top-level
    // `store`/`epochs` directories appear to be pre-existing local state
    // being cleared -- confirm intent with the snapshot layout docs.
    let store_dir = path.join("store");
    if store_dir.exists() {
        fs::remove_dir_all(&store_dir)?;
    }
    let epochs_dir = path.join("epochs");
    if epochs_dir.exists() {
        fs::remove_dir_all(&epochs_dir)?;
    }
    Ok(())
}
1117
1118pub async fn verify_archive(
1119 genesis: &Path,
1120 remote_store_config: ObjectStoreConfig,
1121 concurrency: usize,
1122 interactive: bool,
1123) -> Result<()> {
1124 verify_archive_with_genesis_config(genesis, remote_store_config, concurrency, interactive, 10)
1125 .await
1126}
1127
/// Reads checkpoints `[start_checkpoint, end_checkpoint)` from the archive
/// into an in-memory store and logs each one's JSON contents, truncated to
/// `max_content_length` characters.
pub async fn dump_checkpoints_from_archive(
    remote_store_config: ObjectStoreConfig,
    start_checkpoint: u64,
    end_checkpoint: u64,
    max_content_length: usize,
) -> Result<()> {
    let metrics = ArchiveReaderMetrics::new(&Registry::default());
    let config = ArchiveReaderConfig {
        remote_store_config,
        // Single-stream download is sufficient for a dump.
        download_concurrency: NonZeroUsize::new(1).unwrap(),
        use_for_pruning_watermark: false,
    };
    let store = SharedInMemoryStore::default();
    let archive_reader = ArchiveReader::new(config, &metrics)?;
    archive_reader.sync_manifest_once().await?;
    let checkpoint_counter = Arc::new(AtomicU64::new(0));
    let txn_counter = Arc::new(AtomicU64::new(0));
    archive_reader
        .read(
            store.clone(),
            Range {
                start: start_checkpoint,
                end: end_checkpoint,
            },
            txn_counter,
            checkpoint_counter,
            false,
        )
        .await?;
    // Emit checkpoints in ascending sequence-number order.
    for key in store
        .inner()
        .checkpoints()
        .values()
        .sorted_by(|a, b| a.sequence_number().cmp(&b.sequence_number))
    {
        let mut content = serde_json::to_string(
            &store
                .get_full_checkpoint_contents_by_sequence_number(key.sequence_number)?
                .unwrap(),
        )?;
        content.truncate(max_content_length);
        info!(
            "{}:{}:{:?}",
            key.sequence_number, key.content_digest, content
        );
    }
    Ok(())
}
1176
1177pub async fn verify_archive_by_checksum(
1178 remote_store_config: ObjectStoreConfig,
1179 concurrency: usize,
1180) -> Result<()> {
1181 verify_archive_with_checksums(remote_store_config, concurrency).await
1182}