iota_snapshot/lib.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

#![allow(dead_code)]

#[cfg(test)]
mod tests;

pub mod reader;
pub mod uploader;
mod writer;

use std::{
    path::PathBuf,
    sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    },
    time::Duration,
};

use anyhow::Result;
use fastcrypto::hash::MultisetHash;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use iota_core::{
    authority::{
        authority_store_tables::{AuthorityPerpetualTables, LiveObject},
        epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
    },
    checkpoints::CheckpointStore,
    epoch::committee_store::CommitteeStore,
    state_accumulator::WrappedObject,
};
use iota_storage::{
    FileCompression, SHA3_BYTES, compute_sha3_checksum, object_store::util::path_to_filesystem,
};
use iota_types::{
    accumulator::Accumulator,
    base_types::ObjectID,
    iota_system_state::{
        IotaSystemStateTrait, epoch_start_iota_system_state::EpochStartSystemStateTrait,
        get_iota_system_state,
    },
    messages_checkpoint::ECMHLiveObjectSetDigest,
};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use object_store::path::Path;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
/// The following describes the format of an object file (*.obj) used for
/// persisting live iota objects. The maximum size per .obj file is 128MB. A
/// state snapshot is taken at the end of every epoch. The live object set is
/// split into and stored across multiple hash buckets. The hashing function
/// used for bucketing objects is the same as the one used to build the
/// accumulator tree for computing the state root hash. Buckets are further
/// subdivided into partitions. A partition is the smallest storage unit and
/// holds a subset of the objects in one bucket. Each partition is a single
/// *.obj file to which objects are written in an append-only fashion. A new
/// partition is created when the current one reaches its maximum size,
/// i.e. 128MB. Partitions allow a single hash bucket to be consumed in
/// parallel. Partition files are optionally compressed with the zstd
/// compression format. Partition filenames follow the format
/// <bucket_number>_<partition_number>.obj. Object references are stored in
/// one REFERENCE file per hash bucket and are written in an append-only
/// manner as well. Finally, the MANIFEST file contains per-file metadata of
/// every file in the snapshot directory.
///
/// State Snapshot Directory Layout
///  - snapshot/
///     - epoch_0/
///        - 1_1.obj
///        - 1_2.obj
///        - 1_3.obj
///        - 2_1.obj
///        - ...
///        - 1000_1.obj
///        - REFERENCE-1
///        - REFERENCE-2
///        - ...
///        - REFERENCE-1000
///        - MANIFEST
///     - epoch_1/
///        - 1_1.obj
///        - ...
///
/// Object File Disk Format
/// ┌──────────────────────────────┐
/// │  magic(0x00B7EC75) <4 byte>  │
/// ├──────────────────────────────┤
/// │ ┌──────────────────────────┐ │
/// │ │         Object 1         │ │
/// │ ├──────────────────────────┤ │
/// │ │          ...             │ │
/// │ ├──────────────────────────┤ │
/// │ │         Object N         │ │
/// │ └──────────────────────────┘ │
/// └──────────────────────────────┘
/// Object
/// ┌───────────────┬───────────────────┬──────────────┐
/// │ len <uvarint> │ encoding <1 byte> │ data <bytes> │
/// └───────────────┴───────────────────┴──────────────┘
///
/// REFERENCE File Disk Format
/// ┌──────────────────────────────┐
/// │  magic(0x5EFE5E11) <4 byte>  │
/// ├──────────────────────────────┤
/// │ ┌──────────────────────────┐ │
/// │ │         ObjectRef 1      │ │
/// │ ├──────────────────────────┤ │
/// │ │          ...             │ │
/// │ ├──────────────────────────┤ │
/// │ │         ObjectRef N      │ │
/// │ └──────────────────────────┘ │
/// └──────────────────────────────┘
/// ObjectRef (ObjectID, SequenceNumber, ObjectDigest)
/// ┌───────────────┬───────────────────┬──────────────┐
/// │         data (<(address_len + 8 + 32) bytes>)    │
/// └───────────────┴───────────────────┴──────────────┘
///
/// MANIFEST File Disk Format
/// ┌──────────────────────────────┐
/// │  magic(0x00C0FFEE) <4 byte>  │
/// ├──────────────────────────────┤
/// │   serialized manifest        │
/// ├──────────────────────────────┤
/// │      sha3 <32 bytes>         │
/// └──────────────────────────────┘
const OBJECT_FILE_MAGIC: u32 = 0x00B7EC75;
const REFERENCE_FILE_MAGIC: u32 = 0x5EFE5E11;
const MANIFEST_FILE_MAGIC: u32 = 0x00C0FFEE;
const MAGIC_BYTES: usize = 4;
const SNAPSHOT_VERSION_BYTES: usize = 1;
const ADDRESS_LENGTH_BYTES: usize = 8;
const PADDING_BYTES: usize = 3;
const MANIFEST_FILE_HEADER_BYTES: usize =
    MAGIC_BYTES + SNAPSHOT_VERSION_BYTES + ADDRESS_LENGTH_BYTES + PADDING_BYTES;
const FILE_MAX_BYTES: usize = 128 * 1024 * 1024;
const OBJECT_ID_BYTES: usize = ObjectID::LENGTH;
const SEQUENCE_NUM_BYTES: usize = 8;
const OBJECT_DIGEST_BYTES: usize = 32;
const OBJECT_REF_BYTES: usize = OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES + OBJECT_DIGEST_BYTES;
const FILE_TYPE_BYTES: usize = 1;
const BUCKET_BYTES: usize = 4;
const BUCKET_PARTITION_BYTES: usize = 4;
const COMPRESSION_TYPE_BYTES: usize = 1;
const FILE_METADATA_BYTES: usize =
    FILE_TYPE_BYTES + BUCKET_BYTES + BUCKET_PARTITION_BYTES + COMPRESSION_TYPE_BYTES + SHA3_BYTES;
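
// The doc comment above specifies how a single object record is framed inside
// an *.obj partition file: a uvarint length, a one-byte encoding tag, then the
// raw object bytes. The sketch below is illustrative only and is not used by
// the writer or reader; the hand-rolled LEB128 varint and the encoding tag
// value are assumptions made for the example.
#[cfg(test)]
mod object_record_framing_example {
    /// Encodes `value` as an unsigned LEB128 varint (illustrative helper).
    fn encode_uvarint(mut value: u64, out: &mut Vec<u8>) {
        loop {
            let mut byte = (value & 0x7f) as u8;
            value >>= 7;
            if value != 0 {
                byte |= 0x80;
            }
            out.push(byte);
            if value == 0 {
                break;
            }
        }
    }

    #[test]
    fn frame_single_object_record() {
        let object_bytes = vec![0xABu8; 300];
        let encoding: u8 = 1; // hypothetical encoding tag for the example
        let mut record = Vec::new();
        encode_uvarint(object_bytes.len() as u64, &mut record);
        record.push(encoding);
        record.extend_from_slice(&object_bytes);
        // 300 encodes as the two varint bytes 0xAC 0x02, followed by the tag.
        assert_eq!(&record[..3], &[0xAC, 0x02, 0x01]);
        assert_eq!(record.len(), 2 + 1 + 300);
    }
}
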
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum FileType {
    Object = 0,
    Reference,
}
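
// Illustrative sketch: the num_enum derives on FileType let the file-type byte
// stored alongside each file's metadata round-trip through its u8
// discriminant. This module is an example only and is not used by the snapshot
// logic.
#[cfg(test)]
mod file_type_discriminant_example {
    use super::FileType;

    #[test]
    fn file_type_round_trips_through_u8() {
        let raw: u8 = FileType::Reference.into();
        assert_eq!(raw, 1);
        assert_eq!(FileType::try_from(raw).unwrap(), FileType::Reference);
    }
}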

/// FileMetadata holds the metadata of either an object file or a reference
/// file.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    pub file_type: FileType,
    pub bucket_num: u32,
    pub part_num: u32,
    pub file_compression: FileCompression,
    pub sha3_digest: [u8; 32],
}

impl FileMetadata {
    pub fn file_path(&self, dir_path: &Path) -> Path {
        match self.file_type {
            FileType::Object => {
                dir_path.child(&*format!("{}_{}.obj", self.bucket_num, self.part_num))
            }
            FileType::Reference => {
                dir_path.child(&*format!("{}_{}.ref", self.bucket_num, self.part_num))
            }
        }
    }
    pub fn local_file_path(&self, root_path: &std::path::Path, dir_path: &Path) -> Result<PathBuf> {
        path_to_filesystem(root_path.to_path_buf(), &self.file_path(dir_path))
    }
}
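
// Illustrative sketch: partition files are named purely from their bucket and
// partition numbers, matching the <bucket_number>_<partition_number>.obj
// scheme in the format description above. The compression choice and the
// all-zero digest below are placeholder example values.
#[cfg(test)]
mod file_metadata_path_example {
    use object_store::path::Path;

    use super::{FileCompression, FileMetadata, FileType};

    #[test]
    fn object_file_name_follows_bucket_partition_scheme() {
        let metadata = FileMetadata {
            file_type: FileType::Object,
            bucket_num: 1,
            part_num: 2,
            file_compression: FileCompression::Zstd,
            sha3_digest: [0u8; 32],
        };
        let dir = Path::from("epoch_0");
        assert_eq!(metadata.file_path(&dir).to_string(), "epoch_0/1_2.obj");
    }
}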
183
184#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
185pub struct ManifestV1 {
186    pub snapshot_version: u8,
187    pub address_length: u64,
188    pub file_metadata: Vec<FileMetadata>,
189    pub epoch: u64,
190}
191
192#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
193pub enum Manifest {
194    V1(ManifestV1),
195}
196
197impl Manifest {
198    pub fn snapshot_version(&self) -> u8 {
199        match self {
200            Self::V1(manifest) => manifest.snapshot_version,
201        }
202    }
203    pub fn address_length(&self) -> u64 {
204        match self {
205            Self::V1(manifest) => manifest.address_length,
206        }
207    }
208    pub fn file_metadata(&self) -> &Vec<FileMetadata> {
209        match self {
210            Self::V1(manifest) => &manifest.file_metadata,
211        }
212    }
213    pub fn epoch(&self) -> u64 {
214        match self {
215            Self::V1(manifest) => manifest.epoch,
216        }
217    }
218}
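
// Illustrative sketch: Manifest is versioned, and the accessors above dispatch
// on the version so callers never need to match on it directly. The field
// values below are arbitrary example data.
#[cfg(test)]
mod manifest_accessors_example {
    use super::{Manifest, ManifestV1};

    #[test]
    fn v1_accessors_return_underlying_fields() {
        let manifest = Manifest::V1(ManifestV1 {
            snapshot_version: 1,
            address_length: 32,
            file_metadata: vec![],
            epoch: 7,
        });
        assert_eq!(manifest.snapshot_version(), 1);
        assert_eq!(manifest.address_length(), 32);
        assert!(manifest.file_metadata().is_empty());
        assert_eq!(manifest.epoch(), 7);
    }
}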

/// Creates a FileMetadata for the provided file path. Note that the file is
/// overwritten in place with the compressed contents of the original file.
pub fn create_file_metadata(
    file_path: &std::path::Path,
    file_compression: FileCompression,
    file_type: FileType,
    bucket_num: u32,
    part_num: u32,
) -> Result<FileMetadata> {
    // Overwrites the file with compressed data of the original file.
    file_compression.compress(file_path)?;
    // Computes the sha3 checksum of the compressed file.
    let sha3_digest = compute_sha3_checksum(file_path)?;
    let file_metadata = FileMetadata {
        file_type,
        bucket_num,
        part_num,
        file_compression,
        sha3_digest,
    };
    Ok(file_metadata)
}
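
// Illustrative sketch of how a finished partition file can be turned into its
// FileMetadata entry: compress it in place and record its checksum together
// with the bucket and partition numbers. The temp-file path is made up for the
// example, and the use of FileCompression::Zstd assumes the zstd option
// mentioned in the format description above.
#[cfg(test)]
mod create_file_metadata_example {
    use super::{FileCompression, FileType, create_file_metadata};

    #[test]
    fn metadata_records_bucket_and_partition() {
        let path = std::env::temp_dir().join("iota_snapshot_example_1_2.obj");
        std::fs::write(&path, b"example partition contents").unwrap();
        let metadata =
            create_file_metadata(&path, FileCompression::Zstd, FileType::Object, 1, 2).unwrap();
        assert_eq!(metadata.bucket_num, 1);
        assert_eq!(metadata.part_num, 2);
        assert_eq!(metadata.file_type, FileType::Object);
        std::fs::remove_file(&path).unwrap();
    }
}
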
pub async fn setup_db_state(
    epoch: u64,
    accumulator: Accumulator,
    perpetual_db: Arc<AuthorityPerpetualTables>,
    checkpoint_store: Arc<CheckpointStore>,
    committee_store: Arc<CommitteeStore>,
    verify: bool,
    num_live_objects: u64,
    m: MultiProgress,
) -> Result<()> {
    // This function should be called once state-accumulator-based hash verification
    // is complete and the live object set has been downloaded to the local store.
    let system_state_object = get_iota_system_state(&perpetual_db)?;
    let new_epoch_start_state = system_state_object.into_epoch_start_state();
    let next_epoch_committee = new_epoch_start_state.get_iota_committee();
    let root_digest: ECMHLiveObjectSetDigest = accumulator.digest().into();
    let last_checkpoint = checkpoint_store
        .get_epoch_last_checkpoint(epoch)
        .expect("Error loading last checkpoint for current epoch")
        .expect("Could not load last checkpoint for current epoch");
    let flags = EpochFlag::default_for_no_config();
    let epoch_start_configuration = EpochStartConfiguration::new(
        new_epoch_start_state,
        *last_checkpoint.digest(),
        &perpetual_db,
        flags,
    )
    .unwrap();
    perpetual_db.set_epoch_start_configuration(&epoch_start_configuration)?;
    perpetual_db.insert_root_state_hash(epoch, last_checkpoint.sequence_number, accumulator)?;
    perpetual_db.set_highest_pruned_checkpoint_without_wb(last_checkpoint.sequence_number)?;
    committee_store.insert_new_committee(&next_epoch_committee)?;
    checkpoint_store.update_highest_executed_checkpoint(&last_checkpoint)?;

    if verify {
        let iter = perpetual_db.iter_live_object_set();
        let local_digest = ECMHLiveObjectSetDigest::from(
            accumulate_live_object_iter(Box::new(iter), m.clone(), num_live_objects)
                .await
                .digest(),
        );
        assert_eq!(
            root_digest, local_digest,
            "End of epoch {} root state digest {} does not match \
                local root state hash {} after restoring db from formal snapshot",
            epoch, root_digest.digest, local_digest.digest,
        );
        println!("DB live object state verification completed successfully!");
    }

    Ok(())
}

pub async fn accumulate_live_object_iter(
    iter: Box<dyn Iterator<Item = LiveObject> + '_>,
    m: MultiProgress,
    num_live_objects: u64,
) -> Accumulator {
    // Monitor progress of live object accumulation
    let accum_progress_bar = m.add(ProgressBar::new(num_live_objects).with_style(
        ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})").unwrap(),
    ));
    let accum_counter = Arc::new(AtomicU64::new(0));
    let cloned_accum_counter = accum_counter.clone();
    let cloned_progress_bar = accum_progress_bar.clone();
    let handle = tokio::spawn(async move {
        let a_instant = Instant::now();
        loop {
            if cloned_progress_bar.is_finished() {
                break;
            }
            let num_accumulated = cloned_accum_counter.load(Ordering::Relaxed);
            assert!(
                num_accumulated <= num_live_objects,
                "Accumulated more objects (at least {num_accumulated}) than expected ({num_live_objects})"
            );
            let accumulations_per_sec = num_accumulated as f64 / a_instant.elapsed().as_secs_f64();
            cloned_progress_bar.set_position(num_accumulated);
            cloned_progress_bar.set_message(format!(
                "DB live obj accumulations per sec: {}",
                accumulations_per_sec
            ));
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });

    // Accumulate live objects
    let mut acc = Accumulator::default();
    for live_object in iter {
        match live_object {
            LiveObject::Normal(object) => {
                acc.insert(object.compute_object_reference().2);
            }
            LiveObject::Wrapped(key) => {
                acc.insert(
                    bcs::to_bytes(&WrappedObject::new(key.0, key.1))
                        .expect("Failed to serialize WrappedObject"),
                );
            }
        }
        accum_counter.fetch_add(1, Ordering::Relaxed);
    }
    accum_progress_bar.finish_with_message("DB live object accumulation completed");
    handle
        .await
        .expect("Failed to join live object accumulation progress monitor");
    acc
}
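
// Illustrative sketch: the live object accumulator is a multiset hash, so the
// digest does not depend on the order in which accumulate_live_object_iter
// visits objects. This module is an example only; the inserted byte strings
// are made up and do not represent real object digests.
#[cfg(test)]
mod accumulator_order_independence_example {
    use fastcrypto::hash::MultisetHash;
    use iota_types::accumulator::Accumulator;

    #[test]
    fn insertion_order_does_not_change_digest() {
        let a = b"object-digest-a".to_vec();
        let b = b"object-digest-b".to_vec();

        let mut first = Accumulator::default();
        first.insert(a.clone());
        first.insert(b.clone());

        let mut second = Accumulator::default();
        second.insert(b);
        second.insert(a);

        assert_eq!(first.digest(), second.digest());
    }
}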