iota_data_ingestion_core/history/
manifest.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! Handle the manifest for historical checkpoint data.
6//!
7//! MANIFEST File Disk Format
8//! ┌──────────────────────────────┐
//! │       magic <4 bytes>        │
10//! ├──────────────────────────────┤
11//! │   serialized manifest        │
12//! ├──────────────────────────────┤
13//! │      sha3 <32 bytes>         │
14//! └──────────────────────────────┘
15
16use std::{
17    io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
18    num::NonZeroUsize,
19    ops::Range,
20};
21
22use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
23use bytes::Bytes;
24use fastcrypto::hash::{HashFunction, Sha3_256};
25use iota_config::{
26    node::ArchiveReaderConfig as HistoricalReaderConfig, object_storage_config::ObjectStoreConfig,
27};
28use iota_storage::{
29    SHA3_BYTES,
30    blob::{Blob, BlobEncoding},
31    compute_sha3_checksum, compute_sha3_checksum_for_bytes,
32    object_store::{
33        ObjectStoreGetExt, ObjectStorePutExt,
34        util::{get, put},
35    },
36};
37use object_store::path::Path;
38use serde::{Deserialize, Serialize};
39use tracing::info;
40
41use crate::{
42    IngestionError,
43    errors::IngestionResult as Result,
44    history::{
45        CHECKPOINT_FILE_SUFFIX, HISTORICAL_DIR_NAME, INGESTION_DIR_NAME, MAGIC_BYTES,
46        MANIFEST_FILE_MAGIC, MANIFEST_FILENAME, reader::HistoricalReader,
47    },
48};
49
50#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
51pub struct FileMetadata {
52    pub checkpoint_seq_range: Range<u64>,
53    pub sha3_digest: [u8; 32],
54}
55
56impl FileMetadata {
57    pub fn file_path(&self) -> Path {
58        Path::from(INGESTION_DIR_NAME)
59            .child(HISTORICAL_DIR_NAME)
60            .child(format!(
61                "{}.{CHECKPOINT_FILE_SUFFIX}",
62                self.checkpoint_seq_range.start
63            ))
64    }
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
68pub struct ManifestV1 {
69    pub archive_version: u8,
70    pub next_checkpoint_seq_num: u64,
71    pub file_metadata: Vec<FileMetadata>,
72}
73
74#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
75#[non_exhaustive]
76pub enum Manifest {
77    V1(ManifestV1),
78}
79
80impl Manifest {
81    pub fn new(next_checkpoint_seq_num: u64) -> Self {
82        Manifest::V1(ManifestV1 {
83            archive_version: 1,
84            next_checkpoint_seq_num,
85            file_metadata: vec![],
86        })
87    }
88
89    pub fn to_files(&self) -> Vec<FileMetadata> {
90        match self {
91            Manifest::V1(manifest) => manifest.file_metadata.clone(),
92        }
93    }
94
95    pub fn next_checkpoint_seq_num(&self) -> u64 {
96        match self {
97            Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
98        }
99    }
100
101    pub fn update(&mut self, checkpoint_sequence_number: u64, file_metadata: FileMetadata) {
102        match self {
103            Manifest::V1(manifest) => {
104                manifest.file_metadata.push(file_metadata);
105                manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
106            }
107        }
108    }
109
110    pub fn file_path() -> Path {
111        Path::from(INGESTION_DIR_NAME)
112            .child(HISTORICAL_DIR_NAME)
113            .child(MANIFEST_FILENAME)
114    }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
118pub struct CheckpointUpdates {
119    file_metadata: FileMetadata,
120    manifest: Manifest,
121}
122
123impl CheckpointUpdates {
124    pub fn new(
125        checkpoint_sequence_number: u64,
126        file_metadata: FileMetadata,
127        manifest: &mut Manifest,
128    ) -> Self {
129        manifest.update(checkpoint_sequence_number, file_metadata.clone());
130        CheckpointUpdates {
131            file_metadata,
132            manifest: manifest.clone(),
133        }
134    }
135
136    pub fn file_path(&self) -> Path {
137        self.file_metadata.file_path()
138    }
139
140    pub fn manifest_file_path(&self) -> Path {
141        Path::from(MANIFEST_FILENAME)
142    }
143}
144
145pub fn create_file_metadata(
146    file_path: &std::path::Path,
147    checkpoint_seq_range: Range<u64>,
148) -> Result<FileMetadata> {
149    let sha3_digest = compute_sha3_checksum(file_path)?;
150    let file_metadata = FileMetadata {
151        checkpoint_seq_range,
152        sha3_digest,
153    };
154    Ok(file_metadata)
155}
156
157pub fn create_file_metadata_from_bytes(
158    contents: Bytes,
159    checkpoint_seq_range: Range<u64>,
160) -> Result<FileMetadata> {
161    let sha3_digest = compute_sha3_checksum_for_bytes(contents)?;
162    let file_metadata = FileMetadata {
163        checkpoint_seq_range,
164        sha3_digest,
165    };
166    Ok(file_metadata)
167}
168
169/// Reads the manifest file from the store.
170pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
171    let vec = get(&remote_store, &Manifest::file_path()).await?.to_vec();
172    read_manifest_from_bytes(vec)
173}
174
175/// Reads the manifest file from the given byte vector and verifies the
176/// integrity of the file.
177pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
178    let manifest_file_size = vec.len();
179    let mut manifest_reader = Cursor::new(vec);
180
181    // Reads from the beginning of the file and verifies the magic byte
182    // is MANIFEST_FILE_MAGIC
183    manifest_reader.rewind()?;
184    let magic = manifest_reader.read_u32::<BigEndian>()?;
185    if magic != MANIFEST_FILE_MAGIC {
186        return Err(IngestionError::HistoryRead(format!(
187            "unexpected magic byte in manifest: {magic}",
188        )));
189    }
190
191    // Reads from the end of the file and gets the SHA3 checksum
192    // of the content.
193    manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
194    let mut sha3_digest = [0u8; SHA3_BYTES];
195    manifest_reader.read_exact(&mut sha3_digest)?;
196
197    // Reads the content of the file and verifies the SHA3 checksum
198    // of the content matches the stored checksum.
199    manifest_reader.rewind()?;
200    let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
201    manifest_reader.read_exact(&mut content_buf)?;
202    let mut hasher = Sha3_256::default();
203    hasher.update(&content_buf);
204    let computed_digest = hasher.finalize().digest;
205    if computed_digest != sha3_digest {
206        return Err(IngestionError::HistoryRead(format!(
207            "manifest corrupted, computed checksum: {computed_digest:?}, stored checksum: {sha3_digest:?}"
208        )));
209    }
210    manifest_reader.rewind()?;
211    manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
212    Ok(Blob::read(&mut manifest_reader)?.decode()?)
213}
214
215/// Computes the SHA3 checksum of the Manifest and writes it to a byte vector.
216pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
217    let mut buf = BufWriter::new(vec![]);
218    buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
219    let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
220    blob.write(&mut buf)?;
221    buf.flush()?;
222    let mut hasher = Sha3_256::default();
223    hasher.update(buf.get_ref());
224    let computed_digest = hasher.finalize().digest;
225    buf.write_all(&computed_digest)?;
226    Ok(Bytes::from(buf.into_inner().map_err(|e| e.into_error())?))
227}
228
229/// Writes the Manifest to the remote store.
230pub async fn write_manifest<S: ObjectStorePutExt>(
231    manifest: Manifest,
232    remote_store: S,
233) -> Result<()> {
234    let bytes = finalize_manifest(manifest)?;
235    put(&remote_store, &Manifest::file_path(), bytes).await?;
236    Ok(())
237}
238
239pub async fn verify_historical_checkpoints_with_checksums(
240    remote_store_config: ObjectStoreConfig,
241    concurrency: usize,
242) -> Result<()> {
243    let config = HistoricalReaderConfig {
244        remote_store_config,
245        download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
246        use_for_pruning_watermark: false,
247    };
248    // Gets the Manifest from the remote store.
249    let reader = HistoricalReader::new(config)?;
250    reader.sync_manifest_once().await?;
251    let manifest = reader.get_manifest().await;
252    info!(
253        "next checkpoint in archive store: {}",
254        manifest.next_checkpoint_seq_num()
255    );
256
257    let file_metadata = reader.verify_and_get_manifest_files(manifest)?;
258
259    // Account for both summary and content files
260    let num_files = file_metadata.len() * 2;
261    reader.verify_file_consistency(file_metadata).await?;
262    info!("all {num_files} files are valid");
263    Ok(())
264}