iota_data_ingestion/workers/archival.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{io::Cursor, sync::Arc};

use async_trait::async_trait;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use iota_archival::{
    CHECKPOINT_FILE_MAGIC, FileType, Manifest, SUMMARY_FILE_MAGIC, create_file_metadata_from_bytes,
    finalize_manifest, read_manifest_from_bytes,
};
use iota_data_ingestion_core::{Reducer, create_remote_store_client};
use iota_storage::{
    FileCompression, StorageFormat,
    blob::{Blob, BlobEncoding},
    compress,
};
use iota_types::{
    base_types::{EpochId, ExecutionData},
    full_checkpoint_content::CheckpointData,
    messages_checkpoint::{CheckpointSequenceNumber, FullCheckpointContents},
};
use object_store::{ObjectStore, path::Path};
use serde::{Deserialize, Serialize};

use crate::workers::RelayWorker;

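/// Configuration for the [`ArchivalReducer`]: the remote object-store URL and
/// options, plus the size and duration thresholds used when committing a batch.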
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct ArchivalConfig {
    pub remote_url: String,
    pub remote_store_options: Vec<(String, String)>,
    pub commit_file_size: usize,
    pub commit_duration_seconds: u64,
}

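/// Reducer that archives batches of checkpoints to a remote object store as
/// checkpoint-content (`.chk`) and summary (`.sum`) files, updating the
/// `MANIFEST` object after every upload.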
pub struct ArchivalReducer {
    remote_store: Box<dyn ObjectStore>,
    commit_duration_ms: u64,
}

impl ArchivalReducer {
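    /// Creates a reducer backed by a remote object store client built from `config`.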
    pub async fn new(config: ArchivalConfig) -> anyhow::Result<Self> {
        let remote_store =
            create_remote_store_client(config.remote_url, config.remote_store_options, 10)?;

        Ok(Self {
            remote_store,
            commit_duration_ms: config.commit_duration_seconds * 1000,
        })
    }

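    /// Uploads the serialized contents and summaries for checkpoints
    /// `start..end` of `epoch`, then records both files in the `MANIFEST`.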
    async fn upload(
        &self,
        epoch: EpochId,
        start: CheckpointSequenceNumber,
        end: CheckpointSequenceNumber,
        summary_buffer: Vec<u8>,
        buffer: Vec<u8>,
    ) -> anyhow::Result<()> {
        let checkpoint_file_path = format!("epoch_{}/{}.chk", epoch, start);
        let chk_bytes = self
            .upload_file(
                Path::from(checkpoint_file_path.clone()),
                CHECKPOINT_FILE_MAGIC,
                &buffer,
            )
            .await?;
        let summary_file_path = format!("epoch_{}/{}.sum", epoch, start);
        let sum_bytes = self
            .upload_file(
                Path::from(summary_file_path.clone()),
                SUMMARY_FILE_MAGIC,
                &summary_buffer,
            )
            .await?;
        let mut manifest = Self::read_manifest(&self.remote_store).await?;
        let checkpoint_file_metadata = create_file_metadata_from_bytes(
            chk_bytes,
            FileType::CheckpointContent,
            epoch,
            start..end,
        )?;
        let summary_file_metadata = create_file_metadata_from_bytes(
            sum_bytes,
            FileType::CheckpointSummary,
            epoch,
            start..end,
        )?;
        manifest.update(epoch, end, checkpoint_file_metadata, summary_file_metadata);

        let bytes = finalize_manifest(manifest)?;
        self.remote_store
            .put(&Path::from("MANIFEST"), bytes.into())
            .await?;
        Ok(())
    }

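    /// Prepends the file header (magic, storage format, compression tag),
    /// compresses the whole buffer and uploads it to `location`, returning the
    /// compressed bytes so the caller can build the file metadata.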
    async fn upload_file(
        &self,
        location: Path,
        magic: u32,
        content: &[u8],
    ) -> anyhow::Result<Bytes> {
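        // Header layout: 4-byte big-endian magic, one byte for the storage
        // format and one for the compression scheme, followed by the payload.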
        let mut buffer = vec![0; 4];
        BigEndian::write_u32(&mut buffer, magic);
        buffer.push(StorageFormat::Blob.into());
        buffer.push(FileCompression::Zstd.into());
        buffer.extend_from_slice(content);
        let mut compressed_buffer = vec![];
        let mut cursor = Cursor::new(buffer);
        compress(&mut cursor, &mut compressed_buffer)?;
        self.remote_store
            .put(&location, Bytes::from(compressed_buffer.clone()).into())
            .await?;
        Ok(Bytes::from(compressed_buffer))
    }

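    /// Returns the next checkpoint sequence number the archive expects,
    /// as recorded in the current `MANIFEST`.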
    pub async fn get_watermark(&self) -> anyhow::Result<CheckpointSequenceNumber> {
        let manifest = Self::read_manifest(&self.remote_store).await?;
        Ok(manifest.next_checkpoint_seq_num())
    }

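    /// Reads the `MANIFEST` from the remote store; a missing object (404) is
    /// treated as an empty archive starting from epoch 0, checkpoint 0.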
    async fn read_manifest(remote_store: &dyn ObjectStore) -> anyhow::Result<Manifest> {
        Ok(match remote_store.get(&Path::from("MANIFEST")).await {
            Ok(resp) => read_manifest_from_bytes(resp.bytes().await?.to_vec())?,
            Err(err) if err.to_string().contains("404") => Manifest::new(0, 0),
            Err(err) => Err(err)?,
        })
    }
}

#[async_trait]
impl Reducer<RelayWorker> for ArchivalReducer {
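    /// Serializes every checkpoint in the batch into BCS blobs and uploads the
    /// accumulated content and summary buffers as one archive entry.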
    async fn commit(&self, batch: &[Arc<CheckpointData>]) -> Result<(), anyhow::Error> {
        if batch.is_empty() {
            return Err(anyhow::anyhow!("commit batch can't be empty"));
        }
        let mut summary_buffer = vec![];
        let mut buffer = vec![];
        let first_checkpoint = &batch[0];
        let epoch = first_checkpoint.checkpoint_summary.epoch;
        let start_checkpoint = first_checkpoint.checkpoint_summary.sequence_number;
        let mut last_checkpoint = start_checkpoint;
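        // `last_checkpoint` ends up one past the final sequence number in the
        // batch, i.e. the exclusive end of the archived range.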
        for checkpoint in batch {
            let full_checkpoint_contents = FullCheckpointContents::from_contents_and_execution_data(
                checkpoint.checkpoint_contents.clone(),
                checkpoint
                    .transactions
                    .iter()
                    .map(|t| ExecutionData::new(t.transaction.clone(), t.effects.clone())),
            );
            let contents_blob = Blob::encode(&full_checkpoint_contents, BlobEncoding::Bcs).unwrap();
            let summary_blob =
                Blob::encode(&checkpoint.checkpoint_summary, BlobEncoding::Bcs).unwrap();
            contents_blob.write(&mut buffer).unwrap();
            summary_blob.write(&mut summary_buffer).unwrap();
            last_checkpoint += 1;
        }
        self.upload(
            epoch,
            start_checkpoint,
            last_checkpoint,
            summary_buffer,
            buffer,
        )
        .await
        .unwrap();
        Ok(())
    }

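    /// Closes the batch once the next checkpoint crosses into a new epoch or
    /// its timestamp exceeds the configured commit duration since the batch start.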
    fn should_close_batch(
        &self,
        batch: &[Arc<CheckpointData>],
        next_item: Option<&Arc<CheckpointData>>,
    ) -> bool {
        // never close a batch without a trigger condition
        if batch.is_empty() || next_item.is_none() {
            return false;
        }
        let first_checkpoint = &batch[0].checkpoint_summary;
        let next_checkpoint = next_item.expect("invariant checked above");
        next_checkpoint.checkpoint_summary.epoch != first_checkpoint.epoch
            || next_checkpoint.checkpoint_summary.timestamp_ms
                > (self.commit_duration_ms + first_checkpoint.timestamp_ms)
    }
}