iota_archival/
writer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

#![allow(dead_code)]

use std::{
    fs,
    fs::{File, OpenOptions},
    io::{BufWriter, Seek, SeekFrom, Write},
    ops::Range,
    path::{Path, PathBuf},
    sync::Arc,
    thread::sleep,
    time::Duration,
};

use anyhow::{Context, Result, anyhow};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use iota_config::object_storage_config::ObjectStoreConfig;
use iota_storage::{
    FileCompression, StorageFormat,
    blob::{Blob, BlobEncoding},
    compress,
    object_store::util::{copy_file, path_to_filesystem},
};
use iota_types::{
    messages_checkpoint::{
        CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber,
        FullCheckpointContents as CheckpointContents,
    },
    storage::WriteStore,
};
use object_store::DynObjectStore;
use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
use tokio::{
    sync::{
        mpsc,
        mpsc::{Receiver, Sender},
    },
    time::Instant,
};
use tracing::{debug, info};

use crate::{
    CHECKPOINT_FILE_MAGIC, CHECKPOINT_FILE_SUFFIX, CheckpointUpdates, EPOCH_DIR_PREFIX,
    FileMetadata, FileType, MAGIC_BYTES, Manifest, SUMMARY_FILE_MAGIC, SUMMARY_FILE_SUFFIX,
    create_file_metadata, read_manifest, write_manifest,
};

pub struct ArchiveMetrics {
    pub latest_checkpoint_archived: IntGauge,
}

impl ArchiveMetrics {
    pub fn new(registry: &Registry) -> Arc<Self> {
        let this = Self {
            latest_checkpoint_archived: register_int_gauge_with_registry!(
                "latest_checkpoint_archived",
                "Latest checkpoint to have been archived to the remote store",
                registry
            )
            .unwrap(),
        };
        Arc::new(this)
    }
}

/// CheckpointWriter writes checkpoint contents and summaries, producing a
/// series of *.chk and *.sum files in the local staging directory.
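///
/// Files are grouped under per-epoch directories (`{EPOCH_DIR_PREFIX}{epoch_num}`)
/// and are named after the first checkpoint sequence number they contain.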
struct CheckpointWriter {
    root_dir_path: PathBuf,
    epoch_num: u64,
    checkpoint_range: Range<u64>,
    wbuf: BufWriter<File>,
    summary_wbuf: BufWriter<File>,
    sender: Sender<CheckpointUpdates>,
    checkpoint_buf_offset: usize,
    file_compression: FileCompression,
    storage_format: StorageFormat,
    manifest: Manifest,
    last_commit_instant: Instant,
    commit_duration: Duration,
    commit_file_size: usize,
}

impl CheckpointWriter {
    fn new(
        root_dir_path: PathBuf,
        file_compression: FileCompression,
        storage_format: StorageFormat,
        sender: Sender<CheckpointUpdates>,
        manifest: Manifest,
        commit_duration: Duration,
        commit_file_size: usize,
    ) -> Result<Self> {
        let epoch_num = manifest.epoch_num();
        let checkpoint_sequence_num = manifest.next_checkpoint_seq_num();
        let epoch_dir = root_dir_path.join(format!("{}{epoch_num}", EPOCH_DIR_PREFIX));
        if epoch_dir.exists() {
            fs::remove_dir_all(&epoch_dir)?;
        }
        fs::create_dir_all(&epoch_dir)?;
        let checkpoint_file = Self::next_file(
            &epoch_dir,
            checkpoint_sequence_num,
            CHECKPOINT_FILE_SUFFIX,
            CHECKPOINT_FILE_MAGIC,
            storage_format,
            file_compression,
        )?;
        let summary_file = Self::next_file(
            &epoch_dir,
            checkpoint_sequence_num,
            SUMMARY_FILE_SUFFIX,
            SUMMARY_FILE_MAGIC,
            storage_format,
            file_compression,
        )?;
        Ok(CheckpointWriter {
            root_dir_path,
            epoch_num,
            checkpoint_range: checkpoint_sequence_num..checkpoint_sequence_num,
            wbuf: BufWriter::new(checkpoint_file),
            summary_wbuf: BufWriter::new(summary_file),
            checkpoint_buf_offset: 0,
            sender,
            file_compression,
            storage_format,
            manifest,
            last_commit_instant: Instant::now(),
            commit_duration,
            commit_file_size,
        })
    }

    pub fn write(
        &mut self,
        checkpoint_contents: CheckpointContents,
        checkpoint_summary: Checkpoint,
    ) -> Result<()> {
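        // StorageFormat::Blob is the only variant handled here; both contents
        // and summaries are written as BCS-encoded blobs.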
        match self.storage_format {
            StorageFormat::Blob => self.write_as_blob(checkpoint_contents, checkpoint_summary),
        }
    }

    pub fn write_as_blob(
        &mut self,
        checkpoint_contents: CheckpointContents,
        checkpoint_summary: Checkpoint,
    ) -> Result<()> {
        assert_eq!(
            checkpoint_summary.sequence_number,
            self.checkpoint_range.end
        );

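        // If this checkpoint belongs to the next epoch, cut the current files,
        // switch to a fresh epoch directory, and reset the writer before
        // continuing.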
        if checkpoint_summary.epoch()
            == self
                .epoch_num
                .checked_add(1)
                .context("Epoch num overflow")?
        {
            self.cut()?;
            self.update_to_next_epoch();
            if self.epoch_dir().exists() {
                fs::remove_dir_all(self.epoch_dir())?;
            }
            fs::create_dir_all(self.epoch_dir())?;
            self.reset()?;
        }

        assert_eq!(checkpoint_summary.epoch, self.epoch_num);

        assert_eq!(
            checkpoint_summary.content_digest,
            *checkpoint_contents.checkpoint_contents().digest()
        );

        let contents_blob = Blob::encode(&checkpoint_contents, BlobEncoding::Bcs)?;
        let blob_size = contents_blob.size();
        // Cut the current checkpoint and summary files if adding this blob would
        // exceed the commit file size, or if the commit duration has elapsed;
        // the writer is then reset to start new files for the incoming contents.
        let cut_new_checkpoint_file = (self.checkpoint_buf_offset + blob_size)
            > self.commit_file_size
            || (self.last_commit_instant.elapsed() > self.commit_duration);
        if cut_new_checkpoint_file {
            self.cut()?;
            self.reset()?;
        }

        self.checkpoint_buf_offset += contents_blob.write(&mut self.wbuf)?;

        let summary_blob = Blob::encode(&checkpoint_summary, BlobEncoding::Bcs)?;
        summary_blob.write(&mut self.summary_wbuf)?;

        self.checkpoint_range.end = self
            .checkpoint_range
            .end
            .checked_add(1)
            .context("Checkpoint sequence num overflow")?;
        Ok(())
    }
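    /// Flushes and syncs the current checkpoint content file, truncates it to
    /// the number of bytes written, compresses it if configured, and returns
    /// its FileMetadata.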
    fn finalize(&mut self) -> Result<FileMetadata> {
        self.wbuf.flush()?;
        self.wbuf.get_ref().sync_data()?;
        let off = self.wbuf.get_ref().stream_position()?;
        self.wbuf.get_ref().set_len(off)?;
        let file_path = self.epoch_dir().join(format!(
            "{}.{CHECKPOINT_FILE_SUFFIX}",
            self.checkpoint_range.start
        ));
        self.compress(&file_path)?;
        let file_metadata = create_file_metadata(
            &file_path,
            FileType::CheckpointContent,
            self.epoch_num,
            self.checkpoint_range.clone(),
        )?;
        Ok(file_metadata)
    }
    fn finalize_summary(&mut self) -> Result<FileMetadata> {
        self.summary_wbuf.flush()?;
        self.summary_wbuf.get_ref().sync_data()?;
        let off = self.summary_wbuf.get_ref().stream_position()?;
        self.summary_wbuf.get_ref().set_len(off)?;
        let file_path = self.epoch_dir().join(format!(
            "{}.{SUMMARY_FILE_SUFFIX}",
            self.checkpoint_range.start
        ));
        self.compress(&file_path)?;
        let file_metadata = create_file_metadata(
            &file_path,
            FileType::CheckpointSummary,
            self.epoch_num,
            self.checkpoint_range.clone(),
        )?;
        Ok(file_metadata)
    }

    /// Finalizes the on-hold checkpoint and summary contents, and sends the
    /// CheckpointUpdates to notify the channel listeners.
    fn cut(&mut self) -> Result<()> {
        if !self.checkpoint_range.is_empty() {
            let checkpoint_file_metadata = self.finalize()?;
            let summary_file_metadata = self.finalize_summary()?;
            let checkpoint_updates = CheckpointUpdates::new(
                self.epoch_num,
                self.checkpoint_range.end,
                checkpoint_file_metadata,
                summary_file_metadata,
                &mut self.manifest,
            );
            info!("Checkpoint file cut for: {:?}", checkpoint_updates);
            self.sender.blocking_send(checkpoint_updates)?;
        }
        Ok(())
    }
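    /// Compresses `source` in place via a temporary file; a no-op when file
    /// compression is disabled.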
    fn compress(&self, source: &Path) -> Result<()> {
        if self.file_compression == FileCompression::None {
            return Ok(());
        }
        let mut input = File::open(source)?;
        let tmp_file_name = source.with_extension("tmp");
        let mut output = File::create(&tmp_file_name)?;
        compress(&mut input, &mut output)?;
        fs::rename(tmp_file_name, source)?;
        Ok(())
    }
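    /// Creates the next `<checkpoint_sequence_num>.<suffix>` file and writes its
    /// header: the magic bytes (a big-endian u32), followed by one byte each for
    /// the storage format and the file compression scheme.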
    fn next_file(
        dir_path: &Path,
        checkpoint_sequence_num: u64,
        suffix: &str,
        magic_bytes: u32,
        storage_format: StorageFormat,
        file_compression: FileCompression,
    ) -> Result<File> {
        let next_file_path = dir_path.join(format!("{checkpoint_sequence_num}.{suffix}"));
        let mut f = File::create(next_file_path.clone())?;
        let mut metab = [0u8; MAGIC_BYTES];
        BigEndian::write_u32(&mut metab, magic_bytes);
        let n = f.write(&metab)?;
        drop(f);
        f = OpenOptions::new().append(true).open(next_file_path)?;
        f.seek(SeekFrom::Start(n as u64))?;
        f.write_u8(storage_format.into())?;
        f.write_u8(file_compression.into())?;
        Ok(f)
    }
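    /// Opens fresh checkpoint and summary files for the current range start and
    /// resets `checkpoint_buf_offset` to `MAGIC_BYTES`.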
    fn create_new_files(&mut self) -> Result<()> {
        let f = Self::next_file(
            &self.epoch_dir(),
            self.checkpoint_range.start,
            CHECKPOINT_FILE_SUFFIX,
            CHECKPOINT_FILE_MAGIC,
            self.storage_format,
            self.file_compression,
        )?;
        self.checkpoint_buf_offset = MAGIC_BYTES;
        self.wbuf = BufWriter::new(f);
        let f = Self::next_file(
            &self.epoch_dir(),
            self.checkpoint_range.start,
            SUMMARY_FILE_SUFFIX,
            SUMMARY_FILE_MAGIC,
            self.storage_format,
            self.file_compression,
        )?;
        self.summary_wbuf = BufWriter::new(f);
        Ok(())
    }
    fn reset(&mut self) -> Result<()> {
        self.reset_checkpoint_range();
        self.create_new_files()?;
        self.reset_last_commit_ts();
        Ok(())
    }
    fn reset_last_commit_ts(&mut self) {
        self.last_commit_instant = Instant::now();
    }
    fn reset_checkpoint_range(&mut self) {
        self.checkpoint_range = self.checkpoint_range.end..self.checkpoint_range.end
    }
    fn epoch_dir(&self) -> PathBuf {
        self.root_dir_path
            .join(format!("{}{}", EPOCH_DIR_PREFIX, self.epoch_num))
    }
    fn update_to_next_epoch(&mut self) {
        self.epoch_num = self.epoch_num.checked_add(1).unwrap();
    }
}

/// ArchiveWriter archives history by tailing checkpoints, writing them to a
/// local staging dir, and simultaneously uploading them to a remote object
/// store.
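///
/// A minimal usage sketch (the configs, store handle, and the duration/size
/// values below are illustrative, not taken from this crate):
///
/// ```ignore
/// let archive_writer = ArchiveWriter::new(
///     local_store_config,       // ObjectStoreConfig with a local `directory` set
///     remote_store_config,      // ObjectStoreConfig for the remote archive
///     FileCompression::None,
///     StorageFormat::Blob,
///     Duration::from_secs(600), // commit at least every 10 minutes
///     128 * 1024 * 1024,        // or once a checkpoint file reaches ~128 MiB
///     &Registry::new(),
/// )
/// .await?;
/// let kill_sender = archive_writer.start(store).await?;
/// // ... later, signal both background tasks to stop:
/// let _ = kill_sender.send(());
/// ```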
pub struct ArchiveWriter {
    file_compression: FileCompression,
    storage_format: StorageFormat,
    local_staging_dir_root: PathBuf,
    local_object_store: Arc<DynObjectStore>,
    remote_object_store: Arc<DynObjectStore>,
    commit_duration: Duration,
    commit_file_size: usize,
    archive_metrics: Arc<ArchiveMetrics>,
}

impl ArchiveWriter {
    pub async fn new(
        local_store_config: ObjectStoreConfig,
        remote_store_config: ObjectStoreConfig,
        file_compression: FileCompression,
        storage_format: StorageFormat,
        commit_duration: Duration,
        commit_file_size: usize,
        registry: &Registry,
    ) -> Result<Self> {
        Ok(ArchiveWriter {
            file_compression,
            storage_format,
            remote_object_store: remote_store_config.make()?,
            local_object_store: local_store_config.make()?,
            local_staging_dir_root: local_store_config.directory.context("Missing local dir")?,
            commit_duration,
            commit_file_size,
            archive_metrics: ArchiveMetrics::new(registry),
        })
    }

    /// Initializes the ArchiveWriter from the state of the remote archive store
    /// if it exists; otherwise, it starts from genesis. It then creates
    /// archive files for checkpoints and uploads them to the remote store.
    pub async fn start<S>(&self, store: S) -> Result<tokio::sync::broadcast::Sender<()>>
    where
        S: WriteStore + Send + Sync + 'static,
    {
        let remote_archive_is_empty = self
            .remote_object_store
            .list_with_delimiter(None)
            .await
            .expect("Failed to read remote archive dir")
            .common_prefixes
            .is_empty();
        let manifest = if remote_archive_is_empty {
            // Start from genesis
            Manifest::new(0, 0)
        } else {
            read_manifest(self.remote_object_store.clone())
                .await
                .expect("Failed to read manifest")
        };
        let start_checkpoint_sequence_number = manifest.next_checkpoint_seq_num();
        let (sender, receiver) = mpsc::channel::<CheckpointUpdates>(100);
        let checkpoint_writer = CheckpointWriter::new(
            self.local_staging_dir_root.clone(),
            self.file_compression,
            self.storage_format,
            sender,
            manifest,
            self.commit_duration,
            self.commit_file_size,
        )
        .expect("Failed to create checkpoint writer");
        let (kill_sender, kill_receiver) = tokio::sync::broadcast::channel::<()>(1);

        // Receives CheckpointUpdates and uploads them to the remote store.
        tokio::spawn(Self::start_syncing_with_remote(
            self.remote_object_store.clone(),
            self.local_object_store.clone(),
            self.local_staging_dir_root.clone(),
            receiver,
            kill_sender.subscribe(),
            self.archive_metrics.clone(),
        ));

        // Tails checkpoints from the store and writes them to the CheckpointWriter.
        tokio::task::spawn_blocking(move || {
            Self::start_tailing_checkpoints(
                start_checkpoint_sequence_number,
                checkpoint_writer,
                store,
                kill_receiver,
            )
        });
        Ok(kill_sender)
    }

    /// Checks if the checkpoint with `checkpoint_sequence_number` is available
    /// to read from the store; if not, sleeps for a few seconds and retries.
    /// If the checkpoint is available, writes the checkpoint contents and
    /// summary to the CheckpointWriter.
    fn start_tailing_checkpoints<S>(
        start_checkpoint_sequence_number: CheckpointSequenceNumber,
        mut checkpoint_writer: CheckpointWriter,
        store: S,
        mut kill: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()>
    where
        S: WriteStore + Send + Sync + 'static,
    {
        let mut checkpoint_sequence_number = start_checkpoint_sequence_number;
        info!("Starting checkpoint tailing from sequence number: {checkpoint_sequence_number}");

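        // This loop runs on a blocking thread (spawned via `spawn_blocking` in
        // `start`), so it uses the std `sleep` rather than the async timer.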
        while kill.try_recv().is_err() {
            if let Some(checkpoint_summary) = store
                .get_checkpoint_by_sequence_number(checkpoint_sequence_number)
                .map_err(|_| anyhow!("Failed to read checkpoint summary from store"))?
            {
                if let Some(checkpoint_contents) = store
                    .get_full_checkpoint_contents(&checkpoint_summary.content_digest)
                    .map_err(|_| anyhow!("Failed to read checkpoint content from store"))?
                {
                    checkpoint_writer
                        .write(checkpoint_contents, checkpoint_summary.into_inner())?;
                    checkpoint_sequence_number = checkpoint_sequence_number
                        .checked_add(1)
                        .context("checkpoint seq number overflow")?;
                    // There are more checkpoints to tail, so continue without sleeping
                    continue;
                }
            }
            // Checkpoint with `checkpoint_sequence_number` is not available to read
            // from the store yet, so sleep for some time and then retry
            sleep(Duration::from_secs(3));
        }
        Ok(())
    }

    /// Monitors a channel for CheckpointUpdates and uploads the corresponding
    /// summary file, checkpoint content file, and MANIFEST to the remote store.
    async fn start_syncing_with_remote(
        remote_object_store: Arc<DynObjectStore>,
        local_object_store: Arc<DynObjectStore>,
        local_staging_root_dir: PathBuf,
        mut update_receiver: Receiver<CheckpointUpdates>,
        mut kill: tokio::sync::broadcast::Receiver<()>,
        metrics: Arc<ArchiveMetrics>,
    ) -> Result<()> {
        loop {
            tokio::select! {
                _ = kill.recv() => break,
                updates = update_receiver.recv() => {
                    if let Some(checkpoint_updates) = updates {
                        info!("Received checkpoint update: {:?}", checkpoint_updates);
                        let latest_checkpoint_seq_num = checkpoint_updates.manifest.next_checkpoint_seq_num();
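                        // Upload the summary and content files before updating the
                        // MANIFEST in the remote store.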
                        let summary_file_path = checkpoint_updates.summary_file_path();
                        Self::sync_file_to_remote(
                            local_staging_root_dir.clone(),
                            summary_file_path,
                            local_object_store.clone(),
                            remote_object_store.clone()
                        )
                        .await
                        .expect("Syncing checkpoint summary should not fail");

                        let content_file_path = checkpoint_updates.content_file_path();
                        Self::sync_file_to_remote(
                            local_staging_root_dir.clone(),
                            content_file_path,
                            local_object_store.clone(),
                            remote_object_store.clone()
                        )
                        .await
                        .expect("Syncing checkpoint content should not fail");

                        write_manifest(
                            checkpoint_updates.manifest,
                            remote_object_store.clone()
                        )
                        .await
                        .expect("Updating manifest should not fail");
                        metrics.latest_checkpoint_archived.set(latest_checkpoint_seq_num as i64)
                    } else {
                        info!("Terminating archive sync loop");
                        break;
                    }
                },
            }
        }
        Ok(())
    }

    /// Syncs a file to the remote store and deletes the local one.
    async fn sync_file_to_remote(
        dir: PathBuf,
        path: object_store::path::Path,
        from: Arc<DynObjectStore>,
        to: Arc<DynObjectStore>,
    ) -> Result<()> {
        debug!("Syncing archive file to remote: {:?}", path);
        copy_file(&path, &path, &from, &to).await?;
        fs::remove_file(path_to_filesystem(dir, &path)?)?;
        Ok(())
    }
}