Skip to main content

iota_data_ingestion_core/history/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4//! Handle historical checkpoint data.
5//!
6//! Full checkpoint data for epochs starting from genesis are persisted in
7//! batches as blob files in a remote store.
8//!
9//! Files are optionally compressed with the zstd
10//! compression format. Filenames follow the format <checkpoint_seq_num>.chk
11//! where `checkpoint_seq_num` is the first checkpoint present in that
12//! file. MANIFEST is the index and source of truth for all files present in the
13//! ingestion source history.
14//!
//! EPOCH_BOUNDARIES maps each epoch to the sequence number of its last
//! checkpoint. This allows reading the last checkpoint of each epoch directly
//! from the store, which is useful for verification purposes.
18//!
19//! Ingestion Source History Directory Layout
20//! ```text
21//!  - ingestion/
22//!     - historical/
23//!          - MANIFEST
24//!          - EPOCH_BOUNDARIES
25//!          - 0.chk
26//!          - 1000.chk
27//!          - 3000.chk
28//!          - ...
29//!          - 100000.chk
30//!
31//! Blob File Disk Format
32//! ┌──────────────────────────────┐
33//! │       magic <4 byte>         │
34//! ├──────────────────────────────┤
35//! │  storage format <1 byte>     │
36//! ├──────────────────────────────┤
37//! │    file compression <1 byte> │
38//! ├──────────────────────────────┤
39//! │ ┌──────────────────────────┐ │
40//! │ │         Blob 1           │ │
41//! │ ├──────────────────────────┤ │
42//! │ │          ...             │ │
43//! │ ├──────────────────────────┤ │
44//! │ │        Blob N            │ │
45//! │ └──────────────────────────┘ │
46//! └──────────────────────────────┘
47//! Blob
48//! ┌───────────────┬───────────────────┬──────────────┐
49//! │ len <uvarint> │ encoding <1 byte> │ data <bytes> │
50//! └───────────────┴───────────────────┴──────────────┘
51//!
52//! MANIFEST and EPOCH_BOUNDARIES File Disk Format
53//! ┌──────────────────────────────┐
54//! │        magic<4 byte>         │
55//! ├──────────────────────────────┤
56//! │   serialized contents        │
57//! ├──────────────────────────────┤
58//! │      sha3 <32 bytes>         │
59//! └──────────────────────────────┘
60//! ```
61
62use std::io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write};
63
64use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
65use bytes::Bytes;
66use fastcrypto::hash::{HashFunction, Sha3_256};
67use iota_storage::{
68    SHA3_BYTES,
69    blob::{Blob, BlobEncoding},
70};
71use serde::{Serialize, de::DeserializeOwned};
72
73use crate::{IngestionError, errors::IngestionResult as Result};
74
75pub mod epoch_boundaries;
76pub mod manifest;
77pub mod reader;
78pub mod verifier;
79
/// Magic number for checkpoint blob (`.chk`) files.
pub const CHECKPOINT_FILE_MAGIC: u32 = 0x0000_BEEF;
/// File extension of checkpoint blob files (`<checkpoint_seq_num>.chk`).
pub const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// Size in bytes of the magic prefix, which is stored as a `u32`.
pub const MAGIC_BYTES: usize = std::mem::size_of::<u32>();

/// Magic number for the MANIFEST file.
pub const MANIFEST_FILE_MAGIC: u32 = 0x0000_FACE;
/// Name of the MANIFEST file in the history directory.
pub const MANIFEST_FILENAME: &str = "MANIFEST";

/// Magic number for the EPOCH_BOUNDARIES file.
pub const EPOCH_BOUNDARIES_FILE_MAGIC: u32 = 0x0000_E90C;
/// Name of the EPOCH_BOUNDARIES file in the history directory.
pub const EPOCH_BOUNDARIES_FILENAME: &str = "EPOCH_BOUNDARIES";
87
88/// Encodes `value` as a BCS [`Blob`] framed with a 4-byte `magic` prefix and a
89/// trailing SHA3-256 checksum computed over the magic and the blob.
90///
91/// This is the on-disk format shared by the MANIFEST and EPOCH_BOUNDARIES
92/// files.
93pub(crate) fn finalize_magic_blob<T: Serialize>(value: &T, magic: u32) -> Result<Bytes> {
94    let mut buf = BufWriter::new(vec![]);
95    buf.write_u32::<BigEndian>(magic)?;
96    Blob::encode(value, BlobEncoding::Bcs)?.write(&mut buf)?;
97    buf.flush()?;
98    let mut hasher = Sha3_256::default();
99    hasher.update(buf.get_ref());
100    let computed_digest = hasher.finalize().digest;
101    buf.write_all(&computed_digest)?;
102    Ok(Bytes::from(buf.into_inner().map_err(|e| e.into_error())?))
103}
104
105/// Decodes a value written by [`finalize_magic_blob`] and verifies its
106/// integrity.
107///
108/// `filename` names the file in error messages (e.g. `"manifest"`).
109///
110/// # Errors
111///
112/// Fails if the `magic` prefix or the trailing SHA3-256 checksum does not
113/// match.
114pub(crate) fn read_magic_blob<T: DeserializeOwned>(
115    vec: Vec<u8>,
116    magic: u32,
117    filename: &str,
118) -> Result<T> {
119    let file_size = vec.len();
120    let mut reader = Cursor::new(vec);
121
122    // Reads from the beginning of the file and verifies the magic byte.
123    let found = reader.read_u32::<BigEndian>()?;
124    if found != magic {
125        return Err(IngestionError::HistoryRead(format!(
126            "unexpected magic byte in {filename}: {found}",
127        )));
128    }
129
130    // Reads the SHA3 checksum stored at the end of the file.
131    reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
132    let mut sha3_digest = [0u8; SHA3_BYTES];
133    reader.read_exact(&mut sha3_digest)?;
134
135    // Reads the content and verifies it against the stored checksum.
136    reader.rewind()?;
137    let mut content_buf = vec![0u8; file_size - SHA3_BYTES];
138    reader.read_exact(&mut content_buf)?;
139    let mut hasher = Sha3_256::default();
140    hasher.update(&content_buf);
141    let computed_digest = hasher.finalize().digest;
142    if computed_digest != sha3_digest {
143        return Err(IngestionError::HistoryRead(format!(
144            "{filename} corrupted, computed checksum: {computed_digest:?}, stored checksum: {sha3_digest:?}"
145        )));
146    }
147
148    reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
149    Ok(Blob::read(&mut reader)?.decode()?)
150}