iota_archival/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#![allow(dead_code)]
6
7pub mod reader;
8pub mod writer;
9
10#[cfg(test)]
11mod tests;
12
13use std::{
14    fs,
15    io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
16    num::NonZeroUsize,
17    ops::Range,
18    sync::{
19        Arc,
20        atomic::{AtomicU64, Ordering},
21    },
22    time::{Duration, Instant},
23};
24
25use anyhow::{Result, anyhow};
26use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
27use bytes::Bytes;
28use fastcrypto::hash::{HashFunction, Sha3_256};
29use indicatif::{ProgressBar, ProgressStyle};
30use iota_config::{
31    genesis::Genesis, node::ArchiveReaderConfig, object_storage_config::ObjectStoreConfig,
32};
33use iota_storage::{
34    SHA3_BYTES,
35    blob::{Blob, BlobEncoding},
36    compute_sha3_checksum, compute_sha3_checksum_for_bytes,
37    object_store::{
38        ObjectStoreGetExt, ObjectStorePutExt,
39        util::{get, put},
40    },
41};
42use iota_types::{
43    base_types::ExecutionData,
44    messages_checkpoint::{FullCheckpointContents, VerifiedCheckpointContents},
45    storage::{SingleCheckpointSharedInMemoryStore, WriteStore},
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use object_store::path::Path;
49use prometheus::Registry;
50use serde::{Deserialize, Serialize};
51use tracing::{error, info};
52
53use crate::reader::{ArchiveReader, ArchiveReaderMetrics};
54
55#[expect(rustdoc::invalid_html_tags)]
56/// Checkpoints and summaries are persisted as blob files. Files are committed
57/// to local store by duration or file size. Committed files are synced with the
58/// remote store continuously. Files are optionally compressed with the zstd
/// compression format. Filenames follow the format
/// `<checkpoint_seq_num>.<suffix>`, where `checkpoint_seq_num` is the first
/// checkpoint present in that file. MANIFEST is the index and source of truth
/// for all files present in the archive.
63///
64/// State Archival Directory Layout
65///  - archive/
66///     - MANIFEST
67///     - epoch_0/
68///        - 0.chk
69///        - 0.sum
70///        - 1000.chk
71///        - 1000.sum
72///        - 3000.chk
73///        - 3000.sum
74///        - ...
75///        - 100000.chk
76///        - 100000.sum
77///     - epoch_1/
78///        - 101000.chk
79///        - ...
80///
81/// Blob File Disk Format
82/// ┌──────────────────────────────┐
83/// │       magic <4 byte>         │
84/// ├──────────────────────────────┤
85/// │  storage format <1 byte>     │
86/// ├──────────────────────────────┤
87/// │    file compression <1 byte> │
88/// ├──────────────────────────────┤
89/// │ ┌──────────────────────────┐ │
90/// │ │         Blob 1           │ │
91/// │ ├──────────────────────────┤ │
92/// │ │          ...             │ │
93/// │ ├──────────────────────────┤ │
94/// │ │        Blob N            │ │
95/// │ └──────────────────────────┘ │
96/// └──────────────────────────────┘
97/// Blob
98/// ┌───────────────┬───────────────────┬──────────────┐
99/// │ len <uvarint> │ encoding <1 byte> │ data <bytes> │
100/// └───────────────┴───────────────────┴──────────────┘
101///
102/// MANIFEST File Disk Format
103/// ┌──────────────────────────────┐
104/// │        magic<4 byte>         │
105/// ├──────────────────────────────┤
106/// │   serialized manifest        │
107/// ├──────────────────────────────┤
108/// │      sha3 <32 bytes>         │
109/// └──────────────────────────────┘
// Magic number for checkpoint content files. Not referenced in this file;
// presumably written/checked by the `reader` and `writer` modules — confirm.
pub const CHECKPOINT_FILE_MAGIC: u32 = 0x0000DEAD;
/// Magic number for checkpoint summary files. Not referenced in this file;
/// presumably written/checked by the `reader` and `writer` modules — confirm.
pub const SUMMARY_FILE_MAGIC: u32 = 0x0000CAFE;
/// Magic number written at the start of the MANIFEST file by
/// `finalize_manifest` and checked by `read_manifest_from_bytes`.
const MANIFEST_FILE_MAGIC: u32 = 0x00C0FFEE;
/// Size in bytes of the magic number prefix; used to skip past it when
/// decoding the manifest.
const MAGIC_BYTES: usize = 4;
/// File name suffix used for `FileType::CheckpointContent` files.
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// File name suffix used for `FileType::CheckpointSummary` files.
const SUMMARY_FILE_SUFFIX: &str = "sum";
/// Prefix of the per-epoch directory name; the epoch number is appended to it.
const EPOCH_DIR_PREFIX: &str = "epoch_";
/// Path of the manifest file within the store.
const MANIFEST_FILENAME: &str = "MANIFEST";
118
/// Kind of file tracked in the archive manifest.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`/`IntoPrimitive`,
/// so variants can be converted to and from their `u8` discriminant.
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum FileType {
    /// Checkpoint content file (named with the `chk` suffix); discriminant 0.
    CheckpointContent = 0,
    /// Checkpoint summary file (named with the `sum` suffix); discriminant 1.
    CheckpointSummary,
}
127
/// Manifest entry describing a single archived file.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    /// Whether this entry is a checkpoint content or a summary file.
    pub file_type: FileType,
    /// Epoch number; selects the `epoch_<n>` directory the file lives in.
    pub epoch_num: u64,
    /// Checkpoint sequence numbers covered by the file. `start` is used as the
    /// file name; `Manifest::next_checkpoint_after_epoch` expects consecutive
    /// summary files to satisfy `next.start == prev.end`.
    pub checkpoint_seq_range: Range<u64>,
    /// SHA3 checksum of the file, as computed by `create_file_metadata` /
    /// `create_file_metadata_from_bytes`.
    pub sha3_digest: [u8; 32],
}
135
136impl FileMetadata {
137    pub fn file_path(&self) -> Path {
138        let dir_path = Path::from(format!("{}{}", EPOCH_DIR_PREFIX, self.epoch_num));
139        match self.file_type {
140            FileType::CheckpointContent => dir_path.child(&*format!(
141                "{}.{CHECKPOINT_FILE_SUFFIX}",
142                self.checkpoint_seq_range.start
143            )),
144            FileType::CheckpointSummary => dir_path.child(&*format!(
145                "{}.{SUMMARY_FILE_SUFFIX}",
146                self.checkpoint_seq_range.start
147            )),
148        }
149    }
150}
151
/// Version 1 of the archive manifest contents.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ManifestV1 {
    /// Archive format version; `Manifest::new` sets this to 1.
    pub archive_version: u8,
    /// Checkpoint sequence number recorded by the most recent
    /// `Manifest::update` call (or the value passed to `Manifest::new`).
    pub next_checkpoint_seq_num: u64,
    /// Metadata of every file recorded in the manifest. `Manifest::update`
    /// appends one content entry and one summary entry per call.
    pub file_metadata: Vec<FileMetadata>,
    /// Epoch recorded by the most recent `Manifest::update` call (or the
    /// value passed to `Manifest::new`).
    pub epoch: u64,
}
159
/// Versioned archive manifest. The enum wrapper allows new manifest versions
/// to be added as additional variants; only `V1` exists here.
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum Manifest {
    /// Version 1 manifest payload.
    V1(ManifestV1),
}
164
165impl Manifest {
166    pub fn new(epoch: u64, next_checkpoint_seq_num: u64) -> Self {
167        Manifest::V1(ManifestV1 {
168            archive_version: 1,
169            next_checkpoint_seq_num,
170            file_metadata: vec![],
171            epoch,
172        })
173    }
174    pub fn files(&self) -> Vec<FileMetadata> {
175        match self {
176            Manifest::V1(manifest) => manifest.file_metadata.clone(),
177        }
178    }
179    pub fn epoch_num(&self) -> u64 {
180        match self {
181            Manifest::V1(manifest) => manifest.epoch,
182        }
183    }
184    pub fn next_checkpoint_seq_num(&self) -> u64 {
185        match self {
186            Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
187        }
188    }
189    pub fn next_checkpoint_after_epoch(&self, epoch_num: u64) -> u64 {
190        match self {
191            Manifest::V1(manifest) => {
192                let mut summary_files: Vec<_> = manifest
193                    .file_metadata
194                    .clone()
195                    .into_iter()
196                    .filter(|f| f.file_type == FileType::CheckpointSummary)
197                    .collect();
198                summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
199                assert!(
200                    summary_files
201                        .windows(2)
202                        .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
203                );
204                assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
205                summary_files
206                    .iter()
207                    .find(|f| f.epoch_num > epoch_num)
208                    .map(|f| f.checkpoint_seq_range.start)
209                    .unwrap_or(u64::MAX)
210            }
211        }
212    }
213    pub fn update(
214        &mut self,
215        epoch_num: u64,
216        checkpoint_sequence_number: u64,
217        checkpoint_file_metadata: FileMetadata,
218        summary_file_metadata: FileMetadata,
219    ) {
220        match self {
221            Manifest::V1(manifest) => {
222                manifest
223                    .file_metadata
224                    .extend(vec![checkpoint_file_metadata, summary_file_metadata]);
225                manifest.epoch = epoch_num;
226                manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
227            }
228        }
229    }
230}
231
/// Bundles the metadata of a checkpoint/summary file pair together with a
/// snapshot of the manifest taken after that pair was added to it.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct CheckpointUpdates {
    // Metadata of the checkpoint content file.
    checkpoint_file_metadata: FileMetadata,
    // Metadata of the checkpoint summary file.
    summary_file_metadata: FileMetadata,
    // Clone of the manifest made in `CheckpointUpdates::new`, after the
    // manifest has been updated with the two entries above.
    manifest: Manifest,
}
238
239impl CheckpointUpdates {
240    pub fn new(
241        epoch_num: u64,
242        checkpoint_sequence_number: u64,
243        checkpoint_file_metadata: FileMetadata,
244        summary_file_metadata: FileMetadata,
245        manifest: &mut Manifest,
246    ) -> Self {
247        manifest.update(
248            epoch_num,
249            checkpoint_sequence_number,
250            checkpoint_file_metadata.clone(),
251            summary_file_metadata.clone(),
252        );
253        CheckpointUpdates {
254            checkpoint_file_metadata,
255            summary_file_metadata,
256            manifest: manifest.clone(),
257        }
258    }
259    pub fn content_file_path(&self) -> Path {
260        self.checkpoint_file_metadata.file_path()
261    }
262    pub fn summary_file_path(&self) -> Path {
263        self.summary_file_metadata.file_path()
264    }
265    pub fn manifest_file_path(&self) -> Path {
266        Path::from(MANIFEST_FILENAME)
267    }
268}
269
270pub fn create_file_metadata(
271    file_path: &std::path::Path,
272    file_type: FileType,
273    epoch_num: u64,
274    checkpoint_seq_range: Range<u64>,
275) -> Result<FileMetadata> {
276    let sha3_digest = compute_sha3_checksum(file_path)?;
277    let file_metadata = FileMetadata {
278        file_type,
279        epoch_num,
280        checkpoint_seq_range,
281        sha3_digest,
282    };
283    Ok(file_metadata)
284}
285
286pub fn create_file_metadata_from_bytes(
287    bytes: Bytes,
288    file_type: FileType,
289    epoch_num: u64,
290    checkpoint_seq_range: Range<u64>,
291) -> Result<FileMetadata> {
292    let sha3_digest = compute_sha3_checksum_for_bytes(bytes)?;
293    let file_metadata = FileMetadata {
294        file_type,
295        epoch_num,
296        checkpoint_seq_range,
297        sha3_digest,
298    };
299    Ok(file_metadata)
300}
301
302/// Reads the manifest file from the store.
303pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
304    let manifest_file_path = Path::from(MANIFEST_FILENAME);
305    let vec = get(&remote_store, &manifest_file_path).await?.to_vec();
306    read_manifest_from_bytes(vec)
307}
308
309/// Reads the manifest file from the given byte vector and verifies the
310/// integrity of the file.
311pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
312    let manifest_file_size = vec.len();
313    let mut manifest_reader = Cursor::new(vec);
314
315    // Reads from the beginning of the file and verifies the magic byte
316    // is MANIFEST_FILE_MAGIC
317    manifest_reader.rewind()?;
318    let magic = manifest_reader.read_u32::<BigEndian>()?;
319    if magic != MANIFEST_FILE_MAGIC {
320        return Err(anyhow!("Unexpected magic byte in manifest: {}", magic));
321    }
322
323    // Reads from the end of the file and gets the SHA3 checksum
324    // of the content.
325    manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
326    let mut sha3_digest = [0u8; SHA3_BYTES];
327    manifest_reader.read_exact(&mut sha3_digest)?;
328
329    // Reads the content of the file and verifies the SHA3 checksum
330    // of the content matches the stored checksum.
331    manifest_reader.rewind()?;
332    let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
333    manifest_reader.read_exact(&mut content_buf)?;
334    let mut hasher = Sha3_256::default();
335    hasher.update(&content_buf);
336    let computed_digest = hasher.finalize().digest;
337    if computed_digest != sha3_digest {
338        return Err(anyhow!(
339            "Manifest corrupted, computed checksum: {:?}, stored checksum: {:?}",
340            computed_digest,
341            sha3_digest
342        ));
343    }
344    manifest_reader.rewind()?;
345    manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
346    Blob::read(&mut manifest_reader)?.decode()
347}
348
349/// Computes the SHA3 checksum of the Manifest and writes it to a byte vector.
350pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
351    let mut buf = BufWriter::new(vec![]);
352    buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
353    let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
354    blob.write(&mut buf)?;
355    buf.flush()?;
356    let mut hasher = Sha3_256::default();
357    hasher.update(buf.get_ref());
358    let computed_digest = hasher.finalize().digest;
359    buf.write_all(&computed_digest)?;
360    Ok(Bytes::from(buf.into_inner()?))
361}
362
363/// Writes the Manifest to the remote store.
364pub async fn write_manifest<S: ObjectStorePutExt>(
365    manifest: Manifest,
366    remote_store: S,
367) -> Result<()> {
368    let path = Path::from(MANIFEST_FILENAME);
369    let bytes = finalize_manifest(manifest)?;
370    put(&remote_store, &path, bytes).await?;
371    Ok(())
372}
373
374pub async fn read_manifest_as_json(remote_store_config: ObjectStoreConfig) -> Result<String> {
375    let metrics = ArchiveReaderMetrics::new(&Registry::default());
376    let config = ArchiveReaderConfig {
377        remote_store_config,
378        download_concurrency: NonZeroUsize::new(1).unwrap(),
379        use_for_pruning_watermark: false,
380    };
381    let archive_reader = ArchiveReader::new(config, &metrics)?;
382    archive_reader.sync_manifest_once().await?;
383    let manifest = archive_reader.get_manifest().await?;
384    let json = serde_json::to_string(&manifest).expect("Failed to serialize object");
385    Ok(json)
386}
387
388pub async fn write_manifest_from_json(
389    remote_store_config: ObjectStoreConfig,
390    json_manifest_path: std::path::PathBuf,
391) -> Result<()> {
392    let manifest: Manifest = serde_json::from_str(&fs::read_to_string(json_manifest_path)?)?;
393    let store = remote_store_config.make()?;
394    write_manifest(manifest, store).await?;
395    Ok(())
396}
397
398/// Loads and stores states from genesis then verifies and stores the
399/// checkpoints from the archive store.
400pub async fn verify_archive_with_genesis_config(
401    genesis: &std::path::Path,
402    remote_store_config: ObjectStoreConfig,
403    concurrency: usize,
404    interactive: bool,
405    num_retries: u32,
406) -> Result<()> {
407    let genesis = Genesis::load(genesis).unwrap();
408    let genesis_committee = genesis.committee()?;
409    let mut store = SingleCheckpointSharedInMemoryStore::default();
410    let contents = genesis.checkpoint_contents();
411    let fullcheckpoint_contents = FullCheckpointContents::from_contents_and_execution_data(
412        contents.clone(),
413        std::iter::once(ExecutionData::new(
414            genesis.transaction().clone(),
415            genesis.effects().clone(),
416        )),
417    );
418    store.insert_genesis_state(
419        genesis.checkpoint(),
420        VerifiedCheckpointContents::new_unchecked(fullcheckpoint_contents),
421        genesis_committee,
422    );
423
424    let num_retries = std::cmp::max(num_retries, 1);
425    for _ in 0..num_retries {
426        match verify_archive_with_local_store(
427            store.clone(),
428            remote_store_config.clone(),
429            concurrency,
430            interactive,
431        )
432        .await
433        {
434            Ok(_) => return Ok(()),
435            Err(e) => {
436                error!("Error while verifying archive: {}", e);
437                tokio::time::sleep(Duration::from_secs(10)).await;
438            }
439        }
440    }
441
442    Err::<(), anyhow::Error>(anyhow!(
443        "Failed to verify archive after {} retries",
444        num_retries
445    ))
446}
447
448pub async fn verify_archive_with_checksums(
449    remote_store_config: ObjectStoreConfig,
450    concurrency: usize,
451) -> Result<()> {
452    let metrics = ArchiveReaderMetrics::new(&Registry::default());
453    let config = ArchiveReaderConfig {
454        remote_store_config,
455        download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
456        use_for_pruning_watermark: false,
457    };
458    // Gets the Manifest from the remote store.
459    let archive_reader = ArchiveReader::new(config, &metrics)?;
460    archive_reader.sync_manifest_once().await?;
461    let manifest = archive_reader.get_manifest().await?;
462    info!(
463        "Next checkpoint in archive store: {}",
464        manifest.next_checkpoint_seq_num()
465    );
466
467    let file_metadata = archive_reader.verify_manifest(manifest).await?;
468    // Account for both summary and content files
469    let num_files = file_metadata.len() * 2;
470    archive_reader
471        .verify_file_consistency(file_metadata)
472        .await?;
473    info!("All {} files are valid", num_files);
474    Ok(())
475}
476
477/// Verifies the archive store by reading the checkpoints from the remote store
478/// and storing them in the local one.
479pub async fn verify_archive_with_local_store<S>(
480    store: S,
481    remote_store_config: ObjectStoreConfig,
482    concurrency: usize,
483    interactive: bool,
484) -> Result<()>
485where
486    S: WriteStore + Clone + Send + 'static,
487{
488    let metrics = ArchiveReaderMetrics::new(&Registry::default());
489    let config = ArchiveReaderConfig {
490        remote_store_config,
491        download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
492        use_for_pruning_watermark: false,
493    };
494    let archive_reader = ArchiveReader::new(config, &metrics)?;
495    // Gets the Manifest from the remote store.
496    archive_reader.sync_manifest_once().await?;
497    let latest_checkpoint_in_archive = archive_reader.latest_available_checkpoint().await?;
498    info!(
499        "Latest available checkpoint in archive store: {}",
500        latest_checkpoint_in_archive
501    );
502    let latest_checkpoint = store
503        .get_highest_synced_checkpoint()
504        .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
505        .sequence_number;
506    info!("Highest synced checkpoint in db: {latest_checkpoint}");
507    let txn_counter = Arc::new(AtomicU64::new(0));
508    let checkpoint_counter = Arc::new(AtomicU64::new(0));
509    let progress_bar = if interactive {
510        let progress_bar = ProgressBar::new(latest_checkpoint_in_archive).with_style(
511            ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len}({msg})")
512                .unwrap(),
513        );
514        let cloned_progress_bar = progress_bar.clone();
515        let cloned_counter = txn_counter.clone();
516        let cloned_checkpoint_counter = checkpoint_counter.clone();
517        let instant = Instant::now();
518        tokio::spawn(async move {
519            loop {
520                let total_checkpoints_loaded = cloned_checkpoint_counter.load(Ordering::Relaxed);
521                let total_checkpoints_per_sec =
522                    total_checkpoints_loaded as f64 / instant.elapsed().as_secs_f64();
523                let total_txns_per_sec =
524                    cloned_counter.load(Ordering::Relaxed) as f64 / instant.elapsed().as_secs_f64();
525                cloned_progress_bar.set_position(latest_checkpoint + total_checkpoints_loaded);
526                cloned_progress_bar.set_message(format!(
527                    "checkpoints/s: {}, txns/s: {}",
528                    total_checkpoints_per_sec, total_txns_per_sec
529                ));
530                tokio::time::sleep(Duration::from_secs(1)).await;
531            }
532        });
533        Some(progress_bar)
534    } else {
535        let cloned_store = store.clone();
536        tokio::spawn(async move {
537            loop {
538                let latest_checkpoint = cloned_store
539                    .get_highest_synced_checkpoint()
540                    .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
541                    .sequence_number;
542                let percent = (latest_checkpoint * 100) / latest_checkpoint_in_archive;
543                info!("done = {percent}%");
544                tokio::time::sleep(Duration::from_secs(60)).await;
545                if percent >= 100 {
546                    break;
547                }
548            }
549            Ok::<(), anyhow::Error>(())
550        });
551        None
552    };
553    // Reads the checkpoints from the remote store and stores them in the local
554    // one.
555    archive_reader
556        .read(
557            store.clone(),
558            (latest_checkpoint + 1)..u64::MAX,
559            txn_counter,
560            checkpoint_counter,
561            true,
562        )
563        .await?;
564    progress_bar.iter().for_each(|p| p.finish_and_clear());
565    let end = store
566        .get_highest_synced_checkpoint()
567        .map_err(|_| anyhow!("Failed to read watermark"))?
568        .sequence_number;
569    info!("Highest verified checkpoint: {}", end);
570    Ok(())
571}