Skip to main content

iota_data_ingestion_core/history/
manifest.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! Handle the manifest for historical checkpoint data.
6//!
7//! MANIFEST File Disk Format
8//! ┌──────────────────────────────┐
9//! │        magic<4 byte>         │
10//! ├──────────────────────────────┤
11//! │   serialized manifest        │
12//! ├──────────────────────────────┤
13//! │      sha3 <32 bytes>         │
14//! └──────────────────────────────┘
15
16use std::{num::NonZeroUsize, ops::Range};
17
18use bytes::Bytes;
19use iota_config::{
20    node::ArchiveReaderConfig as HistoricalReaderConfig, object_storage_config::ObjectStoreConfig,
21};
22use iota_storage::{
23    compute_sha3_checksum, compute_sha3_checksum_for_bytes,
24    object_store::{
25        ObjectStoreGetExt, ObjectStorePutExt,
26        util::{get, put},
27    },
28};
29use object_store::path::Path;
30use serde::{Deserialize, Serialize};
31use tracing::info;
32
33use crate::{
34    errors::IngestionResult as Result,
35    history::{
36        CHECKPOINT_FILE_SUFFIX, MANIFEST_FILE_MAGIC, MANIFEST_FILENAME, finalize_magic_blob,
37        read_magic_blob, reader::HistoricalReader,
38    },
39};
40
41#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
42pub struct FileMetadata {
43    pub checkpoint_seq_range: Range<u64>,
44    pub sha3_digest: [u8; 32],
45}
46
47impl FileMetadata {
48    pub fn file_path(&self) -> Path {
49        Path::from(format!(
50            "{}.{CHECKPOINT_FILE_SUFFIX}",
51            self.checkpoint_seq_range.start
52        ))
53    }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
57pub struct ManifestV1 {
58    pub archive_version: u8,
59    pub next_checkpoint_seq_num: u64,
60    pub file_metadata: Vec<FileMetadata>,
61}
62
63#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
64#[non_exhaustive]
65pub enum Manifest {
66    V1(ManifestV1),
67}
68
69impl Manifest {
70    pub fn new(next_checkpoint_seq_num: u64) -> Self {
71        Manifest::V1(ManifestV1 {
72            archive_version: 1,
73            next_checkpoint_seq_num,
74            file_metadata: vec![],
75        })
76    }
77
78    pub fn to_files(&self) -> Vec<FileMetadata> {
79        match self {
80            Manifest::V1(manifest) => manifest.file_metadata.clone(),
81        }
82    }
83
84    pub fn next_checkpoint_seq_num(&self) -> u64 {
85        match self {
86            Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
87        }
88    }
89
90    pub fn update(&mut self, checkpoint_sequence_number: u64, file_metadata: FileMetadata) {
91        match self {
92            Manifest::V1(manifest) => {
93                manifest.file_metadata.push(file_metadata);
94                manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
95            }
96        }
97    }
98
99    pub fn file_path() -> Path {
100        Path::from(MANIFEST_FILENAME)
101    }
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
105pub struct CheckpointUpdates {
106    file_metadata: FileMetadata,
107    manifest: Manifest,
108}
109
110impl CheckpointUpdates {
111    pub fn new(
112        checkpoint_sequence_number: u64,
113        file_metadata: FileMetadata,
114        manifest: &mut Manifest,
115    ) -> Self {
116        manifest.update(checkpoint_sequence_number, file_metadata.clone());
117        CheckpointUpdates {
118            file_metadata,
119            manifest: manifest.clone(),
120        }
121    }
122
123    pub fn file_path(&self) -> Path {
124        self.file_metadata.file_path()
125    }
126
127    pub fn manifest_file_path(&self) -> Path {
128        Path::from(MANIFEST_FILENAME)
129    }
130}
131
132pub fn create_file_metadata(
133    file_path: &std::path::Path,
134    checkpoint_seq_range: Range<u64>,
135) -> Result<FileMetadata> {
136    let sha3_digest = compute_sha3_checksum(file_path)?;
137    let file_metadata = FileMetadata {
138        checkpoint_seq_range,
139        sha3_digest,
140    };
141    Ok(file_metadata)
142}
143
144pub fn create_file_metadata_from_bytes(
145    contents: Bytes,
146    checkpoint_seq_range: Range<u64>,
147) -> Result<FileMetadata> {
148    let sha3_digest = compute_sha3_checksum_for_bytes(contents)?;
149    let file_metadata = FileMetadata {
150        checkpoint_seq_range,
151        sha3_digest,
152    };
153    Ok(file_metadata)
154}
155
156/// Reads the manifest file from the store.
157pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
158    let vec = get(&remote_store, &Manifest::file_path()).await?.to_vec();
159    read_manifest_from_bytes(vec)
160}
161
162/// Reads the manifest file from the given byte vector and verifies the
163/// integrity of the file.
164pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
165    read_magic_blob(vec, MANIFEST_FILE_MAGIC, MANIFEST_FILENAME)
166}
167
168/// Computes the SHA3 checksum of the Manifest and writes it to a byte vector.
169pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
170    finalize_magic_blob(&manifest, MANIFEST_FILE_MAGIC)
171}
172
173/// Writes the Manifest to the remote store.
174pub async fn write_manifest<S: ObjectStorePutExt>(
175    manifest: Manifest,
176    remote_store: S,
177) -> Result<()> {
178    let bytes = finalize_manifest(manifest)?;
179    put(&remote_store, &Manifest::file_path(), bytes).await?;
180    Ok(())
181}
182
183pub async fn verify_historical_checkpoints_with_checksums(
184    remote_store_config: ObjectStoreConfig,
185    concurrency: usize,
186) -> Result<()> {
187    let config = HistoricalReaderConfig {
188        remote_store_config,
189        download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
190        use_for_pruning_watermark: false,
191    };
192    // Gets the Manifest from the remote store.
193    let reader = HistoricalReader::new(config)?;
194    reader.sync_manifest_once().await?;
195    let manifest = reader.get_manifest().await;
196    info!(
197        "next checkpoint in archive store: {}",
198        manifest.next_checkpoint_seq_num()
199    );
200
201    let file_metadata = reader.verify_and_get_manifest_files(manifest)?;
202
203    // Account for both summary and content files
204    let num_files = file_metadata.len() * 2;
205    reader.verify_file_consistency(file_metadata).await?;
206    info!("all {num_files} files are valid");
207    Ok(())
208}