iota_storage/lib.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

#![allow(dead_code)]

pub mod indexes;

use std::{
    fs,
    fs::File,
    io,
    io::{BufReader, Read, Write},
    ops::Range,
    path::{Path, PathBuf},
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
};

use anyhow::{Result, anyhow};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use bytes::{Buf, Bytes};
use fastcrypto::hash::{HashFunction, Sha3_256};
use futures::StreamExt;
pub use indexes::{IndexStore, IndexStoreTables};
use iota_types::{
    committee::Committee,
    messages_checkpoint::{
        CertifiedCheckpointSummary, CheckpointSequenceNumber, VerifiedCheckpoint,
    },
    storage::WriteStore,
};
use itertools::Itertools;
use num_enum::{IntoPrimitive, TryFromPrimitive};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use tracing::debug;

use crate::blob::BlobIter;

pub mod blob;
pub mod http_key_value_store;
pub mod key_value_store;
pub mod key_value_store_metrics;
pub mod mutex_table;
pub mod object_store;
pub mod package_object_cache;
pub mod sharded_lru;
pub mod write_path_pending_tx_log;

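/// Number of bytes in a SHA3-256 digest.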
pub const SHA3_BYTES: usize = 32;

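/// Storage format byte recorded in a file's header; `Blob` is currently the
/// only supported layout.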
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum StorageFormat {
    Blob = 0,
}

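/// Compression scheme byte recorded in a file's header and applied to the
/// payload that follows it.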
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum FileCompression {
    None = 0,
    Zstd,
}

impl FileCompression {
    pub fn zstd_compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<()> {
        // TODO: Add zstd compression level as function argument
        let mut encoder = zstd::Encoder::new(writer, 1)?;
        io::copy(reader, &mut encoder)?;
        encoder.finish()?;
        Ok(())
    }

    pub fn compress(&self, source: &std::path::Path) -> io::Result<()> {
        match self {
            FileCompression::Zstd => {
                let mut input = File::open(source)?;
                let tmp_file_name = source.with_extension("tmp");
                let mut output = File::create(&tmp_file_name)?;
                Self::zstd_compress(&mut input, &mut output)?;
                fs::rename(tmp_file_name, source)?;
            }
            FileCompression::None => {}
        }
        Ok(())
    }

    pub fn decompress(&self, source: &PathBuf) -> Result<Box<dyn Read>> {
        let file = File::open(source)?;
        let res: Box<dyn Read> = match self {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(file)?),
            FileCompression::None => Box::new(BufReader::new(file)),
        };
        Ok(res)
    }

    pub fn bytes_decompress(&self, bytes: Bytes) -> Result<Box<dyn Read>> {
        let res: Box<dyn Read> = match self {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(bytes.reader())?),
            FileCompression::None => Box::new(BufReader::new(bytes.reader())),
        };
        Ok(res)
    }
}

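/// Computes the SHA3-256 checksum of the given bytes.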
pub fn compute_sha3_checksum_for_bytes(bytes: Bytes) -> Result<[u8; 32]> {
    let mut hasher = Sha3_256::default();
    io::copy(&mut bytes.reader(), &mut hasher)?;
    Ok(hasher.finalize().digest)
}

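/// Computes the SHA3-256 checksum of an already opened file, reading from its
/// current position to the end.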
pub fn compute_sha3_checksum_for_file(file: &mut File) -> Result<[u8; 32]> {
    let mut hasher = Sha3_256::default();
    io::copy(file, &mut hasher)?;
    Ok(hasher.finalize().digest)
}

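/// Opens the file at `source` and computes the SHA3-256 checksum of its
/// contents.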
pub fn compute_sha3_checksum(source: &std::path::Path) -> Result<[u8; 32]> {
    let mut file = fs::File::open(source)?;
    compute_sha3_checksum_for_file(&mut file)
}

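/// Copies the file header (big-endian u32 magic, storage-format byte,
/// compression byte) from `reader` to `writer` unchanged, then zstd-compresses
/// the remaining bytes when the header names `Zstd` compression.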
pub fn compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<()> {
    let magic = reader.read_u32::<BigEndian>()?;
    writer.write_u32::<BigEndian>(magic)?;
    let storage_format = reader.read_u8()?;
    writer.write_u8(storage_format)?;
    let file_compression = FileCompression::try_from(reader.read_u8()?)?;
    writer.write_u8(file_compression.into())?;
    match file_compression {
        FileCompression::Zstd => {
            FileCompression::zstd_compress(reader, writer)?;
        }
        FileCompression::None => {}
    }
    Ok(())
}

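/// Reads and validates the file header: checks the magic against
/// `expected_magic`, then returns a reader that transparently decompresses the
/// remaining bytes together with the [`StorageFormat`] recorded in the header.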
pub fn read<R: Read + 'static>(
    expected_magic: u32,
    mut reader: R,
) -> Result<(Box<dyn Read>, StorageFormat)> {
    let magic = reader.read_u32::<BigEndian>()?;
    if magic != expected_magic {
        Err(anyhow!(
            "Unexpected magic string in file: {:?}, expected: {:?}",
            magic,
            expected_magic
        ))
    } else {
        let storage_format = StorageFormat::try_from(reader.read_u8()?)?;
        let file_compression = FileCompression::try_from(reader.read_u8()?)?;
        let reader: Box<dyn Read> = match file_compression {
            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(reader)?),
            FileCompression::None => Box::new(BufReader::new(reader)),
        };
        Ok((reader, storage_format))
    }
}

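/// Reads the file header via [`read`] and returns an iterator that
/// deserializes successive values of type `T` from the payload.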
pub fn make_iterator<T: DeserializeOwned, R: Read + 'static>(
    expected_magic: u32,
    reader: R,
) -> Result<impl Iterator<Item = T>> {
    let (reader, storage_format) = read(expected_magic, reader)?;
    match storage_format {
        StorageFormat::Blob => Ok(BlobIter::new(reader)),
    }
}

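/// Verifies that `checkpoint` is the direct successor of `current`: it must
/// reference `current`'s digest as its previous digest, belong to the same
/// epoch or the immediately following one, and carry valid authority
/// signatures for `committee`. On failure the unverified checkpoint is
/// returned as the error so the caller can decide how to handle it.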
pub fn verify_checkpoint_with_committee(
    committee: Arc<Committee>,
    current: &VerifiedCheckpoint,
    checkpoint: CertifiedCheckpointSummary,
) -> Result<VerifiedCheckpoint, CertifiedCheckpointSummary> {
    assert_eq!(
        *checkpoint.sequence_number(),
        current.sequence_number().checked_add(1).unwrap()
    );

    if Some(*current.digest()) != checkpoint.previous_digest {
        debug!(
            current_checkpoint_seq = current.sequence_number(),
            current_digest =% current.digest(),
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_digest =% checkpoint.digest(),
            checkpoint_previous_digest =? checkpoint.previous_digest,
            "checkpoint not on same chain"
        );
        return Err(checkpoint);
    }

    let current_epoch = current.epoch();
    if checkpoint.epoch() != current_epoch
        && checkpoint.epoch() != current_epoch.checked_add(1).unwrap()
    {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_epoch = checkpoint.epoch(),
            current_checkpoint_seq = current.sequence_number(),
            current_epoch = current_epoch,
            "cannot verify checkpoint with too high of an epoch",
        );
        return Err(checkpoint);
    }

    if checkpoint.epoch() == current_epoch.checked_add(1).unwrap()
        && current.next_epoch_committee().is_none()
    {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            checkpoint_epoch = checkpoint.epoch(),
            current_checkpoint_seq = current.sequence_number(),
            current_epoch = current_epoch,
            "next checkpoint claims to be from the next epoch but the latest verified \
            checkpoint does not indicate that it is the last checkpoint of an epoch"
        );
        return Err(checkpoint);
    }

    checkpoint
        .verify_authority_signatures(&committee)
        .map_err(|e| {
            debug!("error verifying checkpoint: {e}");
            checkpoint.clone()
        })?;
    Ok(VerifiedCheckpoint::new_unchecked(checkpoint))
}

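/// Looks up the committee for the checkpoint's epoch in `store` and delegates
/// to [`verify_checkpoint_with_committee`]. Panics if the committee for that
/// epoch is not yet in the store.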
pub fn verify_checkpoint<S>(
    current: &VerifiedCheckpoint,
    store: S,
    checkpoint: CertifiedCheckpointSummary,
) -> Result<VerifiedCheckpoint, CertifiedCheckpointSummary>
where
    S: WriteStore,
{
    let committee = store
        .get_committee(checkpoint.epoch())
        .expect("store operation should not fail")
        .unwrap_or_else(|| {
            panic!(
                "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
                checkpoint.epoch(),
                checkpoint.sequence_number()
            )
        });

    verify_checkpoint_with_committee(committee, current, checkpoint)
}

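/// Verifies each consecutive pair of checkpoints in `checkpoint_range`, with up
/// to `max_concurrency` verifications running concurrently. Increments
/// `checkpoint_counter` for every verified checkpoint and finally records the
/// last checkpoint of the range as the highest verified checkpoint in `store`.
/// Panics if a checkpoint or committee is missing from the store or if
/// verification fails.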
pub async fn verify_checkpoint_range<S>(
    checkpoint_range: Range<CheckpointSequenceNumber>,
    store: S,
    checkpoint_counter: Arc<AtomicU64>,
    max_concurrency: usize,
) where
    S: WriteStore + Clone,
{
    let range_clone = checkpoint_range.clone();
    futures::stream::iter(range_clone.into_iter().tuple_windows())
        .map(|(a, b)| {
            let current = store
                .get_checkpoint_by_sequence_number(a)
                .expect("store operation should not fail")
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint {} should exist in store after summary sync but does not",
                        a
                    );
                });
            let next = store
                .get_checkpoint_by_sequence_number(b)
                .expect("store operation should not fail")
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint {} should exist in store after summary sync but does not",
                        b
                    );
                });
            let committee = store
                .get_committee(next.epoch())
                .expect("store operation should not fail")
                .unwrap_or_else(|| {
                    panic!(
                        "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
                        next.epoch(),
                        next.sequence_number()
                    )
                });
            tokio::spawn(async move {
                verify_checkpoint_with_committee(committee, &current, next.clone().into())
                    .expect("Checkpoint verification failed");
            })
        })
        .buffer_unordered(max_concurrency)
        .for_each(|result| {
            result.expect("Checkpoint verification task failed");
            checkpoint_counter.fetch_add(1, Ordering::Relaxed);
            futures::future::ready(())
        })
        .await;
    let last = checkpoint_range
        .last()
        .expect("Received empty checkpoint range");
    let final_checkpoint = store
        .get_checkpoint_by_sequence_number(last)
        .expect("Failed to fetch checkpoint")
        .expect("Expected end of checkpoint range to exist in store");
    store
        .update_highest_verified_checkpoint(&final_checkpoint)
        .expect("Failed to update highest verified checkpoint");
}

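/// Recursively hard-links every file under `src` into `dst`, recreating the
/// directory structure (see the test below for hard-linking a database
/// directory).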
fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> io::Result<()> {
    fs::create_dir_all(&dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let ty = entry.file_type()?;
        if ty.is_dir() {
            hard_link(entry.path(), dst.as_ref().join(entry.file_name()))?;
        } else {
            fs::hard_link(entry.path(), dst.as_ref().join(entry.file_name()))?;
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use tempfile::TempDir;
    use typed_store::{
        Map, reopen,
        rocks::{DBMap, MetricConf, ReadWriteOptions, open_cf},
    };

    use crate::hard_link;

    #[tokio::test]
    pub async fn test_db_hard_link() -> anyhow::Result<()> {
        let input = TempDir::new()?;
        let input_path = input.path();

        let output = TempDir::new()?;
        let output_path = output.path();

        const FIRST_CF: &str = "First_CF";
        const SECOND_CF: &str = "Second_CF";

        let db_a = open_cf(
            input_path,
            None,
            MetricConf::new("test_db_hard_link_1"),
            &[FIRST_CF, SECOND_CF],
        )
        .unwrap();

        let (db_map_1, db_map_2) = reopen!(&db_a, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);

        let keys_vals_cf1 = (1..100).map(|i| (i, i.to_string()));
        let keys_vals_cf2 = (1..100).map(|i| (i, i.to_string()));

        assert!(db_map_1.multi_insert(keys_vals_cf1).is_ok());
        assert!(db_map_2.multi_insert(keys_vals_cf2).is_ok());

        // set up db hard link
        hard_link(input_path, output_path)?;
        let db_b = open_cf(
            output_path,
            None,
            MetricConf::new("test_db_hard_link_2"),
            &[FIRST_CF, SECOND_CF],
        )
        .unwrap();

        let (db_map_1, db_map_2) = reopen!(&db_b, FIRST_CF;<i32, String>, SECOND_CF;<i32, String>);
        for i in 1..100 {
            assert!(
                db_map_1
                    .contains_key(&i)
                    .expect("Failed to call contains key")
            );
            assert!(
                db_map_2
                    .contains_key(&i)
                    .expect("Failed to call contains key")
            );
        }

        Ok(())
    }
}