iota_data_ingestion_core/history/
manifest.rs1use std::{
17 io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
18 num::NonZeroUsize,
19 ops::Range,
20};
21
22use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
23use bytes::Bytes;
24use fastcrypto::hash::{HashFunction, Sha3_256};
25use iota_config::{
26 node::ArchiveReaderConfig as HistoricalReaderConfig, object_storage_config::ObjectStoreConfig,
27};
28use iota_storage::{
29 SHA3_BYTES,
30 blob::{Blob, BlobEncoding},
31 compute_sha3_checksum, compute_sha3_checksum_for_bytes,
32 object_store::{
33 ObjectStoreGetExt, ObjectStorePutExt,
34 util::{get, put},
35 },
36};
37use object_store::path::Path;
38use serde::{Deserialize, Serialize};
39use tracing::info;
40
41use crate::{
42 IngestionError,
43 errors::IngestionResult as Result,
44 history::{
45 CHECKPOINT_FILE_SUFFIX, HISTORICAL_DIR_NAME, INGESTION_DIR_NAME, MAGIC_BYTES,
46 MANIFEST_FILE_MAGIC, MANIFEST_FILENAME, reader::HistoricalReader,
47 },
48};
49
50#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
51pub struct FileMetadata {
52 pub checkpoint_seq_range: Range<u64>,
53 pub sha3_digest: [u8; 32],
54}
55
56impl FileMetadata {
57 pub fn file_path(&self) -> Path {
58 Path::from(INGESTION_DIR_NAME)
59 .child(HISTORICAL_DIR_NAME)
60 .child(format!(
61 "{}.{CHECKPOINT_FILE_SUFFIX}",
62 self.checkpoint_seq_range.start
63 ))
64 }
65}
66
/// Version 1 payload of the manifest.
///
/// NOTE(review): `finalize_manifest` serializes the manifest with
/// `BlobEncoding::Bcs`; presumably reordering or retyping these fields changes
/// the on-disk format — confirm before touching the layout.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ManifestV1 {
    /// Format version; `Manifest::new` sets this to `1`.
    pub archive_version: u8,
    /// Sequence number most recently passed to `Manifest::update` (or the
    /// initial value given to `Manifest::new`).
    pub next_checkpoint_seq_num: u64,
    /// Metadata of every file registered so far, in insertion order.
    pub file_metadata: Vec<FileMetadata>,
}
73
74#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
75#[non_exhaustive]
76pub enum Manifest {
77 V1(ManifestV1),
78}
79
80impl Manifest {
81 pub fn new(next_checkpoint_seq_num: u64) -> Self {
82 Manifest::V1(ManifestV1 {
83 archive_version: 1,
84 next_checkpoint_seq_num,
85 file_metadata: vec![],
86 })
87 }
88
89 pub fn to_files(&self) -> Vec<FileMetadata> {
90 match self {
91 Manifest::V1(manifest) => manifest.file_metadata.clone(),
92 }
93 }
94
95 pub fn next_checkpoint_seq_num(&self) -> u64 {
96 match self {
97 Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
98 }
99 }
100
101 pub fn update(&mut self, checkpoint_sequence_number: u64, file_metadata: FileMetadata) {
102 match self {
103 Manifest::V1(manifest) => {
104 manifest.file_metadata.push(file_metadata);
105 manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
106 }
107 }
108 }
109
110 pub fn file_path() -> Path {
111 Path::from(INGESTION_DIR_NAME)
112 .child(HISTORICAL_DIR_NAME)
113 .child(MANIFEST_FILENAME)
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
118pub struct CheckpointUpdates {
119 file_metadata: FileMetadata,
120 manifest: Manifest,
121}
122
123impl CheckpointUpdates {
124 pub fn new(
125 checkpoint_sequence_number: u64,
126 file_metadata: FileMetadata,
127 manifest: &mut Manifest,
128 ) -> Self {
129 manifest.update(checkpoint_sequence_number, file_metadata.clone());
130 CheckpointUpdates {
131 file_metadata,
132 manifest: manifest.clone(),
133 }
134 }
135
136 pub fn file_path(&self) -> Path {
137 self.file_metadata.file_path()
138 }
139
140 pub fn manifest_file_path(&self) -> Path {
141 Path::from(MANIFEST_FILENAME)
142 }
143}
144
145pub fn create_file_metadata(
146 file_path: &std::path::Path,
147 checkpoint_seq_range: Range<u64>,
148) -> Result<FileMetadata> {
149 let sha3_digest = compute_sha3_checksum(file_path)?;
150 let file_metadata = FileMetadata {
151 checkpoint_seq_range,
152 sha3_digest,
153 };
154 Ok(file_metadata)
155}
156
157pub fn create_file_metadata_from_bytes(
158 contents: Bytes,
159 checkpoint_seq_range: Range<u64>,
160) -> Result<FileMetadata> {
161 let sha3_digest = compute_sha3_checksum_for_bytes(contents)?;
162 let file_metadata = FileMetadata {
163 checkpoint_seq_range,
164 sha3_digest,
165 };
166 Ok(file_metadata)
167}
168
169pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
171 let vec = get(&remote_store, &Manifest::file_path()).await?.to_vec();
172 read_manifest_from_bytes(vec)
173}
174
175pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
178 let manifest_file_size = vec.len();
179 let mut manifest_reader = Cursor::new(vec);
180
181 manifest_reader.rewind()?;
184 let magic = manifest_reader.read_u32::<BigEndian>()?;
185 if magic != MANIFEST_FILE_MAGIC {
186 return Err(IngestionError::HistoryRead(format!(
187 "unexpected magic byte in manifest: {magic}",
188 )));
189 }
190
191 manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
194 let mut sha3_digest = [0u8; SHA3_BYTES];
195 manifest_reader.read_exact(&mut sha3_digest)?;
196
197 manifest_reader.rewind()?;
200 let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
201 manifest_reader.read_exact(&mut content_buf)?;
202 let mut hasher = Sha3_256::default();
203 hasher.update(&content_buf);
204 let computed_digest = hasher.finalize().digest;
205 if computed_digest != sha3_digest {
206 return Err(IngestionError::HistoryRead(format!(
207 "manifest corrupted, computed checksum: {computed_digest:?}, stored checksum: {sha3_digest:?}"
208 )));
209 }
210 manifest_reader.rewind()?;
211 manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
212 Ok(Blob::read(&mut manifest_reader)?.decode()?)
213}
214
215pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
217 let mut buf = BufWriter::new(vec![]);
218 buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
219 let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
220 blob.write(&mut buf)?;
221 buf.flush()?;
222 let mut hasher = Sha3_256::default();
223 hasher.update(buf.get_ref());
224 let computed_digest = hasher.finalize().digest;
225 buf.write_all(&computed_digest)?;
226 Ok(Bytes::from(buf.into_inner().map_err(|e| e.into_error())?))
227}
228
229pub async fn write_manifest<S: ObjectStorePutExt>(
231 manifest: Manifest,
232 remote_store: S,
233) -> Result<()> {
234 let bytes = finalize_manifest(manifest)?;
235 put(&remote_store, &Manifest::file_path(), bytes).await?;
236 Ok(())
237}
238
239pub async fn verify_historical_checkpoints_with_checksums(
240 remote_store_config: ObjectStoreConfig,
241 concurrency: usize,
242) -> Result<()> {
243 let config = HistoricalReaderConfig {
244 remote_store_config,
245 download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
246 use_for_pruning_watermark: false,
247 };
248 let reader = HistoricalReader::new(config)?;
250 reader.sync_manifest_once().await?;
251 let manifest = reader.get_manifest().await;
252 info!(
253 "next checkpoint in archive store: {}",
254 manifest.next_checkpoint_seq_num()
255 );
256
257 let file_metadata = reader.verify_and_get_manifest_files(manifest)?;
258
259 let num_files = file_metadata.len() * 2;
261 reader.verify_file_consistency(file_metadata).await?;
262 info!("all {num_files} files are valid");
263 Ok(())
264}