iota_data_ingestion_core/history/manifest.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! Handle the manifest for historical checkpoint data.
6//!
7//! MANIFEST File Disk Format
8//! ┌──────────────────────────────┐
9//! │        magic<4 byte>         │
10//! ├──────────────────────────────┤
11//! │   serialized manifest        │
12//! ├──────────────────────────────┤
13//! │      sha3 <32 bytes>         │
14//! └──────────────────────────────┘
15
16use std::{
17    io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
18    num::NonZeroUsize,
19    ops::Range,
20};
21
22use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
23use bytes::Bytes;
24use fastcrypto::hash::{HashFunction, Sha3_256};
25use iota_config::{
26    node::ArchiveReaderConfig as HistoricalReaderConfig, object_storage_config::ObjectStoreConfig,
27};
28use iota_storage::{
29    SHA3_BYTES,
30    blob::{Blob, BlobEncoding},
31    compute_sha3_checksum, compute_sha3_checksum_for_bytes,
32    object_store::{
33        ObjectStoreGetExt, ObjectStorePutExt,
34        util::{get, put},
35    },
36};
37use object_store::path::Path;
38use serde::{Deserialize, Serialize};
39use tracing::info;
40
41use crate::{
42    IngestionError,
43    errors::IngestionResult as Result,
44    history::{
45        CHECKPOINT_FILE_SUFFIX, MAGIC_BYTES, MANIFEST_FILE_MAGIC, MANIFEST_FILENAME,
46        reader::HistoricalReader,
47    },
48};
49
50#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
51pub struct FileMetadata {
52    pub checkpoint_seq_range: Range<u64>,
53    pub sha3_digest: [u8; 32],
54}
55
56impl FileMetadata {
57    pub fn file_path(&self) -> Path {
58        Path::from(format!(
59            "{}.{CHECKPOINT_FILE_SUFFIX}",
60            self.checkpoint_seq_range.start
61        ))
62    }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
66pub struct ManifestV1 {
67    pub archive_version: u8,
68    pub next_checkpoint_seq_num: u64,
69    pub file_metadata: Vec<FileMetadata>,
70}
71
72#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
73#[non_exhaustive]
74pub enum Manifest {
75    V1(ManifestV1),
76}
77
78impl Manifest {
79    pub fn new(next_checkpoint_seq_num: u64) -> Self {
80        Manifest::V1(ManifestV1 {
81            archive_version: 1,
82            next_checkpoint_seq_num,
83            file_metadata: vec![],
84        })
85    }
86
87    pub fn to_files(&self) -> Vec<FileMetadata> {
88        match self {
89            Manifest::V1(manifest) => manifest.file_metadata.clone(),
90        }
91    }
92
93    pub fn next_checkpoint_seq_num(&self) -> u64 {
94        match self {
95            Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
96        }
97    }
98
99    pub fn update(&mut self, checkpoint_sequence_number: u64, file_metadata: FileMetadata) {
100        match self {
101            Manifest::V1(manifest) => {
102                manifest.file_metadata.push(file_metadata);
103                manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
104            }
105        }
106    }
107
108    pub fn file_path() -> Path {
109        Path::from(MANIFEST_FILENAME)
110    }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
114pub struct CheckpointUpdates {
115    file_metadata: FileMetadata,
116    manifest: Manifest,
117}
118
119impl CheckpointUpdates {
120    pub fn new(
121        checkpoint_sequence_number: u64,
122        file_metadata: FileMetadata,
123        manifest: &mut Manifest,
124    ) -> Self {
125        manifest.update(checkpoint_sequence_number, file_metadata.clone());
126        CheckpointUpdates {
127            file_metadata,
128            manifest: manifest.clone(),
129        }
130    }
131
132    pub fn file_path(&self) -> Path {
133        self.file_metadata.file_path()
134    }
135
136    pub fn manifest_file_path(&self) -> Path {
137        Path::from(MANIFEST_FILENAME)
138    }
139}
140
141pub fn create_file_metadata(
142    file_path: &std::path::Path,
143    checkpoint_seq_range: Range<u64>,
144) -> Result<FileMetadata> {
145    let sha3_digest = compute_sha3_checksum(file_path)?;
146    let file_metadata = FileMetadata {
147        checkpoint_seq_range,
148        sha3_digest,
149    };
150    Ok(file_metadata)
151}
152
153pub fn create_file_metadata_from_bytes(
154    contents: Bytes,
155    checkpoint_seq_range: Range<u64>,
156) -> Result<FileMetadata> {
157    let sha3_digest = compute_sha3_checksum_for_bytes(contents)?;
158    let file_metadata = FileMetadata {
159        checkpoint_seq_range,
160        sha3_digest,
161    };
162    Ok(file_metadata)
163}
164
165/// Reads the manifest file from the store.
166pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
167    let vec = get(&remote_store, &Manifest::file_path()).await?.to_vec();
168    read_manifest_from_bytes(vec)
169}
170
171/// Reads the manifest file from the given byte vector and verifies the
172/// integrity of the file.
173pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
174    let manifest_file_size = vec.len();
175    let mut manifest_reader = Cursor::new(vec);
176
177    // Reads from the beginning of the file and verifies the magic byte
178    // is MANIFEST_FILE_MAGIC
179    manifest_reader.rewind()?;
180    let magic = manifest_reader.read_u32::<BigEndian>()?;
181    if magic != MANIFEST_FILE_MAGIC {
182        return Err(IngestionError::HistoryRead(format!(
183            "unexpected magic byte in manifest: {magic}",
184        )));
185    }
186
187    // Reads from the end of the file and gets the SHA3 checksum
188    // of the content.
189    manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
190    let mut sha3_digest = [0u8; SHA3_BYTES];
191    manifest_reader.read_exact(&mut sha3_digest)?;
192
193    // Reads the content of the file and verifies the SHA3 checksum
194    // of the content matches the stored checksum.
195    manifest_reader.rewind()?;
196    let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
197    manifest_reader.read_exact(&mut content_buf)?;
198    let mut hasher = Sha3_256::default();
199    hasher.update(&content_buf);
200    let computed_digest = hasher.finalize().digest;
201    if computed_digest != sha3_digest {
202        return Err(IngestionError::HistoryRead(format!(
203            "manifest corrupted, computed checksum: {computed_digest:?}, stored checksum: {sha3_digest:?}"
204        )));
205    }
206    manifest_reader.rewind()?;
207    manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
208    Ok(Blob::read(&mut manifest_reader)?.decode()?)
209}
210
211/// Computes the SHA3 checksum of the Manifest and writes it to a byte vector.
212pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
213    let mut buf = BufWriter::new(vec![]);
214    buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
215    let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
216    blob.write(&mut buf)?;
217    buf.flush()?;
218    let mut hasher = Sha3_256::default();
219    hasher.update(buf.get_ref());
220    let computed_digest = hasher.finalize().digest;
221    buf.write_all(&computed_digest)?;
222    Ok(Bytes::from(buf.into_inner().map_err(|e| e.into_error())?))
223}
224
225/// Writes the Manifest to the remote store.
226pub async fn write_manifest<S: ObjectStorePutExt>(
227    manifest: Manifest,
228    remote_store: S,
229) -> Result<()> {
230    let bytes = finalize_manifest(manifest)?;
231    put(&remote_store, &Manifest::file_path(), bytes).await?;
232    Ok(())
233}
234
235pub async fn verify_historical_checkpoints_with_checksums(
236    remote_store_config: ObjectStoreConfig,
237    concurrency: usize,
238) -> Result<()> {
239    let config = HistoricalReaderConfig {
240        remote_store_config,
241        download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
242        use_for_pruning_watermark: false,
243    };
244    // Gets the Manifest from the remote store.
245    let reader = HistoricalReader::new(config)?;
246    reader.sync_manifest_once().await?;
247    let manifest = reader.get_manifest().await;
248    info!(
249        "next checkpoint in archive store: {}",
250        manifest.next_checkpoint_seq_num()
251    );
252
253    let file_metadata = reader.verify_and_get_manifest_files(manifest)?;
254
255    // Account for both summary and content files
256    let num_files = file_metadata.len() * 2;
257    reader.verify_file_consistency(file_metadata).await?;
258    info!("all {num_files} files are valid");
259    Ok(())
260}