#![allow(dead_code)]

pub mod indexes;

use std::{
    fs,
    fs::File,
    io,
    io::{BufReader, Read, Write},
    ops::Range,
    path::{Path, PathBuf},
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
};

use anyhow::{Result, anyhow};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use bytes::{Buf, Bytes};
use fastcrypto::hash::{HashFunction, Sha3_256};
use futures::StreamExt;
pub use indexes::{IndexStore, IndexStoreTables};
use iota_types::{
    committee::Committee,
    messages_checkpoint::{
        CertifiedCheckpointSummary, CheckpointSequenceNumber, VerifiedCheckpoint,
    },
    storage::WriteStore,
};
use itertools::Itertools;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use tracing::debug;

use crate::blob::BlobIter;

pub mod blob;
pub mod http_key_value_store;
pub mod key_value_store;
pub mod key_value_store_metrics;
pub mod mutex_table;
pub mod object_store;
pub mod package_object_cache;
pub mod sharded_lru;
pub mod write_path_pending_tx_log;

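/// Length in bytes of a SHA3-256 digest.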
pub const SHA3_BYTES: usize = 32;

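/// Format used to encode entries within a storage file; currently only the
/// `Blob` encoding implemented in the `blob` module is supported.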
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum StorageFormat {
    Blob = 0,
}

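/// Compression scheme applied to storage files: either no compression or a
/// zstd-compressed stream.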
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum FileCompression {
    None = 0,
    Zstd,
}

impl FileCompression {
    /// Compresses everything read from `reader` into `writer` as a zstd stream
    /// (compression level 1).
    pub fn zstd_compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<()> {
        let mut encoder = zstd::Encoder::new(writer, 1)?;
        io::copy(reader, &mut encoder)?;
        encoder.finish()?;
        Ok(())
    }

    /// Compresses the file at `source` in place by writing the compressed output
    /// to a temporary file and renaming it over the original. A no-op for `None`.
    pub fn compress(&self, source: &std::path::Path) -> io::Result<()> {
        match self {
            FileCompression::Zstd => {
                let mut input = File::open(source)?;
                let tmp_file_name = source.with_extension("tmp");
                let mut output = File::create(&tmp_file_name)?;
                Self::zstd_compress(&mut input, &mut output)?;
                fs::rename(tmp_file_name, source)?;
            }
            FileCompression::None => {}
        }
        Ok(())
    }

    /// Opens the file at `source` and returns a reader over its decompressed
    /// contents.
    pub fn decompress(&self, source: &PathBuf) -> Result<Box<dyn Read>> {
        let file = File::open(source)?;
        let res: Box<dyn Read> = match self {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(file)?),
            FileCompression::None => Box::new(BufReader::new(file)),
        };
        Ok(res)
    }

    /// Returns a reader over the decompressed contents of the given in-memory
    /// bytes.
    pub fn bytes_decompress(&self, bytes: Bytes) -> Result<Box<dyn Read>> {
        let res: Box<dyn Read> = match self {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(bytes.reader())?),
            FileCompression::None => Box::new(BufReader::new(bytes.reader())),
        };
        Ok(res)
    }
}

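/// Computes the SHA3-256 digest of the given in-memory bytes.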
pub fn compute_sha3_checksum_for_bytes(bytes: Bytes) -> Result<[u8; 32]> {
    let mut hasher = Sha3_256::default();
    io::copy(&mut bytes.reader(), &mut hasher)?;
    Ok(hasher.finalize().digest)
}

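/// Computes the SHA3-256 digest of an already opened file, reading from its
/// current position to EOF.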
pub fn compute_sha3_checksum_for_file(file: &mut File) -> Result<[u8; 32]> {
    let mut hasher = Sha3_256::default();
    io::copy(file, &mut hasher)?;
    Ok(hasher.finalize().digest)
}

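/// Opens the file at `source` and computes its SHA3-256 digest.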
pub fn compute_sha3_checksum(source: &std::path::Path) -> Result<[u8; 32]> {
    let mut file = fs::File::open(source)?;
    compute_sha3_checksum_for_file(&mut file)
}

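/// Copies a storage file from `reader` to `writer`, passing the 6-byte header
/// (magic, storage format, compression flag) through uncompressed and compressing
/// the remainder according to the compression flag read from the header.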
pub fn compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<()> {
    let magic = reader.read_u32::<BigEndian>()?;
    writer.write_u32::<BigEndian>(magic)?;
    let storage_format = reader.read_u8()?;
    writer.write_u8(storage_format)?;
    let file_compression = FileCompression::try_from(reader.read_u8()?)?;
    writer.write_u8(file_compression.into())?;
    match file_compression {
        FileCompression::Zstd => {
            FileCompression::zstd_compress(reader, writer)?;
        }
        FileCompression::None => {}
    }
    Ok(())
}

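/// Reads and validates the header of a storage file: checks the magic number
/// against `expected_magic`, then returns a reader over the decompressed payload
/// together with the file's storage format.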
pub fn read<R: Read + 'static>(
    expected_magic: u32,
    mut reader: R,
) -> Result<(Box<dyn Read>, StorageFormat)> {
    let magic = reader.read_u32::<BigEndian>()?;
    if magic != expected_magic {
        Err(anyhow!(
            "Unexpected magic string in file: {:?}, expected: {:?}",
            magic,
            expected_magic
        ))
    } else {
        let storage_format = StorageFormat::try_from(reader.read_u8()?)?;
        let file_compression = FileCompression::try_from(reader.read_u8()?)?;
        let reader: Box<dyn Read> = match file_compression {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(reader)?),
            FileCompression::None => Box::new(BufReader::new(reader)),
        };
        Ok((reader, storage_format))
    }
}

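/// Validates the file header via [`read`] and returns an iterator that
/// deserializes successive entries of type `T` from the payload.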
pub fn make_iterator<T: DeserializeOwned, R: Read + 'static>(
    expected_magic: u32,
    reader: R,
) -> Result<impl Iterator<Item = T>> {
    let (reader, storage_format) = read(expected_magic, reader)?;
    match storage_format {
        StorageFormat::Blob => Ok(BlobIter::new(reader)),
    }
}

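/// Verifies that `checkpoint` is the direct successor of `current`: it must be on
/// the same chain, belong to the current or next epoch, and carry valid authority
/// signatures from `committee`. Returns the checkpoint wrapped as verified on
/// success, or hands it back unchanged on failure.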
pub fn verify_checkpoint_with_committee(
    committee: Arc<Committee>,
    current: &VerifiedCheckpoint,
    checkpoint: CertifiedCheckpointSummary,
) -> Result<VerifiedCheckpoint, CertifiedCheckpointSummary> {
    assert_eq!(
        *checkpoint.sequence_number(),
        current.sequence_number().checked_add(1).unwrap()
    );

    if Some(*current.digest()) != checkpoint.previous_digest {
        debug!(
            current_checkpoint_seq = current.sequence_number(),
            current_digest =% current.digest(),
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_digest =% checkpoint.digest(),
            checkpoint_previous_digest =? checkpoint.previous_digest,
            "checkpoint not on same chain"
        );
        return Err(checkpoint);
    }

    let current_epoch = current.epoch();
    if checkpoint.epoch() != current_epoch
        && checkpoint.epoch() != current_epoch.checked_add(1).unwrap()
    {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_epoch = checkpoint.epoch(),
            current_checkpoint_seq = current.sequence_number(),
            current_epoch = current_epoch,
            "cannot verify checkpoint with too high of an epoch",
        );
        return Err(checkpoint);
    }

    if checkpoint.epoch() == current_epoch.checked_add(1).unwrap()
        && current.next_epoch_committee().is_none()
    {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_epoch = checkpoint.epoch(),
            current_checkpoint_seq = current.sequence_number(),
            current_epoch = current_epoch,
            "next checkpoint claims to be from the next epoch but the latest verified \
             checkpoint does not indicate that it is the last checkpoint of an epoch"
        );
        return Err(checkpoint);
    }

    checkpoint
        .verify_authority_signatures(&committee)
        .map_err(|e| {
            debug!("error verifying checkpoint: {e}");
            checkpoint.clone()
        })?;
    Ok(VerifiedCheckpoint::new_unchecked(checkpoint))
}

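/// Looks up the committee for the checkpoint's epoch in `store` and delegates to
/// [`verify_checkpoint_with_committee`]. Panics if the committee is missing.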
pub fn verify_checkpoint<S>(
    current: &VerifiedCheckpoint,
    store: S,
    checkpoint: CertifiedCheckpointSummary,
) -> Result<VerifiedCheckpoint, CertifiedCheckpointSummary>
where
    S: WriteStore,
{
    let committee = store
        .get_committee(checkpoint.epoch())
        .expect("store operation should not fail")
        .unwrap_or_else(|| {
            panic!(
                "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
                checkpoint.epoch(),
                checkpoint.sequence_number()
            )
        });

    verify_checkpoint_with_committee(committee, current, checkpoint)
}

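/// Verifies each adjacent pair of checkpoints in `checkpoint_range` concurrently,
/// spawning up to `max_concurrency` verification tasks at a time and incrementing
/// `checkpoint_counter` for every checkpoint verified. Finally records the last
/// checkpoint of the range as the highest verified checkpoint in `store`. Panics
/// on any verification or store failure.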
pub async fn verify_checkpoint_range<S>(
    checkpoint_range: Range<CheckpointSequenceNumber>,
    store: S,
    checkpoint_counter: Arc<AtomicU64>,
    max_concurrency: usize,
) where
    S: WriteStore + Clone,
{
    let range_clone = checkpoint_range.clone();
    futures::stream::iter(range_clone.into_iter().tuple_windows())
        .map(|(a, b)| {
            let current = store
                .get_checkpoint_by_sequence_number(a)
                .expect("store operation should not fail")
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint {} should exist in store after summary sync but does not",
                        a
                    );
                });
            let next = store
                .get_checkpoint_by_sequence_number(b)
                .expect("store operation should not fail")
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint {} should exist in store after summary sync but does not",
                        b
                    );
                });
            let committee = store
                .get_committee(next.epoch())
                .expect("store operation should not fail")
                .unwrap_or_else(|| {
                    panic!(
                        "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
                        next.epoch(),
                        next.sequence_number()
                    )
                });
            tokio::spawn(async move {
                verify_checkpoint_with_committee(committee, &current, next.clone().into())
                    .expect("Checkpoint verification failed");
            })
        })
        .buffer_unordered(max_concurrency)
        .for_each(|result| {
            result.expect("Checkpoint verification task failed");
            checkpoint_counter.fetch_add(1, Ordering::Relaxed);
            futures::future::ready(())
        })
        .await;
    let last = checkpoint_range
        .last()
        .expect("Received empty checkpoint range");
    let final_checkpoint = store
        .get_checkpoint_by_sequence_number(last)
        .expect("Failed to fetch checkpoint")
        .expect("Expected end of checkpoint range to exist in store");
    store
        .update_highest_verified_checkpoint(&final_checkpoint)
        .expect("Failed to update highest verified checkpoint");
}

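/// Recursively mirrors the directory tree at `src` into `dst`, creating
/// directories as needed and hard-linking (rather than copying) every file.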
fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> io::Result<()> {
    fs::create_dir_all(&dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let ty = entry.file_type()?;
        if ty.is_dir() {
            hard_link(entry.path(), dst.as_ref().join(entry.file_name()))?;
        } else {
            fs::hard_link(entry.path(), dst.as_ref().join(entry.file_name()))?;
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use typed_store::{
        Map, reopen,
        rocks::{DBMap, MetricConf, ReadWriteOptions, open_cf},
    };

    use crate::hard_link;

    #[tokio::test]
    pub async fn test_db_hard_link() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();

        let output = TempDir::new()?;
        let output_path = output.path();

        const FIRST_CF: &str = "First_CF";
        const SECOND_CF: &str = "Second_CF";

        let db_a = open_cf(
            input_path,
            None,
            MetricConf::new("test_db_hard_link_1"),
            &[FIRST_CF, SECOND_CF],
        )
        .unwrap();

        let (db_map_1, db_map_2) = reopen!(&db_a, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);

        let keys_vals_cf1 = (1..100).map(|i| (i, i.to_string()));
        let keys_vals_cf2 = (1..100).map(|i| (i, i.to_string()));

        assert!(db_map_1.multi_insert(keys_vals_cf1).is_ok());
        assert!(db_map_2.multi_insert(keys_vals_cf2).is_ok());

        hard_link(input_path, output_path)?;

        let db_b = open_cf(
            output_path,
            None,
            MetricConf::new("test_db_hard_link_2"),
            &[FIRST_CF, SECOND_CF],
        )
        .unwrap();

        let (db_map_1, db_map_2) = reopen!(&db_b, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);
        for i in 1..100 {
            assert!(
                db_map_1
                    .contains_key(&i)
                    .expect("Failed to call contains key")
            );
            assert!(
                db_map_2
                    .contains_key(&i)
                    .expect("Failed to call contains key")
            );
        }

        Ok(())
    }
}