iota_snapshot/
writer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#![allow(dead_code)]
6
7use std::{
8    collections::{HashMap, hash_map::Entry::Vacant},
9    fs,
10    fs::{File, OpenOptions},
11    io::{BufWriter, Seek, SeekFrom, Write},
12    num::NonZeroUsize,
13    path::PathBuf,
14    sync::Arc,
15};
16
17use anyhow::{Context, Result};
18use byteorder::{BigEndian, ByteOrder};
19use fastcrypto::hash::MultisetHash;
20use futures::StreamExt;
21use integer_encoding::VarInt;
22use iota_config::object_storage_config::ObjectStoreConfig;
23use iota_core::{
24    authority::authority_store_tables::{AuthorityPerpetualTables, LiveObject},
25    state_accumulator::StateAccumulator,
26};
27use iota_storage::{
28    blob::{BLOB_ENCODING_BYTES, Blob, BlobEncoding},
29    object_store::util::{copy_file, delete_recursively, path_to_filesystem},
30};
31use iota_types::{
32    accumulator::Accumulator,
33    base_types::{ObjectID, ObjectRef},
34    messages_checkpoint::ECMHLiveObjectSetDigest,
35};
36use object_store::{DynObjectStore, path::Path};
37use tokio::{
38    sync::{
39        mpsc,
40        mpsc::{Receiver, Sender},
41    },
42    task::JoinHandle,
43};
44use tokio_stream::wrappers::ReceiverStream;
45use tracing::debug;
46
47use crate::{
48    FILE_MAX_BYTES, FileCompression, FileMetadata, FileType, MAGIC_BYTES, MANIFEST_FILE_MAGIC,
49    Manifest, ManifestV1, OBJECT_FILE_MAGIC, OBJECT_REF_BYTES, REFERENCE_FILE_MAGIC,
50    SEQUENCE_NUM_BYTES, compute_sha3_checksum, create_file_metadata,
51};
52
/// LiveObjectSetWriterV1 writes live object set. It creates multiple *.obj
/// files and *.ref file
///
/// One writer serves a single bucket. Files are named
/// `{bucket_num}_{part_num}.obj` / `{bucket_num}_{part_num}.ref` and live
/// directly inside `dir_path`.
struct LiveObjectSetWriterV1 {
    /// Directory in which the object and reference files are created.
    dir_path: PathBuf,
    /// Bucket number; first component of every file name.
    bucket_num: u32,
    /// Partition number of the files currently being written (`new` starts
    /// at 1).
    current_part_num: u32,
    /// Buffered writer over the current object file.
    obj_wbuf: BufWriter<File>,
    /// Buffered writer over the current reference file.
    ref_wbuf: BufWriter<File>,
    /// Bytes accounted to the current object file so far, starting from the
    /// size of its magic header.
    object_file_size: usize,
    /// Metadata of every file finalized so far.
    files: Vec<FileMetadata>,
    /// Channel on which the metadata of each finalized file is sent; `done`
    /// clears it.
    sender: Option<Sender<FileMetadata>>,
    /// Compression setting handed to `create_file_metadata`.
    file_compression: FileCompression,
}
66
67impl LiveObjectSetWriterV1 {
68    fn new(
69        dir_path: PathBuf,
70        bucket_num: u32,
71        file_compression: FileCompression,
72        sender: Sender<FileMetadata>,
73    ) -> Result<Self> {
74        let part_num = 1;
75        let (n, obj_file) = Self::object_file(dir_path.clone(), bucket_num, part_num)?;
76        let ref_file = Self::ref_file(dir_path.clone(), bucket_num, part_num)?;
77        Ok(LiveObjectSetWriterV1 {
78            dir_path,
79            bucket_num,
80            current_part_num: part_num,
81            obj_wbuf: BufWriter::new(obj_file),
82            ref_wbuf: BufWriter::new(ref_file),
83            object_file_size: n,
84            files: vec![],
85            sender: Some(sender),
86            file_compression,
87        })
88    }
89
90    /// Writes a live object to the object file and the reference to the
91    /// reference file.
92    pub fn write(&mut self, object: &LiveObject) -> Result<()> {
93        let object_reference = object.object_reference();
94        self.write_object(object)?;
95        self.write_object_ref(&object_reference)?;
96        Ok(())
97    }
98
99    /// Finalizes the object and reference files and returns the FileMetadata of
100    /// the files.
101    pub fn done(mut self) -> Result<Vec<FileMetadata>> {
102        self.finalize_obj()?;
103        self.finalize_ref()?;
104        self.sender = None;
105        Ok(self.files.clone())
106    }
107
108    /// Creates a new object file for the provided bucket number and part
109    /// number, and returns the file and the number of bytes written to it.
110    fn object_file(dir_path: PathBuf, bucket_num: u32, part_num: u32) -> Result<(usize, File)> {
111        let next_part_file_path = dir_path.join(format!("{bucket_num}_{part_num}.obj"));
112        let next_part_file_tmp_path = dir_path.join(format!("{bucket_num}_{part_num}.obj.tmp"));
113        let mut f = File::create(next_part_file_tmp_path.clone())?;
114        let mut metab = [0u8; MAGIC_BYTES];
115        BigEndian::write_u32(&mut metab, OBJECT_FILE_MAGIC);
116        f.rewind()?;
117        let n = f.write(&metab)?;
118        drop(f);
119        fs::rename(next_part_file_tmp_path, next_part_file_path.clone())?;
120        let mut f = OpenOptions::new().append(true).open(next_part_file_path)?;
121        f.seek(SeekFrom::Start(n as u64))?;
122        Ok((n, f))
123    }
124
125    /// Creates a new reference file for the provided bucket number and part
126    /// number, and returns the file and the number of bytes written to the
127    /// file.
128    fn ref_file(dir_path: PathBuf, bucket_num: u32, part_num: u32) -> Result<File> {
129        let ref_path = dir_path.join(format!("{bucket_num}_{part_num}.ref"));
130        let ref_tmp_path = dir_path.join(format!("{bucket_num}_{part_num}.ref.tmp"));
131        let mut f = File::create(ref_tmp_path.clone())?;
132        f.rewind()?;
133        let mut metab = [0u8; MAGIC_BYTES];
134        BigEndian::write_u32(&mut metab, REFERENCE_FILE_MAGIC);
135        let n = f.write(&metab)?;
136        drop(f);
137        fs::rename(ref_tmp_path, ref_path.clone())?;
138        let mut f = OpenOptions::new().append(true).open(ref_path)?;
139        f.seek(SeekFrom::Start(n as u64))?;
140        Ok(f)
141    }
142
143    /// Finalizes the object file by flushing the buffer to disk and sends its
144    /// FileMetadata to the channel.
145    fn finalize_obj(&mut self) -> Result<()> {
146        // Flushes the buffer and sync the data to disk
147        self.obj_wbuf.flush()?;
148        self.obj_wbuf.get_ref().sync_data()?;
149        let off = self.obj_wbuf.get_ref().stream_position()?;
150        self.obj_wbuf.get_ref().set_len(off)?;
151        let file_path = self
152            .dir_path
153            .join(format!("{}_{}.obj", self.bucket_num, self.current_part_num));
154        let file_metadata = create_file_metadata(
155            &file_path,
156            self.file_compression,
157            FileType::Object,
158            self.bucket_num,
159            self.current_part_num,
160        )?;
161        self.files.push(file_metadata.clone());
162        if let Some(sender) = &self.sender {
163            sender.blocking_send(file_metadata)?;
164        }
165        Ok(())
166    }
167
168    /// Finalizes the reference file by flushing the buffer to disk and sends
169    /// its FileMetadata to the channel.
170    fn finalize_ref(&mut self) -> Result<()> {
171        // Flushes the buffer and sync the data to disk
172        self.ref_wbuf.flush()?;
173        self.ref_wbuf.get_ref().sync_data()?;
174        let off = self.ref_wbuf.get_ref().stream_position()?;
175        self.ref_wbuf.get_ref().set_len(off)?;
176        let file_path = self
177            .dir_path
178            .join(format!("{}_{}.ref", self.bucket_num, self.current_part_num));
179        let file_metadata = create_file_metadata(
180            &file_path,
181            self.file_compression,
182            FileType::Reference,
183            self.bucket_num,
184            self.current_part_num,
185        )?;
186        self.files.push(file_metadata.clone());
187        if let Some(sender) = &self.sender {
188            sender.blocking_send(file_metadata)?;
189        }
190        Ok(())
191    }
192
193    /// Finalizes the object file of current partition and creates a new one for
194    /// the next partition.
195    fn cut(&mut self) -> Result<()> {
196        self.finalize_obj()?;
197        let (n, f) = Self::object_file(
198            self.dir_path.clone(),
199            self.bucket_num,
200            self.current_part_num + 1,
201        )?;
202        self.object_file_size = n;
203        self.obj_wbuf = BufWriter::new(f);
204        Ok(())
205    }
206
207    /// Finalizes the reference file of current partition and creates a new one
208    /// for the next partition.
209    fn cut_reference_file(&mut self) -> Result<()> {
210        self.finalize_ref()?;
211        let f = Self::ref_file(
212            self.dir_path.clone(),
213            self.bucket_num,
214            self.current_part_num + 1,
215        )?;
216        self.ref_wbuf = BufWriter::new(f);
217        Ok(())
218    }
219
220    /// Writes a live object to the object file. Creates a new partition (new
221    /// object file and reference file) if it exceeds the maximum size.
222    fn write_object(&mut self, object: &LiveObject) -> Result<()> {
223        let blob = Blob::encode(object, BlobEncoding::Bcs)?;
224        let mut blob_size = blob.data.len().required_space();
225        blob_size += BLOB_ENCODING_BYTES;
226        blob_size += blob.data.len();
227        let cut_new_part_file = (self.object_file_size + blob_size) > FILE_MAX_BYTES;
228        if cut_new_part_file {
229            self.cut()?;
230            self.cut_reference_file()?;
231            self.current_part_num += 1;
232        }
233        self.object_file_size += blob.write(&mut self.obj_wbuf)?;
234        Ok(())
235    }
236
237    /// Writes an object reference to the reference file.
238    fn write_object_ref(&mut self, object_ref: &ObjectRef) -> Result<()> {
239        let mut buf = [0u8; OBJECT_REF_BYTES];
240        buf[0..ObjectID::LENGTH].copy_from_slice(object_ref.0.as_ref());
241        BigEndian::write_u64(
242            &mut buf[ObjectID::LENGTH..OBJECT_REF_BYTES],
243            object_ref.1.value(),
244        );
245        buf[ObjectID::LENGTH + SEQUENCE_NUM_BYTES..OBJECT_REF_BYTES]
246            .copy_from_slice(object_ref.2.as_ref());
247        self.ref_wbuf.write_all(&buf)?;
248        Ok(())
249    }
250}
251
/// StateSnapshotWriterV1 writes snapshot files to a local staging dir and
/// simultaneously uploads them to a remote object store
pub struct StateSnapshotWriterV1 {
    /// Filesystem directory under which the per-epoch staging directories
    /// are created.
    local_staging_dir: PathBuf,
    /// Compression setting passed to each bucket writer.
    file_compression: FileCompression,
    /// Destination store the snapshot files are copied to.
    remote_object_store: Arc<DynObjectStore>,
    /// Object store the staged files are copied from.
    local_staging_store: Arc<DynObjectStore>,
    /// Concurrency used for uploads and for the remote cleanup; taken from a
    /// `NonZeroUsize` by both constructors.
    concurrency: usize,
}
261
262impl StateSnapshotWriterV1 {
263    pub async fn new_from_store(
264        local_staging_path: &std::path::Path,
265        local_staging_store: &Arc<DynObjectStore>,
266        remote_object_store: &Arc<DynObjectStore>,
267        file_compression: FileCompression,
268        concurrency: NonZeroUsize,
269    ) -> Result<Self> {
270        Ok(StateSnapshotWriterV1 {
271            file_compression,
272            local_staging_dir: local_staging_path.to_path_buf(),
273            remote_object_store: remote_object_store.clone(),
274            local_staging_store: local_staging_store.clone(),
275            concurrency: concurrency.get(),
276        })
277    }
278
279    pub async fn new(
280        local_store_config: &ObjectStoreConfig,
281        remote_store_config: &ObjectStoreConfig,
282        file_compression: FileCompression,
283        concurrency: NonZeroUsize,
284    ) -> Result<Self> {
285        let remote_object_store = remote_store_config.make()?;
286        let local_staging_store = local_store_config.make()?;
287        let local_staging_dir = local_store_config
288            .directory
289            .as_ref()
290            .context("No local directory specified")?
291            .clone();
292        Ok(StateSnapshotWriterV1 {
293            local_staging_dir,
294            file_compression,
295            remote_object_store,
296            local_staging_store,
297            concurrency: concurrency.get(),
298        })
299    }
300
301    /// Retrieves the system state object from the perpetual database, writes
302    /// the state snapshot for the specified epoch to the local staging
303    /// directory, and uploads it to the remote store.
304    pub async fn write(
305        self,
306        epoch: u64,
307        perpetual_db: Arc<AuthorityPerpetualTables>,
308        root_state_hash: ECMHLiveObjectSetDigest,
309    ) -> Result<()> {
310        self.write_internal(epoch, perpetual_db, root_state_hash)
311            .await
312    }
313
314    /// Writes the state snapshot for the provided epoch to the local staging
315    /// directory and uploads it to the remote store.
316    pub(crate) async fn write_internal(
317        mut self,
318        epoch: u64,
319        perpetual_db: Arc<AuthorityPerpetualTables>,
320        root_state_hash: ECMHLiveObjectSetDigest,
321    ) -> Result<()> {
322        self.setup_epoch_dir(epoch).await?;
323
324        let manifest_file_path = self.epoch_dir(epoch).child("MANIFEST");
325        let local_staging_dir = self.local_staging_dir.clone();
326        let local_object_store = self.local_staging_store.clone();
327        let remote_object_store = self.remote_object_store.clone();
328
329        let (sender, receiver) = mpsc::channel::<FileMetadata>(1000);
330        // Starts the upload loop, which listens on the receiver for FileMetadata
331        let upload_handle = self.start_upload(epoch, receiver)?;
332        let write_handler = tokio::task::spawn_blocking(move || {
333            self.write_live_object_set(
334                epoch,
335                perpetual_db,
336                sender,
337                Self::bucket_func,
338                root_state_hash,
339            )
340        });
341        // Awaits the object and reference files to be written to the local staging
342        // directory and informs the upload loop
343        write_handler.await?.context(format!(
344            "Failed to write state snapshot for epoch: {}",
345            &epoch
346        ))?;
347
348        // Awaits the upload loop to finish
349        upload_handle.await?.context(format!(
350            "Failed to upload state snapshot for epoch: {}",
351            &epoch
352        ))?;
353
354        // Syncs the manifest file to the remote store
355        Self::sync_file_to_remote(
356            local_staging_dir,
357            manifest_file_path,
358            local_object_store,
359            remote_object_store,
360        )
361        .await?;
362        Ok(())
363    }
364
365    /// Starts listening on the receiver for FileMetadata and uploads the files
366    /// to the remote store in parallel.
367    fn start_upload(
368        &self,
369        epoch: u64,
370        receiver: Receiver<FileMetadata>,
371    ) -> Result<JoinHandle<Result<Vec<()>, anyhow::Error>>> {
372        let remote_object_store = self.remote_object_store.clone();
373        let local_staging_store = self.local_staging_store.clone();
374        let local_dir_path = self.local_staging_dir.clone();
375        let epoch_dir = self.epoch_dir(epoch);
376        let upload_concurrency = self.concurrency;
377        let join_handle = tokio::spawn(async move {
378            // Uploads the files to the remote store in parallel for each received
379            // FileMetadata
380            let results: Vec<Result<(), anyhow::Error>> = ReceiverStream::new(receiver)
381                .map(|file_metadata| {
382                    let file_path = file_metadata.file_path(&epoch_dir);
383                    let remote_object_store = remote_object_store.clone();
384                    let local_object_store = local_staging_store.clone();
385                    let local_dir_path = local_dir_path.clone();
386                    async move {
387                        Self::sync_file_to_remote(
388                            local_dir_path.clone(),
389                            file_path.clone(),
390                            local_object_store.clone(),
391                            remote_object_store.clone(),
392                        )
393                        .await?;
394                        Ok(())
395                    }
396                })
397                .boxed()
398                .buffer_unordered(upload_concurrency)
399                .collect()
400                .await;
401            results
402                .into_iter()
403                .collect::<Result<Vec<()>, anyhow::Error>>()
404        });
405        Ok(join_handle)
406    }
407
408    /// Writes the provided live object set in the form of reference files,
409    /// object files, and MANIFEST. These files are stored in the local
410    /// staging directory and the FileMetadata is sent to the channel.
411    fn write_live_object_set<F>(
412        &mut self,
413        epoch: u64,
414        perpetual_db: Arc<AuthorityPerpetualTables>,
415        sender: Sender<FileMetadata>,
416        bucket_func: F,
417        root_state_hash: ECMHLiveObjectSetDigest,
418    ) -> Result<()>
419    where
420        F: Fn(&LiveObject) -> u32,
421    {
422        let mut object_writers: HashMap<u32, LiveObjectSetWriterV1> = HashMap::new();
423        let local_staging_dir_path =
424            path_to_filesystem(self.local_staging_dir.clone(), &self.epoch_dir(epoch))?;
425        let mut acc = Accumulator::default();
426        for object in perpetual_db.iter_live_object_set() {
427            StateAccumulator::accumulate_live_object(&mut acc, &object);
428            let bucket_num = bucket_func(&object);
429            // Creates a new LiveObjectSetWriterV1 for the bucket if it does not exist
430            if let Vacant(entry) = object_writers.entry(bucket_num) {
431                entry.insert(LiveObjectSetWriterV1::new(
432                    local_staging_dir_path.clone(),
433                    bucket_num,
434                    self.file_compression,
435                    sender.clone(),
436                )?);
437            }
438            let writer = object_writers
439                .get_mut(&bucket_num)
440                .context("Unexpected missing bucket writer")?;
441            writer.write(&object)?;
442        }
443        assert_eq!(
444            ECMHLiveObjectSetDigest::from(acc.digest()),
445            root_state_hash,
446            "Root state hash mismatch!"
447        );
448        let mut files = vec![];
449        // Flushes the object and reference files to disk, informs the file channel of
450        // flushed files and get the FileMetadata
451        for (_, writer) in object_writers.into_iter() {
452            files.extend(writer.done()?);
453        }
454        // Write the manifest file for the epoch(bucket)
455        self.write_manifest(epoch, files)?;
456        Ok(())
457    }
458
459    /// Writes the manifest file for the provided FileMetadata of an epoch and
460    /// its sha3 checksum.
461    fn write_manifest(&mut self, epoch: u64, file_metadata: Vec<FileMetadata>) -> Result<()> {
462        let (f, manifest_file_path) = self.manifest_file(epoch)?;
463        let mut wbuf = BufWriter::new(f);
464        let manifest: Manifest = Manifest::V1(ManifestV1 {
465            snapshot_version: 1,
466            address_length: ObjectID::LENGTH as u64,
467            file_metadata,
468            epoch,
469        });
470        let serialized_manifest = bcs::to_bytes(&manifest)?;
471        wbuf.write_all(&serialized_manifest)?;
472        wbuf.flush()?;
473        wbuf.get_ref().sync_data()?;
474        // Computes the sha3 checksum of the manifest file and write it to the end of
475        // the file
476        let sha3_digest = compute_sha3_checksum(&manifest_file_path)?;
477        wbuf.write_all(&sha3_digest)?;
478        wbuf.flush()?;
479        wbuf.get_ref().sync_data()?;
480        let off = wbuf.get_ref().stream_position()?;
481        wbuf.get_ref().set_len(off)?;
482        Ok(())
483    }
484
485    /// Creates a new manifest file for the provided epoch and returns the file
486    /// and the path to the file.
487    fn manifest_file(&mut self, epoch: u64) -> Result<(File, PathBuf)> {
488        let manifest_file_path = path_to_filesystem(
489            self.local_staging_dir.clone(),
490            &self.epoch_dir(epoch).child("MANIFEST"),
491        )?;
492        let manifest_file_tmp_path = path_to_filesystem(
493            self.local_staging_dir.clone(),
494            &self.epoch_dir(epoch).child("MANIFEST.tmp"),
495        )?;
496        let mut f = File::create(manifest_file_tmp_path.clone())?;
497        let mut metab = vec![0u8; MAGIC_BYTES];
498        BigEndian::write_u32(&mut metab, MANIFEST_FILE_MAGIC);
499        f.rewind()?;
500        f.write_all(&metab)?;
501        drop(f);
502        fs::rename(manifest_file_tmp_path, manifest_file_path.clone())?;
503        let mut f = OpenOptions::new()
504            .append(true)
505            .open(manifest_file_path.clone())?;
506        f.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
507        Ok((f, manifest_file_path))
508    }
509
510    fn bucket_func(_object: &LiveObject) -> u32 {
511        // TODO: Use the hash bucketing function used for accumulator tree if there is
512        // one
513        1u32
514    }
515
516    fn epoch_dir(&self, epoch: u64) -> Path {
517        Path::from(format!("epoch_{}", epoch))
518    }
519
520    /// Creates a new epoch directory and a new staging directory for the epoch
521    /// in the local store. Deletes the old ones if they exist.
522    async fn setup_epoch_dir(&self, epoch: u64) -> Result<()> {
523        let epoch_dir = self.epoch_dir(epoch);
524        // Deletes remote epoch dir if it exists
525        delete_recursively(
526            &epoch_dir,
527            &self.remote_object_store,
528            NonZeroUsize::new(self.concurrency).unwrap(),
529        )
530        .await?;
531        // Deletes local staging epoch dir if it exists
532        let local_epoch_dir_path = self.local_staging_dir.join(format!("epoch_{}", epoch));
533        if local_epoch_dir_path.exists() {
534            fs::remove_dir_all(&local_epoch_dir_path)?;
535        }
536        fs::create_dir_all(&local_epoch_dir_path)?;
537        Ok(())
538    }
539
540    /// Syncs a file from local store to remote store and removes the local file
541    async fn sync_file_to_remote(
542        local_path: PathBuf,
543        path: Path,
544        from: Arc<DynObjectStore>,
545        to: Arc<DynObjectStore>,
546    ) -> Result<()> {
547        debug!("Syncing snapshot file to remote: {:?}", path);
548        copy_file(&path, &path, &from, &to).await?;
549        fs::remove_file(path_to_filesystem(local_path, &path)?)?;
550        Ok(())
551    }
552}