iota_archival/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#![allow(dead_code)]
6
7pub mod reader;
8pub mod writer;
9
10#[cfg(test)]
11mod tests;
12
13use std::{
14    fs,
15    io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
16    num::NonZeroUsize,
17    ops::Range,
18    sync::{
19        Arc,
20        atomic::{AtomicU64, Ordering},
21    },
22    time::{Duration, Instant},
23};
24
25use anyhow::{Result, anyhow};
26use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
27use bytes::Bytes;
28use fastcrypto::hash::{HashFunction, Sha3_256};
29use indicatif::{ProgressBar, ProgressStyle};
30use iota_config::{
31    genesis::Genesis, node::ArchiveReaderConfig, object_storage_config::ObjectStoreConfig,
32};
33use iota_storage::{
34    SHA3_BYTES,
35    blob::{Blob, BlobEncoding},
36    compute_sha3_checksum, compute_sha3_checksum_for_bytes,
37    object_store::{
38        ObjectStoreGetExt, ObjectStorePutExt,
39        util::{get, put},
40    },
41};
42use iota_types::{
43    base_types::ExecutionData,
44    messages_checkpoint::{FullCheckpointContents, VerifiedCheckpointContents},
45    storage::{SingleCheckpointSharedInMemoryStore, WriteStore},
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use object_store::path::Path;
49use prometheus::Registry;
50use serde::{Deserialize, Serialize};
51use tracing::{error, info};
52
53use crate::reader::{ArchiveReader, ArchiveReaderMetrics};
54
55#[expect(rustdoc::invalid_html_tags)]
56/// Checkpoints and summaries are persisted as blob files. Files are committed
57/// to local store by duration or file size. Committed files are synced with the
58/// remote store continuously. Files are optionally compressed with the zstd
59/// compression format. Filenames follow the format <checkpoint_seq_num>.
60/// <suffix> where `checkpoint_seq_num` is the first checkpoint present in that
61/// file. MANIFEST is the index and source of truth for all files present in the
62/// archive.
63///
64/// State Archival Directory Layout
65///  - archive/
66///     - MANIFEST
67///     - epoch_0/
68///        - 0.chk
69///        - 0.sum
70///        - 1000.chk
71///        - 1000.sum
72///        - 3000.chk
73///        - 3000.sum
74///        - ...
75///        - 100000.chk
76///        - 100000.sum
77///     - epoch_1/
78///        - 101000.chk
79///        - ...
80///
81/// Blob File Disk Format
82/// ┌──────────────────────────────┐
83/// │       magic <4 byte>         │
84/// ├──────────────────────────────┤
85/// │  storage format <1 byte>     │
86/// ├──────────────────────────────┤
87/// │    file compression <1 byte> │
88/// ├──────────────────────────────┤
89/// │ ┌──────────────────────────┐ │
90/// │ │         Blob 1           │ │
91/// │ ├──────────────────────────┤ │
92/// │ │          ...             │ │
93/// │ ├──────────────────────────┤ │
94/// │ │        Blob N            │ │
95/// │ └──────────────────────────┘ │
96/// └──────────────────────────────┘
97/// Blob
98/// ┌───────────────┬───────────────────┬──────────────┐
99/// │ len <uvarint> │ encoding <1 byte> │ data <bytes> │
100/// └───────────────┴───────────────────┴──────────────┘
101///
102/// MANIFEST File Disk Format
103/// ┌──────────────────────────────┐
104/// │        magic<4 byte>         │
105/// ├──────────────────────────────┤
106/// │   serialized manifest        │
107/// ├──────────────────────────────┤
108/// │      sha3 <32 bytes>         │
109/// └──────────────────────────────┘
///
/// Magic number for checkpoint content blob files (the 4-byte `magic` header
/// field of the blob file format).
pub const CHECKPOINT_FILE_MAGIC: u32 = 0xDEAD;
/// Magic number for checkpoint summary blob files.
pub const SUMMARY_FILE_MAGIC: u32 = 0xCAFE;
/// Magic number checked at the start of the MANIFEST file.
const MANIFEST_FILE_MAGIC: u32 = 0x00C0_FFEE;
/// Width in bytes of a magic number; magics are stored as a `u32`.
const MAGIC_BYTES: usize = std::mem::size_of::<u32>();
/// File name extension of checkpoint content files.
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// File name extension of checkpoint summary files.
const SUMMARY_FILE_SUFFIX: &str = "sum";
/// Prefix of the per-epoch directory names (`epoch_<n>`).
const EPOCH_DIR_PREFIX: &str = "epoch_";
/// Name of the manifest object inside the archive.
const MANIFEST_FILENAME: &str = "MANIFEST";
118
119#[derive(
120    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
121)]
122#[repr(u8)]
123pub enum FileType {
124    CheckpointContent = 0,
125    CheckpointSummary,
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
129pub struct FileMetadata {
130    pub file_type: FileType,
131    pub epoch_num: u64,
132    pub checkpoint_seq_range: Range<u64>,
133    pub sha3_digest: [u8; 32],
134}
135
136impl FileMetadata {
137    pub fn file_path(&self) -> Path {
138        let dir_path = Path::from(format!("{}{}", EPOCH_DIR_PREFIX, self.epoch_num));
139        match self.file_type {
140            FileType::CheckpointContent => dir_path.child(&*format!(
141                "{}.{CHECKPOINT_FILE_SUFFIX}",
142                self.checkpoint_seq_range.start
143            )),
144            FileType::CheckpointSummary => dir_path.child(&*format!(
145                "{}.{SUMMARY_FILE_SUFFIX}",
146                self.checkpoint_seq_range.start
147            )),
148        }
149    }
150}
151
152#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
153pub struct ManifestV1 {
154    pub archive_version: u8,
155    pub next_checkpoint_seq_num: u64,
156    pub file_metadata: Vec<FileMetadata>,
157    pub epoch: u64,
158}
159
160#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
161pub enum Manifest {
162    V1(ManifestV1),
163}
164
165impl Manifest {
166    pub fn new(epoch: u64, next_checkpoint_seq_num: u64) -> Self {
167        Manifest::V1(ManifestV1 {
168            archive_version: 1,
169            next_checkpoint_seq_num,
170            file_metadata: vec![],
171            epoch,
172        })
173    }
174    pub fn files(&self) -> Vec<FileMetadata> {
175        match self {
176            Manifest::V1(manifest) => manifest.file_metadata.clone(),
177        }
178    }
179    pub fn epoch_num(&self) -> u64 {
180        match self {
181            Manifest::V1(manifest) => manifest.epoch,
182        }
183    }
184    pub fn next_checkpoint_seq_num(&self) -> u64 {
185        match self {
186            Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
187        }
188    }
189    pub fn next_checkpoint_after_epoch(&self, epoch_num: u64) -> u64 {
190        match self {
191            Manifest::V1(manifest) => {
192                let mut summary_files: Vec<_> = manifest
193                    .file_metadata
194                    .clone()
195                    .into_iter()
196                    .filter(|f| f.file_type == FileType::CheckpointSummary)
197                    .collect();
198                summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
199                assert!(
200                    summary_files
201                        .windows(2)
202                        .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
203                );
204                assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
205                summary_files
206                    .iter()
207                    .find(|f| f.epoch_num > epoch_num)
208                    .map(|f| f.checkpoint_seq_range.start)
209                    .unwrap_or(u64::MAX)
210            }
211        }
212    }
213
214    pub fn get_all_end_of_epoch_checkpoint_seq_numbers(&self) -> Result<Vec<u64>> {
215        match self {
216            Manifest::V1(manifest) => {
217                let mut summary_files: Vec<_> = manifest
218                    .file_metadata
219                    .clone()
220                    .into_iter()
221                    .filter(|f| f.file_type == FileType::CheckpointSummary)
222                    .collect();
223                summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
224                assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
225                // get last checkpoint seq num per epoch
226                let res = summary_files.windows(2).filter_map(|w| {
227                    assert_eq!(
228                        w[1].checkpoint_seq_range.start,
229                        w[0].checkpoint_seq_range.end
230                    );
231                    if w[1].epoch_num == w[0].epoch_num + 1 {
232                        Some(w[0].checkpoint_seq_range.end - 1)
233                    } else {
234                        None
235                    }
236                });
237                Ok(res.collect())
238            }
239        }
240    }
241
242    pub fn update(
243        &mut self,
244        epoch_num: u64,
245        checkpoint_sequence_number: u64,
246        checkpoint_file_metadata: FileMetadata,
247        summary_file_metadata: FileMetadata,
248    ) {
249        match self {
250            Manifest::V1(manifest) => {
251                manifest
252                    .file_metadata
253                    .extend(vec![checkpoint_file_metadata, summary_file_metadata]);
254                manifest.epoch = epoch_num;
255                manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
256            }
257        }
258    }
259}
260
261#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
262pub struct CheckpointUpdates {
263    checkpoint_file_metadata: FileMetadata,
264    summary_file_metadata: FileMetadata,
265    manifest: Manifest,
266}
267
268impl CheckpointUpdates {
269    pub fn new(
270        epoch_num: u64,
271        checkpoint_sequence_number: u64,
272        checkpoint_file_metadata: FileMetadata,
273        summary_file_metadata: FileMetadata,
274        manifest: &mut Manifest,
275    ) -> Self {
276        manifest.update(
277            epoch_num,
278            checkpoint_sequence_number,
279            checkpoint_file_metadata.clone(),
280            summary_file_metadata.clone(),
281        );
282        CheckpointUpdates {
283            checkpoint_file_metadata,
284            summary_file_metadata,
285            manifest: manifest.clone(),
286        }
287    }
288    pub fn content_file_path(&self) -> Path {
289        self.checkpoint_file_metadata.file_path()
290    }
291    pub fn summary_file_path(&self) -> Path {
292        self.summary_file_metadata.file_path()
293    }
294    pub fn manifest_file_path(&self) -> Path {
295        Path::from(MANIFEST_FILENAME)
296    }
297}
298
299pub fn create_file_metadata(
300    file_path: &std::path::Path,
301    file_type: FileType,
302    epoch_num: u64,
303    checkpoint_seq_range: Range<u64>,
304) -> Result<FileMetadata> {
305    let sha3_digest = compute_sha3_checksum(file_path)?;
306    let file_metadata = FileMetadata {
307        file_type,
308        epoch_num,
309        checkpoint_seq_range,
310        sha3_digest,
311    };
312    Ok(file_metadata)
313}
314
315pub fn create_file_metadata_from_bytes(
316    bytes: Bytes,
317    file_type: FileType,
318    epoch_num: u64,
319    checkpoint_seq_range: Range<u64>,
320) -> Result<FileMetadata> {
321    let sha3_digest = compute_sha3_checksum_for_bytes(bytes)?;
322    let file_metadata = FileMetadata {
323        file_type,
324        epoch_num,
325        checkpoint_seq_range,
326        sha3_digest,
327    };
328    Ok(file_metadata)
329}
330
331/// Reads the manifest file from the store.
332pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
333    let manifest_file_path = Path::from(MANIFEST_FILENAME);
334    let vec = get(&remote_store, &manifest_file_path).await?.to_vec();
335    read_manifest_from_bytes(vec)
336}
337
338/// Reads the manifest file from the given byte vector and verifies the
339/// integrity of the file.
340pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
341    let manifest_file_size = vec.len();
342    let mut manifest_reader = Cursor::new(vec);
343
344    // Reads from the beginning of the file and verifies the magic byte
345    // is MANIFEST_FILE_MAGIC
346    manifest_reader.rewind()?;
347    let magic = manifest_reader.read_u32::<BigEndian>()?;
348    if magic != MANIFEST_FILE_MAGIC {
349        return Err(anyhow!("Unexpected magic byte in manifest: {}", magic));
350    }
351
352    // Reads from the end of the file and gets the SHA3 checksum
353    // of the content.
354    manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
355    let mut sha3_digest = [0u8; SHA3_BYTES];
356    manifest_reader.read_exact(&mut sha3_digest)?;
357
358    // Reads the content of the file and verifies the SHA3 checksum
359    // of the content matches the stored checksum.
360    manifest_reader.rewind()?;
361    let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
362    manifest_reader.read_exact(&mut content_buf)?;
363    let mut hasher = Sha3_256::default();
364    hasher.update(&content_buf);
365    let computed_digest = hasher.finalize().digest;
366    if computed_digest != sha3_digest {
367        return Err(anyhow!(
368            "Manifest corrupted, computed checksum: {:?}, stored checksum: {:?}",
369            computed_digest,
370            sha3_digest
371        ));
372    }
373    manifest_reader.rewind()?;
374    manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
375    Blob::read(&mut manifest_reader)?.decode()
376}
377
378/// Computes the SHA3 checksum of the Manifest and writes it to a byte vector.
379pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
380    let mut buf = BufWriter::new(vec![]);
381    buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
382    let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
383    blob.write(&mut buf)?;
384    buf.flush()?;
385    let mut hasher = Sha3_256::default();
386    hasher.update(buf.get_ref());
387    let computed_digest = hasher.finalize().digest;
388    buf.write_all(&computed_digest)?;
389    Ok(Bytes::from(buf.into_inner()?))
390}
391
392/// Writes the Manifest to the remote store.
393pub async fn write_manifest<S: ObjectStorePutExt>(
394    manifest: Manifest,
395    remote_store: S,
396) -> Result<()> {
397    let path = Path::from(MANIFEST_FILENAME);
398    let bytes = finalize_manifest(manifest)?;
399    put(&remote_store, &path, bytes).await?;
400    Ok(())
401}
402
403pub async fn read_manifest_as_json(remote_store_config: ObjectStoreConfig) -> Result<String> {
404    let metrics = ArchiveReaderMetrics::new(&Registry::default());
405    let config = ArchiveReaderConfig {
406        remote_store_config,
407        download_concurrency: NonZeroUsize::new(1).unwrap(),
408        use_for_pruning_watermark: false,
409    };
410    let archive_reader = ArchiveReader::new(config, &metrics)?;
411    archive_reader.sync_manifest_once().await?;
412    let manifest = archive_reader.get_manifest().await?;
413    let json = serde_json::to_string(&manifest).expect("Failed to serialize object");
414    Ok(json)
415}
416
417pub async fn write_manifest_from_json(
418    remote_store_config: ObjectStoreConfig,
419    json_manifest_path: std::path::PathBuf,
420) -> Result<()> {
421    let manifest: Manifest = serde_json::from_str(&fs::read_to_string(json_manifest_path)?)?;
422    let store = remote_store_config.make()?;
423    write_manifest(manifest, store).await?;
424    Ok(())
425}
426
427/// Loads and stores states from genesis then verifies and stores the
428/// checkpoints from the archive store.
429pub async fn verify_archive_with_genesis_config(
430    genesis: &std::path::Path,
431    remote_store_config: ObjectStoreConfig,
432    concurrency: usize,
433    interactive: bool,
434    num_retries: u32,
435) -> Result<()> {
436    let genesis = Genesis::load(genesis).unwrap();
437    let genesis_committee = genesis.committee()?;
438    let mut store = SingleCheckpointSharedInMemoryStore::default();
439    let contents = genesis.checkpoint_contents();
440    let fullcheckpoint_contents = FullCheckpointContents::from_contents_and_execution_data(
441        contents.clone(),
442        std::iter::once(ExecutionData::new(
443            genesis.transaction().clone(),
444            genesis.effects().clone(),
445        )),
446    );
447    store.insert_genesis_state(
448        genesis.checkpoint(),
449        VerifiedCheckpointContents::new_unchecked(fullcheckpoint_contents),
450        genesis_committee,
451    );
452
453    let num_retries = std::cmp::max(num_retries, 1);
454    for _ in 0..num_retries {
455        match verify_archive_with_local_store(
456            store.clone(),
457            remote_store_config.clone(),
458            concurrency,
459            interactive,
460        )
461        .await
462        {
463            Ok(_) => return Ok(()),
464            Err(e) => {
465                error!("Error while verifying archive: {}", e);
466                tokio::time::sleep(Duration::from_secs(10)).await;
467            }
468        }
469    }
470
471    Err::<(), anyhow::Error>(anyhow!(
472        "Failed to verify archive after {} retries",
473        num_retries
474    ))
475}
476
477pub async fn verify_archive_with_checksums(
478    remote_store_config: ObjectStoreConfig,
479    concurrency: usize,
480) -> Result<()> {
481    let metrics = ArchiveReaderMetrics::new(&Registry::default());
482    let config = ArchiveReaderConfig {
483        remote_store_config,
484        download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
485        use_for_pruning_watermark: false,
486    };
487    // Gets the Manifest from the remote store.
488    let archive_reader = ArchiveReader::new(config, &metrics)?;
489    archive_reader.sync_manifest_once().await?;
490    let manifest = archive_reader.get_manifest().await?;
491    info!(
492        "Next checkpoint in archive store: {}",
493        manifest.next_checkpoint_seq_num()
494    );
495
496    let file_metadata = archive_reader.verify_manifest(manifest).await?;
497    // Account for both summary and content files
498    let num_files = file_metadata.len() * 2;
499    archive_reader
500        .verify_file_consistency(file_metadata)
501        .await?;
502    info!("All {} files are valid", num_files);
503    Ok(())
504}
505
506/// Verifies the archive store by reading the checkpoints from the remote store
507/// and storing them in the local one.
508pub async fn verify_archive_with_local_store<S>(
509    store: S,
510    remote_store_config: ObjectStoreConfig,
511    concurrency: usize,
512    interactive: bool,
513) -> Result<()>
514where
515    S: WriteStore + Clone + Send + 'static,
516{
517    let metrics = ArchiveReaderMetrics::new(&Registry::default());
518    let config = ArchiveReaderConfig {
519        remote_store_config,
520        download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
521        use_for_pruning_watermark: false,
522    };
523    let archive_reader = ArchiveReader::new(config, &metrics)?;
524    // Gets the Manifest from the remote store.
525    archive_reader.sync_manifest_once().await?;
526    let latest_checkpoint_in_archive = archive_reader.latest_available_checkpoint().await?;
527    info!(
528        "Latest available checkpoint in archive store: {}",
529        latest_checkpoint_in_archive
530    );
531    let latest_checkpoint = store
532        .get_highest_synced_checkpoint()
533        .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
534        .sequence_number;
535    info!("Highest synced checkpoint in db: {latest_checkpoint}");
536    let txn_counter = Arc::new(AtomicU64::new(0));
537    let checkpoint_counter = Arc::new(AtomicU64::new(0));
538    let progress_bar = if interactive {
539        let progress_bar = ProgressBar::new(latest_checkpoint_in_archive).with_style(
540            ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len}({msg})")
541                .unwrap(),
542        );
543        let cloned_progress_bar = progress_bar.clone();
544        let cloned_counter = txn_counter.clone();
545        let cloned_checkpoint_counter = checkpoint_counter.clone();
546        let instant = Instant::now();
547        tokio::spawn(async move {
548            loop {
549                let total_checkpoints_loaded = cloned_checkpoint_counter.load(Ordering::Relaxed);
550                let total_checkpoints_per_sec =
551                    total_checkpoints_loaded as f64 / instant.elapsed().as_secs_f64();
552                let total_txns_per_sec =
553                    cloned_counter.load(Ordering::Relaxed) as f64 / instant.elapsed().as_secs_f64();
554                cloned_progress_bar.set_position(latest_checkpoint + total_checkpoints_loaded);
555                cloned_progress_bar.set_message(format!(
556                    "checkpoints/s: {}, txns/s: {}",
557                    total_checkpoints_per_sec, total_txns_per_sec
558                ));
559                tokio::time::sleep(Duration::from_secs(1)).await;
560            }
561        });
562        Some(progress_bar)
563    } else {
564        let cloned_store = store.clone();
565        tokio::spawn(async move {
566            loop {
567                let latest_checkpoint = cloned_store
568                    .get_highest_synced_checkpoint()
569                    .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
570                    .sequence_number;
571                let percent = (latest_checkpoint * 100) / latest_checkpoint_in_archive;
572                info!("done = {percent}%");
573                tokio::time::sleep(Duration::from_secs(60)).await;
574                if percent >= 100 {
575                    break;
576                }
577            }
578            Ok::<(), anyhow::Error>(())
579        });
580        None
581    };
582    // Reads the checkpoints from the remote store and stores them in the local
583    // one.
584    archive_reader
585        .read(
586            store.clone(),
587            (latest_checkpoint + 1)..u64::MAX,
588            txn_counter,
589            checkpoint_counter,
590            true,
591        )
592        .await?;
593    progress_bar.iter().for_each(|p| p.finish_and_clear());
594    let end = store
595        .get_highest_synced_checkpoint()
596        .map_err(|_| anyhow!("Failed to read watermark"))?
597        .sequence_number;
598    info!("Highest verified checkpoint: {}", end);
599    Ok(())
600}