iota_data_ingestion/workers/blob.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{ops::Range, sync::Arc};

use anyhow::{Result, bail};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{StreamExt, stream};
use iota_config::object_storage_config::ObjectStoreConfig;
use iota_data_ingestion_core::Worker;
use iota_rest_api::Client;
use iota_storage::blob::{Blob, BlobEncoding};
use iota_types::{
    committee::EpochId, full_checkpoint_content::CheckpointData,
    messages_checkpoint::CheckpointSequenceNumber,
};
use object_store::{DynObjectStore, MultipartUpload, ObjectStore, path::Path};
use serde::{Deserialize, Deserializer, Serialize};
use tokio::sync::Mutex;

use crate::common;

/// Minimum allowed chunk size (in bytes) for uploads to the remote store;
/// matches the 5 MiB minimum part size of S3-compatible stores.
const MIN_CHUNK_SIZE_BYTES: u64 = 5 * 1024 * 1024;
/// The maximum number of concurrent requests allowed when uploading checkpoint
/// chunk parts to the remote store.
const MAX_CONCURRENT_PARTS_UPLOAD: usize = 50;
/// The maximum number of concurrent delete requests when resetting the remote
/// store.
const MAX_CONCURRENT_DELETE_REQUESTS: usize = 10;

/// File extension of serialized checkpoint blobs.
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// Subdirectory holding the live (current-epoch) checkpoint data.
const LIVE_DIR_NAME: &str = "live";
/// Root directory for ingestion data in the remote store.
const INGESTION_DIR_NAME: &str = "ingestion";

/// Configuration for the [`BlobWorker`].
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct BlobTaskConfig {
    pub object_store_config: ObjectStoreConfig,
    /// Chunk size expressed in MB in the config file; converted to bytes
    /// during deserialization.
    #[serde(deserialize_with = "deserialize_chunk")]
    pub checkpoint_chunk_size_mb: u64,
    pub node_rest_api_url: String,
}

/// Deserializes a chunk size given in MB, converts it to bytes, and enforces
/// the 5 MB minimum.
fn deserialize_chunk<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
    D: Deserializer<'de>,
{
    let checkpoint_chunk_size = u64::deserialize(deserializer)? * 1024 * 1024;
    if checkpoint_chunk_size < MIN_CHUNK_SIZE_BYTES {
        return Err(serde::de::Error::custom("Chunk size must be at least 5 MB"));
    }
    Ok(checkpoint_chunk_size)
}
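
// For illustration, a task config for this worker might look like the
// following (a sketch: the top-level key names follow the kebab-case rename
// above, while the values and the inner `ObjectStoreConfig` fields are
// placeholders):
//
//   object-store-config:
//     ...                        # bucket, credentials, etc.
//   checkpoint-chunk-size-mb: 64
//   node-rest-api-url: "https://localhost:9000"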

/// Worker that uploads full checkpoint blobs to a remote object store.
pub struct BlobWorker {
    remote_store: Arc<DynObjectStore>,
    rest_client: Client,
    /// Chunk size in bytes (converted from MB during config deserialization).
    checkpoint_chunk_size_mb: u64,
    /// Tracks the most recently observed epoch.
    current_epoch: Arc<Mutex<EpochId>>,
}

impl BlobWorker {
    pub fn new(
        config: BlobTaskConfig,
        rest_client: Client,
        current_epoch: EpochId,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            checkpoint_chunk_size_mb: config.checkpoint_chunk_size_mb,
            remote_store: config.object_store_config.make()?,
            current_epoch: Arc::new(Mutex::new(current_epoch)),
            rest_client,
        })
    }

    /// Resets the remote object store by deleting checkpoints within the
    /// specified range.
    ///
    /// Individual deletion failures are logged and otherwise ignored.
    pub async fn reset_remote_store(
        &self,
        range: Range<CheckpointSequenceNumber>,
    ) -> anyhow::Result<()> {
        tracing::info!("delete checkpoints from remote store: {range:?}");

        let paths = range
            .map(|chk_seq_num| Ok(Self::file_path(chk_seq_num)))
            .collect::<Vec<_>>();

        let paths_stream = stream::iter(paths).boxed();

        self.remote_store
            .delete_stream(paths_stream)
            .for_each_concurrent(MAX_CONCURRENT_DELETE_REQUESTS, |delete_result| async {
                _ = delete_result.inspect_err(|err| tracing::warn!("deletion failed with: {err}"));
            })
            .await;

        Ok(())
    }

    /// Uploads a checkpoint blob to the remote store.
    ///
    /// If the blob is larger than the configured chunk size, it is uploaded
    /// in parts using multipart upload; otherwise it is uploaded directly.
    async fn upload_blob(&self, bytes: Vec<u8>, chk_seq_num: u64, location: Path) -> Result<()> {
        if bytes.len() > self.checkpoint_chunk_size_mb as usize {
            return self
                .upload_blob_multipart(bytes, chk_seq_num, location)
                .await;
        }

        self.remote_store
            .put(&location, Bytes::from(bytes).into())
            .await?;

        Ok(())
    }

    /// Uploads a large checkpoint blob to the remote store using multipart
    /// upload.
    ///
    /// This function divides the input `bytes` into chunks of the configured
    /// chunk size and uploads each chunk as a separate part. Finally, it
    /// completes the multipart upload by assembling all the uploaded parts.
    async fn upload_blob_multipart(
        &self,
        bytes: Vec<u8>,
        chk_seq_num: u64,
        location: Path,
    ) -> Result<()> {
        let mut multipart = self.remote_store.put_multipart(&location).await?;
        let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize);
        let total_chunks = chunks.len();

        // Part indices are fixed by the order of the `put_part` calls below;
        // only the transfers themselves run concurrently.
        let parts_futures = chunks
            .map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into()))
            .collect::<Vec<_>>();

        let mut buffered_uploaded_parts = stream::iter(parts_futures)
            .buffer_unordered(MAX_CONCURRENT_PARTS_UPLOAD)
            .enumerate();

        while let Some((uploaded_chunk_id, part_result)) = buffered_uploaded_parts.next().await {
            match part_result {
                Ok(()) => {
                    // `enumerate` counts completions here, so this logs upload
                    // progress rather than the index of a specific part.
                    tracing::info!(
                        "uploaded checkpoint {chk_seq_num} chunk {}/{total_chunks}",
                        uploaded_chunk_id + 1
                    );
                }
                Err(err) => {
                    tracing::error!("error uploading part: {err}");
                    multipart.abort().await?;
                    bail!("checkpoint {chk_seq_num} multipart upload aborted");
                }
            }
        }

        let start_time = std::time::Instant::now();
        multipart.complete().await?;
        tracing::info!(
            "checkpoint {chk_seq_num} multipart completion request finished in {:?}",
            start_time.elapsed()
        );

        Ok(())
    }

    /// Constructs a file path for a checkpoint file based on the checkpoint
    /// sequence number.
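    ///
    /// For example, sequence number `2500` maps to the object path
    /// `ingestion/live/2500.chk`.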
    fn file_path(chk_seq_num: CheckpointSequenceNumber) -> Path {
        Path::from(INGESTION_DIR_NAME)
            .child(LIVE_DIR_NAME)
            .child(format!("{chk_seq_num}.{CHECKPOINT_FILE_SUFFIX}"))
    }
}

#[async_trait]
impl Worker for BlobWorker {
    type Message = ();
    type Error = anyhow::Error;

    /// Serializes the checkpoint to BCS and uploads it to the remote store;
    /// on an epoch change, it first deletes the previous epoch's checkpoints
    /// from the live directory.
    async fn process_checkpoint(
        &self,
        checkpoint: Arc<CheckpointData>,
    ) -> Result<Self::Message, Self::Error> {
        let chk_seq_num = checkpoint.checkpoint_summary.sequence_number;
        let epoch = checkpoint.checkpoint_summary.epoch;

        {
            let mut current_epoch = self.current_epoch.lock().await;
            if epoch > *current_epoch {
                let delete_start = common::epoch_first_checkpoint_sequence_number(
                    &self.rest_client,
                    *current_epoch,
                )
                .await?;
                self.reset_remote_store(delete_start..chk_seq_num).await?;
                // Only update the epoch once the reset has succeeded.
                *current_epoch = epoch;
            }
        }

        let bytes = Blob::encode(&checkpoint, BlobEncoding::Bcs)?.to_bytes();
        self.upload_blob(bytes, chk_seq_num, Self::file_path(chk_seq_num))
            .await?;

        Ok(())
    }
}
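
// The two pure helpers above (chunk-size deserialization and path layout) are
// deterministic, so their behavior can be pinned down with unit tests. A
// minimal sketch: the `ChunkOnly` wrapper is hypothetical and `serde_json` is
// assumed to be available as a dev-dependency.
#[cfg(test)]
mod tests {
    use super::*;

    /// Hypothetical wrapper to exercise `deserialize_chunk` in isolation.
    #[derive(Deserialize)]
    struct ChunkOnly {
        #[serde(deserialize_with = "deserialize_chunk")]
        size_mb: u64,
    }

    #[test]
    fn chunk_size_is_converted_to_bytes_and_validated() {
        // Accepted values are converted from MB to bytes...
        let ok: ChunkOnly = serde_json::from_str(r#"{"size_mb": 64}"#).unwrap();
        assert_eq!(ok.size_mb, 64 * 1024 * 1024);
        // ...while anything below the 5 MB minimum is rejected.
        assert!(serde_json::from_str::<ChunkOnly>(r#"{"size_mb": 4}"#).is_err());
    }

    #[test]
    fn checkpoint_paths_follow_the_live_layout() {
        assert_eq!(
            BlobWorker::file_path(2500).to_string(),
            "ingestion/live/2500.chk"
        );
    }
}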