Skip to main content

iota_storage/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#![allow(dead_code)]
6
7use std::{
8    fs,
9    fs::File,
10    io,
11    io::{BufReader, Read, Write},
12    ops::Range,
13    path::PathBuf,
14    sync::{
15        Arc,
16        atomic::{AtomicU64, Ordering},
17    },
18};
19
20use anyhow::{Result, anyhow};
21use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
22use bytes::{Buf, Bytes};
23use fastcrypto::hash::{HashFunction, Sha3_256};
24use futures::StreamExt;
25use iota_types::{
26    committee::Committee,
27    messages_checkpoint::{
28        CertifiedCheckpointSummary, CheckpointSequenceNumber, VerifiedCheckpoint,
29    },
30    storage::WriteStore,
31};
32use itertools::Itertools;
33use num_enum::{IntoPrimitive, TryFromPrimitive};
34use serde::{Deserialize, Serialize, de::DeserializeOwned};
35use tracing::debug;
36
37use crate::blob::BlobIter;
38
39pub mod blob;
40pub mod http_key_value_store;
41pub mod key_value_store;
42pub mod key_value_store_metrics;
43pub mod mutex_table;
44pub mod object_store;
45pub mod package_object_cache;
46pub mod sharded_lru;
47pub mod write_path_pending_tx_log;
48
/// Length in bytes of a SHA3-256 digest (256 bits / 8 bits per byte).
pub const SHA3_BYTES: usize = 256 / 8;
50
51#[derive(
52    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
53)]
54#[repr(u8)]
55pub enum StorageFormat {
56    Blob = 0,
57}
58
59#[derive(
60    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
61)]
62#[repr(u8)]
63pub enum FileCompression {
64    None = 0,
65    Zstd,
66}
67
68impl FileCompression {
69    pub fn zstd_compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> io::Result<()> {
70        // TODO: Add zstd compression level as function argument
71        let mut encoder = zstd::Encoder::new(writer, 1)?;
72        io::copy(reader, &mut encoder)?;
73        encoder.finish()?;
74        Ok(())
75    }
76    pub fn compress(&self, source: &std::path::Path) -> io::Result<()> {
77        match self {
78            FileCompression::Zstd => {
79                let mut input = File::open(source)?;
80                let tmp_file_name = source.with_extension("tmp");
81                let mut output = File::create(&tmp_file_name)?;
82                Self::zstd_compress(&mut input, &mut output)?;
83                fs::rename(tmp_file_name, source)?;
84            }
85            FileCompression::None => {}
86        }
87        Ok(())
88    }
89    pub fn decompress(&self, source: &PathBuf) -> Result<Box<dyn Read>> {
90        let file = File::open(source)?;
91        let res: Box<dyn Read> = match self {
92            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(file)?),
93            FileCompression::None => Box::new(BufReader::new(file)),
94        };
95        Ok(res)
96    }
97    pub fn bytes_decompress(&self, bytes: Bytes) -> Result<Box<dyn Read>> {
98        let res: Box<dyn Read> = match self {
99            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(bytes.reader())?),
100            FileCompression::None => Box::new(BufReader::new(bytes.reader())),
101        };
102        Ok(res)
103    }
104}
105
106pub fn compute_sha3_checksum_for_bytes(bytes: Bytes) -> Result<[u8; 32]> {
107    let mut hasher = Sha3_256::default();
108    io::copy(&mut bytes.reader(), &mut hasher)?;
109    Ok(hasher.finalize().digest)
110}
111
112pub fn compute_sha3_checksum_for_file(file: &mut File) -> Result<[u8; 32]> {
113    let mut hasher = Sha3_256::default();
114    io::copy(file, &mut hasher)?;
115    Ok(hasher.finalize().digest)
116}
117
118pub fn compute_sha3_checksum(source: &std::path::Path) -> Result<[u8; 32]> {
119    let mut file = fs::File::open(source)?;
120    compute_sha3_checksum_for_file(&mut file)
121}
122
123pub fn compress<R: Read, W: Write>(reader: &mut R, writer: &mut W) -> Result<()> {
124    let magic = reader.read_u32::<BigEndian>()?;
125    writer.write_u32::<BigEndian>(magic)?;
126    let storage_format = reader.read_u8()?;
127    writer.write_u8(storage_format)?;
128    let file_compression = FileCompression::try_from(reader.read_u8()?)?;
129    writer.write_u8(file_compression.into())?;
130    match file_compression {
131        FileCompression::Zstd => {
132            FileCompression::zstd_compress(reader, writer)?;
133        }
134        FileCompression::None => {}
135    }
136    Ok(())
137}
138
139pub fn read<R: Read + 'static>(
140    expected_magic: u32,
141    mut reader: R,
142) -> Result<(Box<dyn Read>, StorageFormat)> {
143    let magic = reader.read_u32::<BigEndian>()?;
144    if magic != expected_magic {
145        Err(anyhow!(
146            "Unexpected magic string in file: {magic:?}, expected: {expected_magic:?}"
147        ))
148    } else {
149        let storage_format = StorageFormat::try_from(reader.read_u8()?)?;
150        let file_compression = FileCompression::try_from(reader.read_u8()?)?;
151        let reader: Box<dyn Read> = match file_compression {
152            FileCompression::Zstd => Box::new(zstd::stream::Decoder::new(reader)?),
153            FileCompression::None => Box::new(BufReader::new(reader)),
154        };
155        Ok((reader, storage_format))
156    }
157}
158
159pub fn make_iterator<T: DeserializeOwned, R: Read + 'static>(
160    expected_magic: u32,
161    reader: R,
162) -> Result<impl Iterator<Item = T>> {
163    let (reader, storage_format) = read(expected_magic, reader)?;
164    match storage_format {
165        StorageFormat::Blob => Ok(BlobIter::new(reader)),
166    }
167}
168
169#[expect(clippy::result_large_err)]
170pub fn verify_checkpoint_with_committee(
171    committee: Arc<Committee>,
172    current: &VerifiedCheckpoint,
173    checkpoint: CertifiedCheckpointSummary,
174) -> Result<VerifiedCheckpoint, CertifiedCheckpointSummary> {
175    assert_eq!(
176        *checkpoint.sequence_number(),
177        current.sequence_number().checked_add(1).unwrap()
178    );
179
180    if Some(*current.digest()) != checkpoint.previous_digest {
181        debug!(
182            current_checkpoint_seq = current.sequence_number(),
183            current_digest =% current.digest(),
184            checkpoint_seq = checkpoint.sequence_number(),
185            checkpoint_digest =% checkpoint.digest(),
186            checkpoint_previous_digest =? checkpoint.previous_digest,
187            "checkpoint not on same chain"
188        );
189        return Err(checkpoint);
190    }
191
192    let current_epoch = current.epoch();
193    if checkpoint.epoch() != current_epoch
194        && checkpoint.epoch() != current_epoch.checked_add(1).unwrap()
195    {
196        debug!(
197            checkpoint_seq = checkpoint.sequence_number(),
198            checkpoint_epoch = checkpoint.epoch(),
199            current_checkpoint_seq = current.sequence_number(),
200            current_epoch = current_epoch,
201            "cannot verify checkpoint with too high of an epoch",
202        );
203        return Err(checkpoint);
204    }
205
206    if checkpoint.epoch() == current_epoch.checked_add(1).unwrap()
207        && current.next_epoch_committee().is_none()
208    {
209        debug!(
210            checkpoint_seq = checkpoint.sequence_number(),
211            checkpoint_epoch = checkpoint.epoch(),
212            current_checkpoint_seq = current.sequence_number(),
213            current_epoch = current_epoch,
214            "next checkpoint claims to be from the next epoch but the latest verified \
215            checkpoint does not indicate that it is the last checkpoint of an epoch"
216        );
217        return Err(checkpoint);
218    }
219
220    checkpoint
221        .verify_authority_signatures(&committee)
222        .map_err(|e| {
223            debug!("error verifying checkpoint: {e}");
224            checkpoint.clone()
225        })?;
226    Ok(VerifiedCheckpoint::new_unchecked(checkpoint))
227}
228
229#[expect(clippy::result_large_err)]
230pub fn verify_checkpoint<S>(
231    current: &VerifiedCheckpoint,
232    store: S,
233    checkpoint: CertifiedCheckpointSummary,
234) -> Result<VerifiedCheckpoint, CertifiedCheckpointSummary>
235where
236    S: WriteStore,
237{
238    let committee = store.get_committee(checkpoint.epoch()).unwrap_or_else(|| {
239        panic!(
240            "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
241            checkpoint.epoch(),
242            checkpoint.sequence_number()
243        )
244    });
245
246    verify_checkpoint_with_committee(committee, current, checkpoint)
247}
248
249pub async fn verify_checkpoint_range<S>(
250    checkpoint_range: Range<CheckpointSequenceNumber>,
251    store: S,
252    checkpoint_counter: Arc<AtomicU64>,
253    max_concurrency: usize,
254) where
255    S: WriteStore + Clone,
256{
257    let range_clone = checkpoint_range.clone();
258    futures::stream::iter(range_clone.into_iter().tuple_windows())
259        .map(|(a, b)| {
260            let current = store
261                .get_checkpoint_by_sequence_number(a)
262                .unwrap_or_else(|| {
263                    panic!("Checkpoint {a} should exist in store after summary sync but does not");
264                });
265            let next = store
266                .get_checkpoint_by_sequence_number(b)
267                .unwrap_or_else(|| {
268                    panic!("Checkpoint {a} should exist in store after summary sync but does not");
269                });
270
271            let committee = store.get_committee(next.epoch()).unwrap_or_else(|| {
272                panic!(
273                    "BUG: should have committee for epoch {} before we try to verify checkpoint {}",
274                    next.epoch(),
275                    next.sequence_number()
276                )
277            });
278            tokio::spawn(async move {
279                verify_checkpoint_with_committee(committee, &current, next.clone().into())
280                    .expect("Checkpoint verification failed");
281            })
282        })
283        .buffer_unordered(max_concurrency)
284        .for_each(|result| {
285            result.expect("Checkpoint verification task failed");
286            checkpoint_counter.fetch_add(1, Ordering::Relaxed);
287            futures::future::ready(())
288        })
289        .await;
290    let last = checkpoint_range
291        .last()
292        .expect("Received empty checkpoint range");
293    let final_checkpoint = store
294        .get_checkpoint_by_sequence_number(last)
295        .expect("Expected end of checkpoint range to exist in store");
296    store
297        .try_update_highest_verified_checkpoint(&final_checkpoint)
298        .expect("Failed to update highest verified checkpoint");
299}