iota_archival/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#![allow(dead_code)]
6
7pub mod reader;
8pub mod writer;
9
10#[cfg(test)]
11mod tests;
12
13use std::{
14    fs,
15    io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
16    num::NonZeroUsize,
17    ops::Range,
18    sync::{
19        Arc,
20        atomic::{AtomicU64, Ordering},
21    },
22    time::{Duration, Instant},
23};
24
25use anyhow::{Result, anyhow, bail};
26use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
27use bytes::Bytes;
28use fastcrypto::hash::{HashFunction, Sha3_256};
29use indicatif::{ProgressBar, ProgressStyle};
30use iota_config::{
31    genesis::Genesis, node::ArchiveReaderConfig, object_storage_config::ObjectStoreConfig,
32};
33use iota_storage::{
34    SHA3_BYTES,
35    blob::{Blob, BlobEncoding},
36    compute_sha3_checksum, compute_sha3_checksum_for_bytes,
37    object_store::{
38        ObjectStoreGetExt, ObjectStorePutExt,
39        util::{get, put},
40    },
41};
42use iota_types::{
43    base_types::ExecutionData,
44    messages_checkpoint::{FullCheckpointContents, VerifiedCheckpointContents},
45    storage::{SingleCheckpointSharedInMemoryStore, WriteStore},
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use object_store::path::Path;
49use prometheus::Registry;
50use serde::{Deserialize, Serialize};
51use tracing::{error, info};
52
53use crate::reader::{ArchiveReader, ArchiveReaderMetrics};
54
#[expect(rustdoc::invalid_html_tags)]
/// Checkpoints and summaries are persisted as blob files. Files are committed
/// to local store by duration or file size. Committed files are synced with the
/// remote store continuously. Files are optionally compressed with the zstd
/// compression format. Filenames follow the format <checkpoint_seq_num>.<suffix>
/// where `checkpoint_seq_num` is the first checkpoint present in that file and
/// `suffix` is `chk` for checkpoint contents or `sum` for checkpoint summaries.
/// MANIFEST is the index and source of truth for all files present in the
/// archive.
///
/// State Archival Directory Layout
///  - archive/
///     - MANIFEST
///     - epoch_0/
///        - 0.chk
///        - 0.sum
///        - 1000.chk
///        - 1000.sum
///        - 3000.chk
///        - 3000.sum
///        - ...
///        - 100000.chk
///        - 100000.sum
///     - epoch_1/
///        - 101000.chk
///        - ...
///
/// Blob File Disk Format
/// ┌──────────────────────────────┐
/// │       magic <4 byte>         │
/// ├──────────────────────────────┤
/// │  storage format <1 byte>     │
/// ├──────────────────────────────┤
/// │    file compression <1 byte> │
/// ├──────────────────────────────┤
/// │ ┌──────────────────────────┐ │
/// │ │         Blob 1           │ │
/// │ ├──────────────────────────┤ │
/// │ │          ...             │ │
/// │ ├──────────────────────────┤ │
/// │ │        Blob N            │ │
/// │ └──────────────────────────┘ │
/// └──────────────────────────────┘
/// Blob
/// ┌───────────────┬───────────────────┬──────────────┐
/// │ len <uvarint> │ encoding <1 byte> │ data <bytes> │
/// └───────────────┴───────────────────┴──────────────┘
///
/// MANIFEST File Disk Format
/// ┌──────────────────────────────┐
/// │        magic<4 byte>         │
/// ├──────────────────────────────┤
/// │   serialized manifest        │
/// ├──────────────────────────────┤
/// │      sha3 <32 bytes>         │
/// └──────────────────────────────┘
///
/// The serialized manifest is a single BCS-encoded blob. The trailing SHA3-256
/// digest covers everything before it, i.e. the magic and the serialized
/// manifest.
pub const CHECKPOINT_FILE_MAGIC: u32 = 0x0000DEAD;
/// Magic value identifying checkpoint summary blob files.
pub const SUMMARY_FILE_MAGIC: u32 = 0x0000CAFE;
/// Magic value at the start of the MANIFEST file, stored as a big-endian `u32`.
const MANIFEST_FILE_MAGIC: u32 = 0x00C0FFEE;
/// Size in bytes of the magic prefix (one `u32`).
const MAGIC_BYTES: usize = 4;
/// Filename suffix of checkpoint contents files.
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// Filename suffix of checkpoint summary files.
const SUMMARY_FILE_SUFFIX: &str = "sum";
/// Prefix of each per-epoch directory name (`epoch_<n>`).
const EPOCH_DIR_PREFIX: &str = "epoch_";
/// Object store path of the manifest file, relative to the archive root.
const MANIFEST_FILENAME: &str = "MANIFEST";
118
119#[derive(
120    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
121)]
122#[repr(u8)]
123pub enum FileType {
124    CheckpointContent = 0,
125    CheckpointSummary,
126}
127
/// Index entry describing a single archived blob file.
// NOTE: this type is embedded in the BCS-encoded manifest; BCS is field-order
// sensitive, so do not reorder the fields.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    /// Whether the file holds checkpoint contents or checkpoint summaries.
    pub file_type: FileType,
    /// Epoch whose directory (`epoch_<n>`) contains the file.
    pub epoch_num: u64,
    /// Half-open range of checkpoint sequence numbers stored in the file; its
    /// `start` is also used as the file name.
    pub checkpoint_seq_range: Range<u64>,
    /// SHA3-256 checksum of the file.
    pub sha3_digest: [u8; 32],
}
135
136impl FileMetadata {
137    pub fn file_path(&self) -> Path {
138        let dir_path = Path::from(format!("{}{}", EPOCH_DIR_PREFIX, self.epoch_num));
139        match self.file_type {
140            FileType::CheckpointContent => dir_path.child(&*format!(
141                "{}.{CHECKPOINT_FILE_SUFFIX}",
142                self.checkpoint_seq_range.start
143            )),
144            FileType::CheckpointSummary => dir_path.child(&*format!(
145                "{}.{SUMMARY_FILE_SUFFIX}",
146                self.checkpoint_seq_range.start
147            )),
148        }
149    }
150}
151
/// Version 1 of the archive manifest.
// NOTE: this struct is serialized with BCS (see `finalize_manifest`), which is
// field-order sensitive; do not reorder the fields.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ManifestV1 {
    /// Archive format version; `Manifest::new` sets it to 1.
    pub archive_version: u8,
    /// Sequence number of the next checkpoint expected in the archive, as
    /// last recorded by `Manifest::update`.
    pub next_checkpoint_seq_num: u64,
    /// Entries for every content and summary file in the archive.
    pub file_metadata: Vec<FileMetadata>,
    /// Epoch last recorded by `Manifest::update`.
    pub epoch: u64,
}
159
/// Versioned archive manifest: the index of all files present in the archive.
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum Manifest {
    /// Version 1 layout.
    V1(ManifestV1),
}
164
165impl Manifest {
166    pub fn new(epoch: u64, next_checkpoint_seq_num: u64) -> Self {
167        Manifest::V1(ManifestV1 {
168            archive_version: 1,
169            next_checkpoint_seq_num,
170            file_metadata: vec![],
171            epoch,
172        })
173    }
174    pub fn files(&self) -> Vec<FileMetadata> {
175        match self {
176            Manifest::V1(manifest) => manifest.file_metadata.clone(),
177        }
178    }
179    pub fn epoch_num(&self) -> u64 {
180        match self {
181            Manifest::V1(manifest) => manifest.epoch,
182        }
183    }
184    pub fn next_checkpoint_seq_num(&self) -> u64 {
185        match self {
186            Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
187        }
188    }
189    pub fn next_checkpoint_after_epoch(&self, epoch_num: u64) -> u64 {
190        match self {
191            Manifest::V1(manifest) => {
192                let mut summary_files: Vec<_> = manifest
193                    .file_metadata
194                    .clone()
195                    .into_iter()
196                    .filter(|f| f.file_type == FileType::CheckpointSummary)
197                    .collect();
198                summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
199                assert!(
200                    summary_files
201                        .windows(2)
202                        .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
203                );
204                assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
205                summary_files
206                    .iter()
207                    .find(|f| f.epoch_num > epoch_num)
208                    .map(|f| f.checkpoint_seq_range.start)
209                    .unwrap_or(u64::MAX)
210            }
211        }
212    }
213
214    pub fn get_all_end_of_epoch_checkpoint_seq_numbers(&self) -> Result<Vec<u64>> {
215        match self {
216            Manifest::V1(manifest) => {
217                let mut summary_files: Vec<_> = manifest
218                    .file_metadata
219                    .clone()
220                    .into_iter()
221                    .filter(|f| f.file_type == FileType::CheckpointSummary)
222                    .collect();
223                summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
224                assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
225                // get last checkpoint seq num per epoch
226                let res = summary_files.windows(2).filter_map(|w| {
227                    assert_eq!(
228                        w[1].checkpoint_seq_range.start,
229                        w[0].checkpoint_seq_range.end
230                    );
231                    if w[1].epoch_num == w[0].epoch_num + 1 {
232                        Some(w[0].checkpoint_seq_range.end - 1)
233                    } else {
234                        None
235                    }
236                });
237                Ok(res.collect())
238            }
239        }
240    }
241
242    pub fn update(
243        &mut self,
244        epoch_num: u64,
245        checkpoint_sequence_number: u64,
246        checkpoint_file_metadata: FileMetadata,
247        summary_file_metadata: FileMetadata,
248    ) {
249        match self {
250            Manifest::V1(manifest) => {
251                manifest.epoch = epoch_num;
252                manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
253                if let Some(last_file_metadata) = manifest.file_metadata.last() {
254                    if last_file_metadata.checkpoint_seq_range
255                        == checkpoint_file_metadata.checkpoint_seq_range
256                    {
257                        return;
258                    }
259                }
260                manifest
261                    .file_metadata
262                    .extend(vec![checkpoint_file_metadata, summary_file_metadata]);
263            }
264        }
265    }
266}
267
/// Result of recording a batch of archive files: the content and summary file
/// entries, plus a snapshot of the manifest taken after they were applied.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct CheckpointUpdates {
    /// File entry whose path `content_file_path` returns.
    checkpoint_file_metadata: FileMetadata,
    /// File entry whose path `summary_file_path` returns.
    summary_file_metadata: FileMetadata,
    /// Manifest snapshot cloned after `Manifest::update` was applied.
    manifest: Manifest,
}
274
275impl CheckpointUpdates {
276    pub fn new(
277        epoch_num: u64,
278        checkpoint_sequence_number: u64,
279        checkpoint_file_metadata: FileMetadata,
280        summary_file_metadata: FileMetadata,
281        manifest: &mut Manifest,
282    ) -> Self {
283        manifest.update(
284            epoch_num,
285            checkpoint_sequence_number,
286            checkpoint_file_metadata.clone(),
287            summary_file_metadata.clone(),
288        );
289        CheckpointUpdates {
290            checkpoint_file_metadata,
291            summary_file_metadata,
292            manifest: manifest.clone(),
293        }
294    }
295    pub fn content_file_path(&self) -> Path {
296        self.checkpoint_file_metadata.file_path()
297    }
298    pub fn summary_file_path(&self) -> Path {
299        self.summary_file_metadata.file_path()
300    }
301    pub fn manifest_file_path(&self) -> Path {
302        Path::from(MANIFEST_FILENAME)
303    }
304}
305
306pub fn create_file_metadata(
307    file_path: &std::path::Path,
308    file_type: FileType,
309    epoch_num: u64,
310    checkpoint_seq_range: Range<u64>,
311) -> Result<FileMetadata> {
312    let sha3_digest = compute_sha3_checksum(file_path)?;
313    let file_metadata = FileMetadata {
314        file_type,
315        epoch_num,
316        checkpoint_seq_range,
317        sha3_digest,
318    };
319    Ok(file_metadata)
320}
321
322pub fn create_file_metadata_from_bytes(
323    bytes: Bytes,
324    file_type: FileType,
325    epoch_num: u64,
326    checkpoint_seq_range: Range<u64>,
327) -> Result<FileMetadata> {
328    let sha3_digest = compute_sha3_checksum_for_bytes(bytes)?;
329    let file_metadata = FileMetadata {
330        file_type,
331        epoch_num,
332        checkpoint_seq_range,
333        sha3_digest,
334    };
335    Ok(file_metadata)
336}
337
338/// Reads the manifest file from the store.
339pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
340    let manifest_file_path = Path::from(MANIFEST_FILENAME);
341    let vec = get(&remote_store, &manifest_file_path).await?.to_vec();
342    read_manifest_from_bytes(vec)
343}
344
345/// Reads the manifest file from the given byte vector and verifies the
346/// integrity of the file.
347pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
348    let manifest_file_size = vec.len();
349    let mut manifest_reader = Cursor::new(vec);
350
351    // Reads from the beginning of the file and verifies the magic byte
352    // is MANIFEST_FILE_MAGIC
353    manifest_reader.rewind()?;
354    let magic = manifest_reader.read_u32::<BigEndian>()?;
355    if magic != MANIFEST_FILE_MAGIC {
356        bail!("Unexpected magic byte in manifest: {}", magic);
357    }
358
359    // Reads from the end of the file and gets the SHA3 checksum
360    // of the content.
361    manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
362    let mut sha3_digest = [0u8; SHA3_BYTES];
363    manifest_reader.read_exact(&mut sha3_digest)?;
364
365    // Reads the content of the file and verifies the SHA3 checksum
366    // of the content matches the stored checksum.
367    manifest_reader.rewind()?;
368    let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
369    manifest_reader.read_exact(&mut content_buf)?;
370    let mut hasher = Sha3_256::default();
371    hasher.update(&content_buf);
372    let computed_digest = hasher.finalize().digest;
373    if computed_digest != sha3_digest {
374        bail!(
375            "Manifest corrupted, computed checksum: {:?}, stored checksum: {:?}",
376            computed_digest,
377            sha3_digest
378        );
379    }
380    manifest_reader.rewind()?;
381    manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
382    Blob::read(&mut manifest_reader)?.decode()
383}
384
385/// Computes the SHA3 checksum of the Manifest and writes it to a byte vector.
386pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
387    let mut buf = BufWriter::new(vec![]);
388    buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
389    let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
390    blob.write(&mut buf)?;
391    buf.flush()?;
392    let mut hasher = Sha3_256::default();
393    hasher.update(buf.get_ref());
394    let computed_digest = hasher.finalize().digest;
395    buf.write_all(&computed_digest)?;
396    Ok(Bytes::from(buf.into_inner()?))
397}
398
399/// Writes the Manifest to the remote store.
400pub async fn write_manifest<S: ObjectStorePutExt>(
401    manifest: Manifest,
402    remote_store: S,
403) -> Result<()> {
404    let path = Path::from(MANIFEST_FILENAME);
405    let bytes = finalize_manifest(manifest)?;
406    put(&remote_store, &path, bytes).await?;
407    Ok(())
408}
409
410pub async fn read_manifest_as_json(remote_store_config: ObjectStoreConfig) -> Result<String> {
411    let metrics = ArchiveReaderMetrics::new(&Registry::default());
412    let config = ArchiveReaderConfig {
413        remote_store_config,
414        download_concurrency: NonZeroUsize::new(1).unwrap(),
415        use_for_pruning_watermark: false,
416    };
417    let archive_reader = ArchiveReader::new(config, &metrics)?;
418    archive_reader.sync_manifest_once().await?;
419    let manifest = archive_reader.get_manifest().await?;
420    let json = serde_json::to_string(&manifest).expect("Failed to serialize object");
421    Ok(json)
422}
423
424pub async fn write_manifest_from_json(
425    remote_store_config: ObjectStoreConfig,
426    json_manifest_path: std::path::PathBuf,
427) -> Result<()> {
428    let manifest: Manifest = serde_json::from_str(&fs::read_to_string(json_manifest_path)?)?;
429    let store = remote_store_config.make()?;
430    write_manifest(manifest, store).await?;
431    Ok(())
432}
433
434/// Loads and stores states from genesis then verifies and stores the
435/// checkpoints from the archive store.
436pub async fn verify_archive_with_genesis_config(
437    genesis: &std::path::Path,
438    remote_store_config: ObjectStoreConfig,
439    concurrency: usize,
440    interactive: bool,
441    num_retries: u32,
442) -> Result<()> {
443    let genesis = Genesis::load(genesis).unwrap();
444    let genesis_committee = genesis.committee()?;
445    let mut store = SingleCheckpointSharedInMemoryStore::default();
446    let contents = genesis.checkpoint_contents();
447    let fullcheckpoint_contents = FullCheckpointContents::from_contents_and_execution_data(
448        contents.clone(),
449        std::iter::once(ExecutionData::new(
450            genesis.transaction().clone(),
451            genesis.effects().clone(),
452        )),
453    );
454    store.insert_genesis_state(
455        genesis.checkpoint(),
456        VerifiedCheckpointContents::new_unchecked(fullcheckpoint_contents),
457        genesis_committee,
458    );
459
460    let num_retries = std::cmp::max(num_retries, 1);
461    for _ in 0..num_retries {
462        match verify_archive_with_local_store(
463            store.clone(),
464            remote_store_config.clone(),
465            concurrency,
466            interactive,
467        )
468        .await
469        {
470            Ok(_) => return Ok(()),
471            Err(e) => {
472                error!("Error while verifying archive: {}", e);
473                tokio::time::sleep(Duration::from_secs(10)).await;
474            }
475        }
476    }
477
478    bail!("Failed to verify archive after {} retries", num_retries)
479}
480
481pub async fn verify_archive_with_checksums(
482    remote_store_config: ObjectStoreConfig,
483    concurrency: usize,
484) -> Result<()> {
485    let metrics = ArchiveReaderMetrics::new(&Registry::default());
486    let config = ArchiveReaderConfig {
487        remote_store_config,
488        download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
489        use_for_pruning_watermark: false,
490    };
491    // Gets the Manifest from the remote store.
492    let archive_reader = ArchiveReader::new(config, &metrics)?;
493    archive_reader.sync_manifest_once().await?;
494    let manifest = archive_reader.get_manifest().await?;
495    info!(
496        "Next checkpoint in archive store: {}",
497        manifest.next_checkpoint_seq_num()
498    );
499
500    let file_metadata = archive_reader.verify_manifest(manifest).await?;
501    // Account for both summary and content files
502    let num_files = file_metadata.len() * 2;
503    archive_reader
504        .verify_file_consistency(file_metadata)
505        .await?;
506    info!("All {} files are valid", num_files);
507    Ok(())
508}
509
510/// Verifies the archive store by reading the checkpoints from the remote store
511/// and storing them in the local one.
512pub async fn verify_archive_with_local_store<S>(
513    store: S,
514    remote_store_config: ObjectStoreConfig,
515    concurrency: usize,
516    interactive: bool,
517) -> Result<()>
518where
519    S: WriteStore + Clone + Send + 'static,
520{
521    let metrics = ArchiveReaderMetrics::new(&Registry::default());
522    let config = ArchiveReaderConfig {
523        remote_store_config,
524        download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
525        use_for_pruning_watermark: false,
526    };
527    let archive_reader = ArchiveReader::new(config, &metrics)?;
528    // Gets the Manifest from the remote store.
529    archive_reader.sync_manifest_once().await?;
530    let latest_checkpoint_in_archive = archive_reader.latest_available_checkpoint().await?;
531    info!(
532        "Latest available checkpoint in archive store: {}",
533        latest_checkpoint_in_archive
534    );
535    let latest_checkpoint = store
536        .try_get_highest_synced_checkpoint()
537        .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
538        .sequence_number;
539    info!("Highest synced checkpoint in db: {latest_checkpoint}");
540    let txn_counter = Arc::new(AtomicU64::new(0));
541    let checkpoint_counter = Arc::new(AtomicU64::new(0));
542    let progress_bar = if interactive {
543        let progress_bar = ProgressBar::new(latest_checkpoint_in_archive).with_style(
544            ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len}({msg})")
545                .unwrap(),
546        );
547        let cloned_progress_bar = progress_bar.clone();
548        let cloned_counter = txn_counter.clone();
549        let cloned_checkpoint_counter = checkpoint_counter.clone();
550        let instant = Instant::now();
551        tokio::spawn(async move {
552            loop {
553                let total_checkpoints_loaded = cloned_checkpoint_counter.load(Ordering::Relaxed);
554                let total_checkpoints_per_sec =
555                    total_checkpoints_loaded as f64 / instant.elapsed().as_secs_f64();
556                let total_txns_per_sec =
557                    cloned_counter.load(Ordering::Relaxed) as f64 / instant.elapsed().as_secs_f64();
558                cloned_progress_bar.set_position(latest_checkpoint + total_checkpoints_loaded);
559                cloned_progress_bar.set_message(format!(
560                    "checkpoints/s: {total_checkpoints_per_sec}, txns/s: {total_txns_per_sec}"
561                ));
562                tokio::time::sleep(Duration::from_secs(1)).await;
563            }
564        });
565        Some(progress_bar)
566    } else {
567        let cloned_store = store.clone();
568        tokio::spawn(async move {
569            loop {
570                let latest_checkpoint = cloned_store
571                    .try_get_highest_synced_checkpoint()
572                    .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
573                    .sequence_number;
574                let percent = (latest_checkpoint * 100) / latest_checkpoint_in_archive;
575                info!("done = {percent}%");
576                tokio::time::sleep(Duration::from_secs(60)).await;
577                if percent >= 100 {
578                    break;
579                }
580            }
581            Ok::<(), anyhow::Error>(())
582        });
583        None
584    };
585    // Reads the checkpoints from the remote store and stores them in the local
586    // one.
587    archive_reader
588        .read(
589            store.clone(),
590            (latest_checkpoint + 1)..u64::MAX,
591            txn_counter,
592            checkpoint_counter,
593            true,
594        )
595        .await?;
596    progress_bar.iter().for_each(|p| p.finish_and_clear());
597    let end = store
598        .try_get_highest_synced_checkpoint()
599        .map_err(|_| anyhow!("Failed to read watermark"))?
600        .sequence_number;
601    info!("Highest verified checkpoint: {}", end);
602    Ok(())
603}