iota_data_ingestion_core/history/
manifest.rs1use std::{
17 io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
18 num::NonZeroUsize,
19 ops::Range,
20};
21
22use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
23use bytes::Bytes;
24use fastcrypto::hash::{HashFunction, Sha3_256};
25use iota_config::{
26 node::ArchiveReaderConfig as HistoricalReaderConfig, object_storage_config::ObjectStoreConfig,
27};
28use iota_storage::{
29 SHA3_BYTES,
30 blob::{Blob, BlobEncoding},
31 compute_sha3_checksum, compute_sha3_checksum_for_bytes,
32 object_store::{
33 ObjectStoreGetExt, ObjectStorePutExt,
34 util::{get, put},
35 },
36};
37use object_store::path::Path;
38use serde::{Deserialize, Serialize};
39use tracing::info;
40
41use crate::{
42 IngestionError,
43 errors::IngestionResult as Result,
44 history::{
45 CHECKPOINT_FILE_SUFFIX, MAGIC_BYTES, MANIFEST_FILE_MAGIC, MANIFEST_FILENAME,
46 reader::HistoricalReader,
47 },
48};
49
50#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
51pub struct FileMetadata {
52 pub checkpoint_seq_range: Range<u64>,
53 pub sha3_digest: [u8; 32],
54}
55
56impl FileMetadata {
57 pub fn file_path(&self) -> Path {
58 Path::from(format!(
59 "{}.{CHECKPOINT_FILE_SUFFIX}",
60 self.checkpoint_seq_range.start
61 ))
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
66pub struct ManifestV1 {
67 pub archive_version: u8,
68 pub next_checkpoint_seq_num: u64,
69 pub file_metadata: Vec<FileMetadata>,
70}
71
72#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
73#[non_exhaustive]
74pub enum Manifest {
75 V1(ManifestV1),
76}
77
78impl Manifest {
79 pub fn new(next_checkpoint_seq_num: u64) -> Self {
80 Manifest::V1(ManifestV1 {
81 archive_version: 1,
82 next_checkpoint_seq_num,
83 file_metadata: vec![],
84 })
85 }
86
87 pub fn to_files(&self) -> Vec<FileMetadata> {
88 match self {
89 Manifest::V1(manifest) => manifest.file_metadata.clone(),
90 }
91 }
92
93 pub fn next_checkpoint_seq_num(&self) -> u64 {
94 match self {
95 Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
96 }
97 }
98
99 pub fn update(&mut self, checkpoint_sequence_number: u64, file_metadata: FileMetadata) {
100 match self {
101 Manifest::V1(manifest) => {
102 manifest.file_metadata.push(file_metadata);
103 manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
104 }
105 }
106 }
107
108 pub fn file_path() -> Path {
109 Path::from(MANIFEST_FILENAME)
110 }
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
114pub struct CheckpointUpdates {
115 file_metadata: FileMetadata,
116 manifest: Manifest,
117}
118
119impl CheckpointUpdates {
120 pub fn new(
121 checkpoint_sequence_number: u64,
122 file_metadata: FileMetadata,
123 manifest: &mut Manifest,
124 ) -> Self {
125 manifest.update(checkpoint_sequence_number, file_metadata.clone());
126 CheckpointUpdates {
127 file_metadata,
128 manifest: manifest.clone(),
129 }
130 }
131
132 pub fn file_path(&self) -> Path {
133 self.file_metadata.file_path()
134 }
135
136 pub fn manifest_file_path(&self) -> Path {
137 Path::from(MANIFEST_FILENAME)
138 }
139}
140
141pub fn create_file_metadata(
142 file_path: &std::path::Path,
143 checkpoint_seq_range: Range<u64>,
144) -> Result<FileMetadata> {
145 let sha3_digest = compute_sha3_checksum(file_path)?;
146 let file_metadata = FileMetadata {
147 checkpoint_seq_range,
148 sha3_digest,
149 };
150 Ok(file_metadata)
151}
152
153pub fn create_file_metadata_from_bytes(
154 contents: Bytes,
155 checkpoint_seq_range: Range<u64>,
156) -> Result<FileMetadata> {
157 let sha3_digest = compute_sha3_checksum_for_bytes(contents)?;
158 let file_metadata = FileMetadata {
159 checkpoint_seq_range,
160 sha3_digest,
161 };
162 Ok(file_metadata)
163}
164
165pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
167 let vec = get(&remote_store, &Manifest::file_path()).await?.to_vec();
168 read_manifest_from_bytes(vec)
169}
170
171pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
174 let manifest_file_size = vec.len();
175 let mut manifest_reader = Cursor::new(vec);
176
177 manifest_reader.rewind()?;
180 let magic = manifest_reader.read_u32::<BigEndian>()?;
181 if magic != MANIFEST_FILE_MAGIC {
182 return Err(IngestionError::HistoryRead(format!(
183 "unexpected magic byte in manifest: {magic}",
184 )));
185 }
186
187 manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
190 let mut sha3_digest = [0u8; SHA3_BYTES];
191 manifest_reader.read_exact(&mut sha3_digest)?;
192
193 manifest_reader.rewind()?;
196 let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
197 manifest_reader.read_exact(&mut content_buf)?;
198 let mut hasher = Sha3_256::default();
199 hasher.update(&content_buf);
200 let computed_digest = hasher.finalize().digest;
201 if computed_digest != sha3_digest {
202 return Err(IngestionError::HistoryRead(format!(
203 "manifest corrupted, computed checksum: {computed_digest:?}, stored checksum: {sha3_digest:?}"
204 )));
205 }
206 manifest_reader.rewind()?;
207 manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
208 Ok(Blob::read(&mut manifest_reader)?.decode()?)
209}
210
211pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
213 let mut buf = BufWriter::new(vec![]);
214 buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
215 let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
216 blob.write(&mut buf)?;
217 buf.flush()?;
218 let mut hasher = Sha3_256::default();
219 hasher.update(buf.get_ref());
220 let computed_digest = hasher.finalize().digest;
221 buf.write_all(&computed_digest)?;
222 Ok(Bytes::from(buf.into_inner().map_err(|e| e.into_error())?))
223}
224
225pub async fn write_manifest<S: ObjectStorePutExt>(
227 manifest: Manifest,
228 remote_store: S,
229) -> Result<()> {
230 let bytes = finalize_manifest(manifest)?;
231 put(&remote_store, &Manifest::file_path(), bytes).await?;
232 Ok(())
233}
234
235pub async fn verify_historical_checkpoints_with_checksums(
236 remote_store_config: ObjectStoreConfig,
237 concurrency: usize,
238) -> Result<()> {
239 let config = HistoricalReaderConfig {
240 remote_store_config,
241 download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
242 use_for_pruning_watermark: false,
243 };
244 let reader = HistoricalReader::new(config)?;
246 reader.sync_manifest_once().await?;
247 let manifest = reader.get_manifest().await;
248 info!(
249 "next checkpoint in archive store: {}",
250 manifest.next_checkpoint_seq_num()
251 );
252
253 let file_metadata = reader.verify_and_get_manifest_files(manifest)?;
254
255 let num_files = file_metadata.len() * 2;
257 reader.verify_file_consistency(file_metadata).await?;
258 info!("all {num_files} files are valid");
259 Ok(())
260}