iota_data_ingestion_core/history/mod.rs
1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4//! Handle historical checkpoint data.
5//!
6//! Full checkpoint data for all epochs since genesis is persisted in
7//! batches as blob files in a remote store.
8//!
9//! Files are optionally compressed with the zstd
10//! compression format. Filenames follow the format <checkpoint_seq_num>.chk
11//! where `checkpoint_seq_num` is the first checkpoint present in that
12//! file. MANIFEST is the index and source of truth for all files present in the
13//! ingestion source history.
14//!
15//! EPOCH_BOUNDARIES maps each epoch to the sequence number of its last
16//! checkpoint. This allows the last checkpoint of each epoch to be read
17//! directly from the store, which is useful for verification purposes.
18//!
19//! Ingestion Source History Directory Layout
20//! ```text
21//! - ingestion/
22//! - historical/
23//! - MANIFEST
24//! - EPOCH_BOUNDARIES
25//! - 0.chk
26//! - 1000.chk
27//! - 3000.chk
28//! - ...
29//! - 100000.chk
30//!
31//! Blob File Disk Format
32//! ┌──────────────────────────────┐
33//! │ magic <4 byte> │
34//! ├──────────────────────────────┤
35//! │ storage format <1 byte> │
36//! ├──────────────────────────────┤
37//! │ file compression <1 byte> │
38//! ├──────────────────────────────┤
39//! │ ┌──────────────────────────┐ │
40//! │ │ Blob 1 │ │
41//! │ ├──────────────────────────┤ │
42//! │ │ ... │ │
43//! │ ├──────────────────────────┤ │
44//! │ │ Blob N │ │
45//! │ └──────────────────────────┘ │
46//! └──────────────────────────────┘
47//! Blob
48//! ┌───────────────┬───────────────────┬──────────────┐
49//! │ len <uvarint> │ encoding <1 byte> │ data <bytes> │
50//! └───────────────┴───────────────────┴──────────────┘
51//!
52//! MANIFEST and EPOCH_BOUNDARIES File Disk Format
53//! ┌──────────────────────────────┐
54//! │ magic<4 byte> │
55//! ├──────────────────────────────┤
56//! │ serialized contents │
57//! ├──────────────────────────────┤
58//! │ sha3 <32 bytes> │
59//! └──────────────────────────────┘
60//! ```
61
62use std::io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write};
63
64use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
65use bytes::Bytes;
66use fastcrypto::hash::{HashFunction, Sha3_256};
67use iota_storage::{
68 SHA3_BYTES,
69 blob::{Blob, BlobEncoding},
70};
71use serde::{Serialize, de::DeserializeOwned};
72
73use crate::{IngestionError, errors::IngestionResult as Result};
74
75pub mod epoch_boundaries;
76pub mod manifest;
77pub mod reader;
78pub mod verifier;
79
/// Size in bytes of the magic number that prefixes the history files.
pub const MAGIC_BYTES: usize = 4;

/// Magic number at the start of each checkpoint blob file.
pub const CHECKPOINT_FILE_MAGIC: u32 = 0x0000_BEEF;
/// Extension of checkpoint blob files, which are named
/// `<checkpoint_seq_num>.chk`.
pub const CHECKPOINT_FILE_SUFFIX: &str = "chk";

/// Magic number at the start of the MANIFEST file.
pub const MANIFEST_FILE_MAGIC: u32 = 0x0000_FACE;
/// Name of the MANIFEST file, the index of all files in the history store.
pub const MANIFEST_FILENAME: &str = "MANIFEST";

/// Magic number at the start of the EPOCH_BOUNDARIES file.
pub const EPOCH_BOUNDARIES_FILE_MAGIC: u32 = 0x0000_E90C;
/// Name of the EPOCH_BOUNDARIES file, which maps epochs to their last
/// checkpoint sequence number.
pub const EPOCH_BOUNDARIES_FILENAME: &str = "EPOCH_BOUNDARIES";
87
88/// Encodes `value` as a BCS [`Blob`] framed with a 4-byte `magic` prefix and a
89/// trailing SHA3-256 checksum computed over the magic and the blob.
90///
91/// This is the on-disk format shared by the MANIFEST and EPOCH_BOUNDARIES
92/// files.
93pub(crate) fn finalize_magic_blob<T: Serialize>(value: &T, magic: u32) -> Result<Bytes> {
94 let mut buf = BufWriter::new(vec![]);
95 buf.write_u32::<BigEndian>(magic)?;
96 Blob::encode(value, BlobEncoding::Bcs)?.write(&mut buf)?;
97 buf.flush()?;
98 let mut hasher = Sha3_256::default();
99 hasher.update(buf.get_ref());
100 let computed_digest = hasher.finalize().digest;
101 buf.write_all(&computed_digest)?;
102 Ok(Bytes::from(buf.into_inner().map_err(|e| e.into_error())?))
103}
104
105/// Decodes a value written by [`finalize_magic_blob`] and verifies its
106/// integrity.
107///
108/// `filename` names the file in error messages (e.g. `"manifest"`).
109///
110/// # Errors
111///
112/// Fails if the `magic` prefix or the trailing SHA3-256 checksum does not
113/// match.
114pub(crate) fn read_magic_blob<T: DeserializeOwned>(
115 vec: Vec<u8>,
116 magic: u32,
117 filename: &str,
118) -> Result<T> {
119 let file_size = vec.len();
120 let mut reader = Cursor::new(vec);
121
122 // Reads from the beginning of the file and verifies the magic byte.
123 let found = reader.read_u32::<BigEndian>()?;
124 if found != magic {
125 return Err(IngestionError::HistoryRead(format!(
126 "unexpected magic byte in {filename}: {found}",
127 )));
128 }
129
130 // Reads the SHA3 checksum stored at the end of the file.
131 reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
132 let mut sha3_digest = [0u8; SHA3_BYTES];
133 reader.read_exact(&mut sha3_digest)?;
134
135 // Reads the content and verifies it against the stored checksum.
136 reader.rewind()?;
137 let mut content_buf = vec![0u8; file_size - SHA3_BYTES];
138 reader.read_exact(&mut content_buf)?;
139 let mut hasher = Sha3_256::default();
140 hasher.update(&content_buf);
141 let computed_digest = hasher.finalize().digest;
142 if computed_digest != sha3_digest {
143 return Err(IngestionError::HistoryRead(format!(
144 "{filename} corrupted, computed checksum: {computed_digest:?}, stored checksum: {sha3_digest:?}"
145 )));
146 }
147
148 reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
149 Ok(Blob::read(&mut reader)?.decode()?)
150}