1#![allow(dead_code)]
6
7pub mod reader;
8pub mod writer;
9
10#[cfg(test)]
11mod tests;
12
13use std::{
14 fs,
15 io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
16 num::NonZeroUsize,
17 ops::Range,
18 sync::{
19 Arc,
20 atomic::{AtomicU64, Ordering},
21 },
22 time::{Duration, Instant},
23};
24
25use anyhow::{Result, anyhow};
26use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
27use bytes::Bytes;
28use fastcrypto::hash::{HashFunction, Sha3_256};
29use indicatif::{ProgressBar, ProgressStyle};
30use iota_config::{
31 genesis::Genesis, node::ArchiveReaderConfig, object_storage_config::ObjectStoreConfig,
32};
33use iota_storage::{
34 SHA3_BYTES,
35 blob::{Blob, BlobEncoding},
36 compute_sha3_checksum, compute_sha3_checksum_for_bytes,
37 object_store::{
38 ObjectStoreGetExt, ObjectStorePutExt,
39 util::{get, put},
40 },
41};
42use iota_types::{
43 base_types::ExecutionData,
44 messages_checkpoint::{FullCheckpointContents, VerifiedCheckpointContents},
45 storage::{SingleCheckpointSharedInMemoryStore, WriteStore},
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use object_store::path::Path;
49use prometheus::Registry;
50use serde::{Deserialize, Serialize};
51use tracing::{error, info};
52
53use crate::reader::{ArchiveReader, ArchiveReaderMetrics};
54
/// Magic number identifying checkpoint contents files (presumably written as the file
/// header by the `writer` module, which is not shown here — TODO confirm).
// NOTE(review): this `expect` appears to target HTML-like text in this constant's format
// documentation; if that documentation is removed the expectation becomes unfulfilled.
#[expect(rustdoc::invalid_html_tags)]
pub const CHECKPOINT_FILE_MAGIC: u32 = 0x0000DEAD;
/// Magic number identifying checkpoint summary files (presumably written as the file
/// header by the `writer` module — TODO confirm).
pub const SUMMARY_FILE_MAGIC: u32 = 0x0000CAFE;
/// Magic number written as the leading big-endian `u32` of the MANIFEST object.
const MANIFEST_FILE_MAGIC: u32 = 0x00C0FFEE;
/// Size in bytes of a magic-number header (one `u32`).
const MAGIC_BYTES: usize = 4;
/// File extension of checkpoint contents files, named `<first checkpoint>.chk`.
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// File extension of checkpoint summary files, named `<first checkpoint>.sum`.
const SUMMARY_FILE_SUFFIX: &str = "sum";
/// Prefix of the per-epoch directories, which are named `epoch_<epoch number>`.
const EPOCH_DIR_PREFIX: &str = "epoch_";
/// Object-store path of the archive manifest (at the root of the store).
const MANIFEST_FILENAME: &str = "MANIFEST";
118
119#[derive(
120 Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
121)]
122#[repr(u8)]
123pub enum FileType {
124 CheckpointContent = 0,
125 CheckpointSummary,
126}
127
/// Manifest entry describing one archived file.
// Field order is part of the serialized format (the manifest is BCS-encoded, which is
// positional), so fields must not be reordered.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    /// Whether the file holds checkpoint contents or checkpoint summaries; selects the
    /// file extension in `file_path`.
    pub file_type: FileType,
    /// Epoch whose `epoch_<n>` directory contains the file.
    pub epoch_num: u64,
    /// Half-open range of checkpoint sequence numbers covered by the file; `start` is also
    /// used as the file's base name.
    pub checkpoint_seq_range: Range<u64>,
    /// SHA3-256 checksum of the file's bytes.
    pub sha3_digest: [u8; 32],
}
135
136impl FileMetadata {
137 pub fn file_path(&self) -> Path {
138 let dir_path = Path::from(format!("{}{}", EPOCH_DIR_PREFIX, self.epoch_num));
139 match self.file_type {
140 FileType::CheckpointContent => dir_path.child(&*format!(
141 "{}.{CHECKPOINT_FILE_SUFFIX}",
142 self.checkpoint_seq_range.start
143 )),
144 FileType::CheckpointSummary => dir_path.child(&*format!(
145 "{}.{SUMMARY_FILE_SUFFIX}",
146 self.checkpoint_seq_range.start
147 )),
148 }
149 }
150}
151
/// Version 1 of the archive manifest: the list of archived files plus progress markers.
// Field order is part of the BCS-encoded manifest format; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ManifestV1 {
    /// Archive format version; `Manifest::new` sets it to 1.
    pub archive_version: u8,
    /// Checkpoint sequence number recorded by the most recent `Manifest::update`
    /// (presumably the next checkpoint to be archived — confirm against the writer).
    pub next_checkpoint_seq_num: u64,
    /// Metadata of every archived file (contents and summaries) in the order they were added.
    pub file_metadata: Vec<FileMetadata>,
    /// Epoch recorded by the most recent `Manifest::update`.
    pub epoch: u64,
}
159
/// Versioned archive manifest, stored as a BCS blob in the `MANIFEST` object.
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum Manifest {
    /// The only manifest version currently defined.
    V1(ManifestV1),
}
164
165impl Manifest {
166 pub fn new(epoch: u64, next_checkpoint_seq_num: u64) -> Self {
167 Manifest::V1(ManifestV1 {
168 archive_version: 1,
169 next_checkpoint_seq_num,
170 file_metadata: vec![],
171 epoch,
172 })
173 }
174 pub fn files(&self) -> Vec<FileMetadata> {
175 match self {
176 Manifest::V1(manifest) => manifest.file_metadata.clone(),
177 }
178 }
179 pub fn epoch_num(&self) -> u64 {
180 match self {
181 Manifest::V1(manifest) => manifest.epoch,
182 }
183 }
184 pub fn next_checkpoint_seq_num(&self) -> u64 {
185 match self {
186 Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
187 }
188 }
189 pub fn next_checkpoint_after_epoch(&self, epoch_num: u64) -> u64 {
190 match self {
191 Manifest::V1(manifest) => {
192 let mut summary_files: Vec<_> = manifest
193 .file_metadata
194 .clone()
195 .into_iter()
196 .filter(|f| f.file_type == FileType::CheckpointSummary)
197 .collect();
198 summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
199 assert!(
200 summary_files
201 .windows(2)
202 .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
203 );
204 assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
205 summary_files
206 .iter()
207 .find(|f| f.epoch_num > epoch_num)
208 .map(|f| f.checkpoint_seq_range.start)
209 .unwrap_or(u64::MAX)
210 }
211 }
212 }
213
214 pub fn get_all_end_of_epoch_checkpoint_seq_numbers(&self) -> Result<Vec<u64>> {
215 match self {
216 Manifest::V1(manifest) => {
217 let mut summary_files: Vec<_> = manifest
218 .file_metadata
219 .clone()
220 .into_iter()
221 .filter(|f| f.file_type == FileType::CheckpointSummary)
222 .collect();
223 summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
224 assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
225 let res = summary_files.windows(2).filter_map(|w| {
227 assert_eq!(
228 w[1].checkpoint_seq_range.start,
229 w[0].checkpoint_seq_range.end
230 );
231 if w[1].epoch_num == w[0].epoch_num + 1 {
232 Some(w[0].checkpoint_seq_range.end - 1)
233 } else {
234 None
235 }
236 });
237 Ok(res.collect())
238 }
239 }
240 }
241
242 pub fn update(
243 &mut self,
244 epoch_num: u64,
245 checkpoint_sequence_number: u64,
246 checkpoint_file_metadata: FileMetadata,
247 summary_file_metadata: FileMetadata,
248 ) {
249 match self {
250 Manifest::V1(manifest) => {
251 manifest
252 .file_metadata
253 .extend(vec![checkpoint_file_metadata, summary_file_metadata]);
254 manifest.epoch = epoch_num;
255 manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
256 }
257 }
258 }
259}
260
/// The files produced for one archive upload, together with the manifest as updated to
/// include them.
// Field order is part of the serialized (serde) representation; do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct CheckpointUpdates {
    /// Metadata of the new checkpoint contents file.
    checkpoint_file_metadata: FileMetadata,
    /// Metadata of the new checkpoint summary file.
    summary_file_metadata: FileMetadata,
    /// Snapshot of the manifest taken after both files were added to it.
    manifest: Manifest,
}
267
268impl CheckpointUpdates {
269 pub fn new(
270 epoch_num: u64,
271 checkpoint_sequence_number: u64,
272 checkpoint_file_metadata: FileMetadata,
273 summary_file_metadata: FileMetadata,
274 manifest: &mut Manifest,
275 ) -> Self {
276 manifest.update(
277 epoch_num,
278 checkpoint_sequence_number,
279 checkpoint_file_metadata.clone(),
280 summary_file_metadata.clone(),
281 );
282 CheckpointUpdates {
283 checkpoint_file_metadata,
284 summary_file_metadata,
285 manifest: manifest.clone(),
286 }
287 }
288 pub fn content_file_path(&self) -> Path {
289 self.checkpoint_file_metadata.file_path()
290 }
291 pub fn summary_file_path(&self) -> Path {
292 self.summary_file_metadata.file_path()
293 }
294 pub fn manifest_file_path(&self) -> Path {
295 Path::from(MANIFEST_FILENAME)
296 }
297}
298
299pub fn create_file_metadata(
300 file_path: &std::path::Path,
301 file_type: FileType,
302 epoch_num: u64,
303 checkpoint_seq_range: Range<u64>,
304) -> Result<FileMetadata> {
305 let sha3_digest = compute_sha3_checksum(file_path)?;
306 let file_metadata = FileMetadata {
307 file_type,
308 epoch_num,
309 checkpoint_seq_range,
310 sha3_digest,
311 };
312 Ok(file_metadata)
313}
314
315pub fn create_file_metadata_from_bytes(
316 bytes: Bytes,
317 file_type: FileType,
318 epoch_num: u64,
319 checkpoint_seq_range: Range<u64>,
320) -> Result<FileMetadata> {
321 let sha3_digest = compute_sha3_checksum_for_bytes(bytes)?;
322 let file_metadata = FileMetadata {
323 file_type,
324 epoch_num,
325 checkpoint_seq_range,
326 sha3_digest,
327 };
328 Ok(file_metadata)
329}
330
331pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
333 let manifest_file_path = Path::from(MANIFEST_FILENAME);
334 let vec = get(&remote_store, &manifest_file_path).await?.to_vec();
335 read_manifest_from_bytes(vec)
336}
337
338pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
341 let manifest_file_size = vec.len();
342 let mut manifest_reader = Cursor::new(vec);
343
344 manifest_reader.rewind()?;
347 let magic = manifest_reader.read_u32::<BigEndian>()?;
348 if magic != MANIFEST_FILE_MAGIC {
349 return Err(anyhow!("Unexpected magic byte in manifest: {}", magic));
350 }
351
352 manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
355 let mut sha3_digest = [0u8; SHA3_BYTES];
356 manifest_reader.read_exact(&mut sha3_digest)?;
357
358 manifest_reader.rewind()?;
361 let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
362 manifest_reader.read_exact(&mut content_buf)?;
363 let mut hasher = Sha3_256::default();
364 hasher.update(&content_buf);
365 let computed_digest = hasher.finalize().digest;
366 if computed_digest != sha3_digest {
367 return Err(anyhow!(
368 "Manifest corrupted, computed checksum: {:?}, stored checksum: {:?}",
369 computed_digest,
370 sha3_digest
371 ));
372 }
373 manifest_reader.rewind()?;
374 manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
375 Blob::read(&mut manifest_reader)?.decode()
376}
377
378pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
380 let mut buf = BufWriter::new(vec![]);
381 buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
382 let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
383 blob.write(&mut buf)?;
384 buf.flush()?;
385 let mut hasher = Sha3_256::default();
386 hasher.update(buf.get_ref());
387 let computed_digest = hasher.finalize().digest;
388 buf.write_all(&computed_digest)?;
389 Ok(Bytes::from(buf.into_inner()?))
390}
391
392pub async fn write_manifest<S: ObjectStorePutExt>(
394 manifest: Manifest,
395 remote_store: S,
396) -> Result<()> {
397 let path = Path::from(MANIFEST_FILENAME);
398 let bytes = finalize_manifest(manifest)?;
399 put(&remote_store, &path, bytes).await?;
400 Ok(())
401}
402
403pub async fn read_manifest_as_json(remote_store_config: ObjectStoreConfig) -> Result<String> {
404 let metrics = ArchiveReaderMetrics::new(&Registry::default());
405 let config = ArchiveReaderConfig {
406 remote_store_config,
407 download_concurrency: NonZeroUsize::new(1).unwrap(),
408 use_for_pruning_watermark: false,
409 };
410 let archive_reader = ArchiveReader::new(config, &metrics)?;
411 archive_reader.sync_manifest_once().await?;
412 let manifest = archive_reader.get_manifest().await?;
413 let json = serde_json::to_string(&manifest).expect("Failed to serialize object");
414 Ok(json)
415}
416
417pub async fn write_manifest_from_json(
418 remote_store_config: ObjectStoreConfig,
419 json_manifest_path: std::path::PathBuf,
420) -> Result<()> {
421 let manifest: Manifest = serde_json::from_str(&fs::read_to_string(json_manifest_path)?)?;
422 let store = remote_store_config.make()?;
423 write_manifest(manifest, store).await?;
424 Ok(())
425}
426
427pub async fn verify_archive_with_genesis_config(
430 genesis: &std::path::Path,
431 remote_store_config: ObjectStoreConfig,
432 concurrency: usize,
433 interactive: bool,
434 num_retries: u32,
435) -> Result<()> {
436 let genesis = Genesis::load(genesis).unwrap();
437 let genesis_committee = genesis.committee()?;
438 let mut store = SingleCheckpointSharedInMemoryStore::default();
439 let contents = genesis.checkpoint_contents();
440 let fullcheckpoint_contents = FullCheckpointContents::from_contents_and_execution_data(
441 contents.clone(),
442 std::iter::once(ExecutionData::new(
443 genesis.transaction().clone(),
444 genesis.effects().clone(),
445 )),
446 );
447 store.insert_genesis_state(
448 genesis.checkpoint(),
449 VerifiedCheckpointContents::new_unchecked(fullcheckpoint_contents),
450 genesis_committee,
451 );
452
453 let num_retries = std::cmp::max(num_retries, 1);
454 for _ in 0..num_retries {
455 match verify_archive_with_local_store(
456 store.clone(),
457 remote_store_config.clone(),
458 concurrency,
459 interactive,
460 )
461 .await
462 {
463 Ok(_) => return Ok(()),
464 Err(e) => {
465 error!("Error while verifying archive: {}", e);
466 tokio::time::sleep(Duration::from_secs(10)).await;
467 }
468 }
469 }
470
471 Err::<(), anyhow::Error>(anyhow!(
472 "Failed to verify archive after {} retries",
473 num_retries
474 ))
475}
476
477pub async fn verify_archive_with_checksums(
478 remote_store_config: ObjectStoreConfig,
479 concurrency: usize,
480) -> Result<()> {
481 let metrics = ArchiveReaderMetrics::new(&Registry::default());
482 let config = ArchiveReaderConfig {
483 remote_store_config,
484 download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
485 use_for_pruning_watermark: false,
486 };
487 let archive_reader = ArchiveReader::new(config, &metrics)?;
489 archive_reader.sync_manifest_once().await?;
490 let manifest = archive_reader.get_manifest().await?;
491 info!(
492 "Next checkpoint in archive store: {}",
493 manifest.next_checkpoint_seq_num()
494 );
495
496 let file_metadata = archive_reader.verify_manifest(manifest).await?;
497 let num_files = file_metadata.len() * 2;
499 archive_reader
500 .verify_file_consistency(file_metadata)
501 .await?;
502 info!("All {} files are valid", num_files);
503 Ok(())
504}
505
506pub async fn verify_archive_with_local_store<S>(
509 store: S,
510 remote_store_config: ObjectStoreConfig,
511 concurrency: usize,
512 interactive: bool,
513) -> Result<()>
514where
515 S: WriteStore + Clone + Send + 'static,
516{
517 let metrics = ArchiveReaderMetrics::new(&Registry::default());
518 let config = ArchiveReaderConfig {
519 remote_store_config,
520 download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
521 use_for_pruning_watermark: false,
522 };
523 let archive_reader = ArchiveReader::new(config, &metrics)?;
524 archive_reader.sync_manifest_once().await?;
526 let latest_checkpoint_in_archive = archive_reader.latest_available_checkpoint().await?;
527 info!(
528 "Latest available checkpoint in archive store: {}",
529 latest_checkpoint_in_archive
530 );
531 let latest_checkpoint = store
532 .get_highest_synced_checkpoint()
533 .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
534 .sequence_number;
535 info!("Highest synced checkpoint in db: {latest_checkpoint}");
536 let txn_counter = Arc::new(AtomicU64::new(0));
537 let checkpoint_counter = Arc::new(AtomicU64::new(0));
538 let progress_bar = if interactive {
539 let progress_bar = ProgressBar::new(latest_checkpoint_in_archive).with_style(
540 ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len}({msg})")
541 .unwrap(),
542 );
543 let cloned_progress_bar = progress_bar.clone();
544 let cloned_counter = txn_counter.clone();
545 let cloned_checkpoint_counter = checkpoint_counter.clone();
546 let instant = Instant::now();
547 tokio::spawn(async move {
548 loop {
549 let total_checkpoints_loaded = cloned_checkpoint_counter.load(Ordering::Relaxed);
550 let total_checkpoints_per_sec =
551 total_checkpoints_loaded as f64 / instant.elapsed().as_secs_f64();
552 let total_txns_per_sec =
553 cloned_counter.load(Ordering::Relaxed) as f64 / instant.elapsed().as_secs_f64();
554 cloned_progress_bar.set_position(latest_checkpoint + total_checkpoints_loaded);
555 cloned_progress_bar.set_message(format!(
556 "checkpoints/s: {}, txns/s: {}",
557 total_checkpoints_per_sec, total_txns_per_sec
558 ));
559 tokio::time::sleep(Duration::from_secs(1)).await;
560 }
561 });
562 Some(progress_bar)
563 } else {
564 let cloned_store = store.clone();
565 tokio::spawn(async move {
566 loop {
567 let latest_checkpoint = cloned_store
568 .get_highest_synced_checkpoint()
569 .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
570 .sequence_number;
571 let percent = (latest_checkpoint * 100) / latest_checkpoint_in_archive;
572 info!("done = {percent}%");
573 tokio::time::sleep(Duration::from_secs(60)).await;
574 if percent >= 100 {
575 break;
576 }
577 }
578 Ok::<(), anyhow::Error>(())
579 });
580 None
581 };
582 archive_reader
585 .read(
586 store.clone(),
587 (latest_checkpoint + 1)..u64::MAX,
588 txn_counter,
589 checkpoint_counter,
590 true,
591 )
592 .await?;
593 progress_bar.iter().for_each(|p| p.finish_and_clear());
594 let end = store
595 .get_highest_synced_checkpoint()
596 .map_err(|_| anyhow!("Failed to read watermark"))?
597 .sequence_number;
598 info!("Highest verified checkpoint: {}", end);
599 Ok(())
600}