// iota_data_ingestion_core/history/manifest.rs
use std::{num::NonZeroUsize, ops::Range};
17
18use bytes::Bytes;
19use iota_config::{
20 node::ArchiveReaderConfig as HistoricalReaderConfig, object_storage_config::ObjectStoreConfig,
21};
22use iota_storage::{
23 compute_sha3_checksum, compute_sha3_checksum_for_bytes,
24 object_store::{
25 ObjectStoreGetExt, ObjectStorePutExt,
26 util::{get, put},
27 },
28};
29use object_store::path::Path;
30use serde::{Deserialize, Serialize};
31use tracing::info;
32
33use crate::{
34 errors::IngestionResult as Result,
35 history::{
36 CHECKPOINT_FILE_SUFFIX, MANIFEST_FILE_MAGIC, MANIFEST_FILENAME, finalize_magic_blob,
37 read_magic_blob, reader::HistoricalReader,
38 },
39};
40
41#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
42pub struct FileMetadata {
43 pub checkpoint_seq_range: Range<u64>,
44 pub sha3_digest: [u8; 32],
45}
46
47impl FileMetadata {
48 pub fn file_path(&self) -> Path {
49 Path::from(format!(
50 "{}.{CHECKPOINT_FILE_SUFFIX}",
51 self.checkpoint_seq_range.start
52 ))
53 }
54}
55
56#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
57pub struct ManifestV1 {
58 pub archive_version: u8,
59 pub next_checkpoint_seq_num: u64,
60 pub file_metadata: Vec<FileMetadata>,
61}
62
63#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
64#[non_exhaustive]
65pub enum Manifest {
66 V1(ManifestV1),
67}
68
69impl Manifest {
70 pub fn new(next_checkpoint_seq_num: u64) -> Self {
71 Manifest::V1(ManifestV1 {
72 archive_version: 1,
73 next_checkpoint_seq_num,
74 file_metadata: vec![],
75 })
76 }
77
78 pub fn to_files(&self) -> Vec<FileMetadata> {
79 match self {
80 Manifest::V1(manifest) => manifest.file_metadata.clone(),
81 }
82 }
83
84 pub fn next_checkpoint_seq_num(&self) -> u64 {
85 match self {
86 Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
87 }
88 }
89
90 pub fn update(&mut self, checkpoint_sequence_number: u64, file_metadata: FileMetadata) {
91 match self {
92 Manifest::V1(manifest) => {
93 manifest.file_metadata.push(file_metadata);
94 manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
95 }
96 }
97 }
98
99 pub fn file_path() -> Path {
100 Path::from(MANIFEST_FILENAME)
101 }
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
105pub struct CheckpointUpdates {
106 file_metadata: FileMetadata,
107 manifest: Manifest,
108}
109
110impl CheckpointUpdates {
111 pub fn new(
112 checkpoint_sequence_number: u64,
113 file_metadata: FileMetadata,
114 manifest: &mut Manifest,
115 ) -> Self {
116 manifest.update(checkpoint_sequence_number, file_metadata.clone());
117 CheckpointUpdates {
118 file_metadata,
119 manifest: manifest.clone(),
120 }
121 }
122
123 pub fn file_path(&self) -> Path {
124 self.file_metadata.file_path()
125 }
126
127 pub fn manifest_file_path(&self) -> Path {
128 Path::from(MANIFEST_FILENAME)
129 }
130}
131
132pub fn create_file_metadata(
133 file_path: &std::path::Path,
134 checkpoint_seq_range: Range<u64>,
135) -> Result<FileMetadata> {
136 let sha3_digest = compute_sha3_checksum(file_path)?;
137 let file_metadata = FileMetadata {
138 checkpoint_seq_range,
139 sha3_digest,
140 };
141 Ok(file_metadata)
142}
143
144pub fn create_file_metadata_from_bytes(
145 contents: Bytes,
146 checkpoint_seq_range: Range<u64>,
147) -> Result<FileMetadata> {
148 let sha3_digest = compute_sha3_checksum_for_bytes(contents)?;
149 let file_metadata = FileMetadata {
150 checkpoint_seq_range,
151 sha3_digest,
152 };
153 Ok(file_metadata)
154}
155
156pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
158 let vec = get(&remote_store, &Manifest::file_path()).await?.to_vec();
159 read_manifest_from_bytes(vec)
160}
161
162pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
165 read_magic_blob(vec, MANIFEST_FILE_MAGIC, MANIFEST_FILENAME)
166}
167
168pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
170 finalize_magic_blob(&manifest, MANIFEST_FILE_MAGIC)
171}
172
173pub async fn write_manifest<S: ObjectStorePutExt>(
175 manifest: Manifest,
176 remote_store: S,
177) -> Result<()> {
178 let bytes = finalize_manifest(manifest)?;
179 put(&remote_store, &Manifest::file_path(), bytes).await?;
180 Ok(())
181}
182
183pub async fn verify_historical_checkpoints_with_checksums(
184 remote_store_config: ObjectStoreConfig,
185 concurrency: usize,
186) -> Result<()> {
187 let config = HistoricalReaderConfig {
188 remote_store_config,
189 download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
190 use_for_pruning_watermark: false,
191 };
192 let reader = HistoricalReader::new(config)?;
194 reader.sync_manifest_once().await?;
195 let manifest = reader.get_manifest().await;
196 info!(
197 "next checkpoint in archive store: {}",
198 manifest.next_checkpoint_seq_num()
199 );
200
201 let file_metadata = reader.verify_and_get_manifest_files(manifest)?;
202
203 let num_files = file_metadata.len() * 2;
205 reader.verify_file_consistency(file_metadata).await?;
206 info!("all {num_files} files are valid");
207 Ok(())
208}