1#![allow(dead_code)]
6
7use std::{
8 fs,
9 fs::File,
10 io,
11 io::{BufReader, Read, Write},
12 ops::Range,
13 path::PathBuf,
14 sync::{
15 Arc,
16 atomic::{AtomicU64, Ordering},
17 },
18};
19
20use anyhow::{Result, anyhow};
21use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
22use bytes::{Buf, Bytes};
23use fastcrypto::hash::{HashFunction, Sha3_256};
24use futures::StreamExt;
25use iota_types::{
26 committee::Committee,
27 messages_checkpoint::{
28 CertifiedCheckpointSummary, CheckpointSequenceNumber, VerifiedCheckpoint,
29 },
30 storage::WriteStore,
31};
32use itertools::Itertools;
33use num_enum::{IntoPrimitive, TryFromPrimitive};
34use serde::{Deserialize, Serialize, de::DeserializeOwned};
35use tracing::debug;
36
37use crate::blob::BlobIter;
38
39pub mod blob;
40pub mod http_key_value_store;
41pub mod key_value_store;
42pub mod key_value_store_metrics;
43pub mod mutex_table;
44pub mod object_store;
45pub mod package_object_cache;
46pub mod sharded_lru;
47pub mod write_path_pending_tx_log;
48
/// Number of bytes in a SHA3-256 digest; the checksum helpers in this file return arrays of
/// this length.
pub const SHA3_BYTES: usize = 32;
50
51#[derive(
52 Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
53)]
54#[repr(u8)]
55pub enum StorageFormat {
56 Blob = 0,
57}
58
59#[derive(
60 Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
61)]
62#[repr(u8)]
63pub enum FileCompression {
64 None = 0,
65 Zstd,
66}
67
68impl FileCompression {
69 pub fn zstd_compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<()> {
70 let mut encoder = zstd::Encoder::new(writer, 1)?;
72 io::copy(reader, &mut encoder)?;
73 encoder.finish()?;
74 Ok(())
75 }
76 pub fn compress(&self, source: &std::path::Path) -> io::Result<()> {
77 match self {
78 FileCompression::Zstd => {
79 let mut input = File::open(source)?;
80 let tmp_file_name = source.with_extension("tmp");
81 let mut output = File::create(&tmp_file_name)?;
82 Self::zstd_compress(&mut input, &mut output)?;
83 fs::rename(tmp_file_name, source)?;
84 }
85 FileCompression::None => {}
86 }
87 Ok(())
88 }
89 pub fn decompress(&self, source: &PathBuf) -> Result<Box<dyn Read>> {
90 let file = File::open(source)?;
91 let res: Box<dyn Read> = match self {
92 FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(file)?),
93 FileCompression::None => Box::new(BufReader::new(file)),
94 };
95 Ok(res)
96 }
97 pub fn bytes_decompress(&self, bytes: Bytes) -> Result<Box<dyn Read>> {
98 let res: Box<dyn Read> = match self {
99 FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(bytes.reader())?),
100 FileCompression::None => Box::new(BufReader::new(bytes.reader())),
101 };
102 Ok(res)
103 }
104}
105
106pub fn compute_sha3_checksum_for_bytes(bytes: Bytes) -> Result<[u8; 32]> {
107 let mut hasher = Sha3_256::default();
108 io::copy(&mut bytes.reader(), &mut hasher)?;
109 Ok(hasher.finalize().digest)
110}
111
112pub fn compute_sha3_checksum_for_file(file: &mut File) -> Result<[u8; 32]> {
113 let mut hasher = Sha3_256::default();
114 io::copy(file, &mut hasher)?;
115 Ok(hasher.finalize().digest)
116}
117
118pub fn compute_sha3_checksum(source: &std::path::Path) -> Result<[u8; 32]> {
119 let mut file = fs::File::open(source)?;
120 compute_sha3_checksum_for_file(&mut file)
121}
122
123pub fn compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<()> {
124 let magic = reader.read_u32::<BigEndian>()?;
125 writer.write_u32::<BigEndian>(magic)?;
126 let storage_format = reader.read_u8()?;
127 writer.write_u8(storage_format)?;
128 let file_compression = FileCompression::try_from(reader.read_u8()?)?;
129 writer.write_u8(file_compression.into())?;
130 match file_compression {
131 FileCompression::Zstd => {
132 FileCompression::zstd_compress(reader, writer)?;
133 }
134 FileCompression::None => {}
135 }
136 Ok(())
137}
138
139pub fn read<R: Read + 'static>(
140 expected_magic: u32,
141 mut reader: R,
142) -> Result<(Box<dyn Read>, StorageFormat)> {
143 let magic = reader.read_u32::<BigEndian>()?;
144 if magic != expected_magic {
145 Err(anyhow!(
146 "Unexpected magic string in file: {magic:?}, expected: {expected_magic:?}"
147 ))
148 } else {
149 let storage_format = StorageFormat::try_from(reader.read_u8()?)?;
150 let file_compression = FileCompression::try_from(reader.read_u8()?)?;
151 let reader: Box<dyn Read> = match file_compression {
152 FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(reader)?),
153 FileCompression::None => Box::new(BufReader::new(reader)),
154 };
155 Ok((reader, storage_format))
156 }
157}
158
159pub fn make_iterator<T: DeserializeOwned, R: Read + 'static>(
160 expected_magic: u32,
161 reader: R,
162) -> Result<impl Iterator<Item = T>> {
163 let (reader, storage_format) = read(expected_magic, reader)?;
164 match storage_format {
165 StorageFormat::Blob => Ok(BlobIter::new(reader)),
166 }
167}
168
169#[expect(clippy::result_large_err)]
170pub fn verify_checkpoint_with_committee(
171 committee: Arc<Committee>,
172 current: &VerifiedCheckpoint,
173 checkpoint: CertifiedCheckpointSummary,
174) -> Result<VerifiedCheckpoint, CertifiedCheckpointSummary> {
175 assert_eq!(
176 *checkpoint.sequence_number(),
177 current.sequence_number().checked_add(1).unwrap()
178 );
179
180 if Some(*current.digest()) != checkpoint.previous_digest {
181 debug!(
182 current_checkpoint_seq = current.sequence_number(),
183 current_digest =% current.digest(),
184 checkpoint_seq = checkpoint.sequence_number(),
185 checkpoint_digest =% checkpoint.digest(),
186 checkpoint_previous_digest =? checkpoint.previous_digest,
187 "checkpoint not on same chain"
188 );
189 return Err(checkpoint);
190 }
191
192 let current_epoch = current.epoch();
193 if checkpoint.epoch() != current_epoch
194 && checkpoint.epoch() != current_epoch.checked_add(1).unwrap()
195 {
196 debug!(
197 checkpoint_seq = checkpoint.sequence_number(),
198 checkpoint_epoch = checkpoint.epoch(),
199 current_checkpoint_seq = current.sequence_number(),
200 current_epoch = current_epoch,
201 "cannot verify checkpoint with too high of an epoch",
202 );
203 return Err(checkpoint);
204 }
205
206 if checkpoint.epoch() == current_epoch.checked_add(1).unwrap()
207 && current.next_epoch_committee().is_none()
208 {
209 debug!(
210 checkpoint_seq = checkpoint.sequence_number(),
211 checkpoint_epoch = checkpoint.epoch(),
212 current_checkpoint_seq = current.sequence_number(),
213 current_epoch = current_epoch,
214 "next checkpoint claims to be from the next epoch but the latest verified \
215 checkpoint does not indicate that it is the last checkpoint of an epoch"
216 );
217 return Err(checkpoint);
218 }
219
220 checkpoint
221 .verify_authority_signatures(&committee)
222 .map_err(|e| {
223 debug!("error verifying checkpoint: {e}");
224 checkpoint.clone()
225 })?;
226 Ok(VerifiedCheckpoint::new_unchecked(checkpoint))
227}
228
229#[expect(clippy::result_large_err)]
230pub fn verify_checkpoint<S>(
231 current: &VerifiedCheckpoint,
232 store: S,
233 checkpoint: CertifiedCheckpointSummary,
234) -> Result<VerifiedCheckpoint, CertifiedCheckpointSummary>
235where
236 S: WriteStore,
237{
238 let committee = store.get_committee(checkpoint.epoch()).unwrap_or_else(|| {
239 panic!(
240 "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
241 checkpoint.epoch(),
242 checkpoint.sequence_number()
243 )
244 });
245
246 verify_checkpoint_with_committee(committee, current, checkpoint)
247}
248
249pub async fn verify_checkpoint_range<S>(
250 checkpoint_range: Range<CheckpointSequenceNumber>,
251 store: S,
252 checkpoint_counter: Arc<AtomicU64>,
253 max_concurrency: usize,
254) where
255 S: WriteStore + Clone,
256{
257 let range_clone = checkpoint_range.clone();
258 futures::stream::iter(range_clone.into_iter().tuple_windows())
259 .map(|(a, b)| {
260 let current = store
261 .get_checkpoint_by_sequence_number(a)
262 .unwrap_or_else(|| {
263 panic!("Checkpoint {a} should exist in store after summary sync but does not");
264 });
265 let next = store
266 .get_checkpoint_by_sequence_number(b)
267 .unwrap_or_else(|| {
268 panic!("Checkpoint {a} should exist in store after summary sync but does not");
269 });
270
271 let committee = store.get_committee(next.epoch()).unwrap_or_else(|| {
272 panic!(
273 "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
274 next.epoch(),
275 next.sequence_number()
276 )
277 });
278 tokio::spawn(async move {
279 verify_checkpoint_with_committee(committee, ¤t, next.clone().into())
280 .expect("Checkpoint verification failed");
281 })
282 })
283 .buffer_unordered(max_concurrency)
284 .for_each(|result| {
285 result.expect("Checkpoint verification task failed");
286 checkpoint_counter.fetch_add(1, Ordering::Relaxed);
287 futures::future::ready(())
288 })
289 .await;
290 let last = checkpoint_range
291 .last()
292 .expect("Received empty checkpoint range");
293 let final_checkpoint = store
294 .get_checkpoint_by_sequence_number(last)
295 .expect("Expected end of checkpoint range to exist in store");
296 store
297 .try_update_highest_verified_checkpoint(&final_checkpoint)
298 .expect("Failed to update highest verified checkpoint");
299}