// iota_data_ingestion/workers/blob.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

5use std::{ops::Range, sync::Arc};
6
7use anyhow::{Result, bail};
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::{StreamExt, stream};
11use iota_config::object_storage_config::ObjectStoreConfig;
12use iota_data_ingestion_core::Worker;
13use iota_grpc_client::Client;
14use iota_storage::blob::{Blob, BlobEncoding};
15use iota_types::{
16    committee::EpochId, full_checkpoint_content::CheckpointData,
17    messages_checkpoint::CheckpointSequenceNumber,
18};
19use object_store::{DynObjectStore, MultipartUpload, ObjectStore, ObjectStoreExt, path::Path};
20use serde::{Deserialize, Deserializer, Serialize};
21use tokio::sync::Mutex;
22
23use crate::common;
24
/// Minimum allowed chunk size to be uploaded to remote store.
/// NOTE(review): despite the `_MB` suffix, the value is expressed in *bytes*
/// (5 MB) — `deserialize_chunk` compares it against a byte count.
const MIN_CHUNK_SIZE_MB: u64 = 5 * 1024 * 1024; // 5 MB
/// The maximum number of concurrent requests allowed when uploading checkpoint
/// chunk parts to remote store
const MAX_CONCURRENT_PARTS_UPLOAD: usize = 50;
/// The maximum number of concurrent delete requests issued while resetting
/// the remote store (see `reset_remote_store`).
const MAX_CONCURRENT_DELETE_REQUESTS: usize = 10;

/// File extension of serialized checkpoint files (`<seq>.chk`).
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// Subdirectory (under the ingestion dir) holding live checkpoint files.
const LIVE_DIR_NAME: &str = "live";
/// Root directory for ingestion output in the remote store.
const INGESTION_DIR_NAME: &str = "ingestion";
35
/// Configuration for the [`BlobWorker`].
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct BlobTaskConfig {
    /// Configuration of the remote object store checkpoint blobs are
    /// uploaded to.
    pub object_store_config: ObjectStoreConfig,
    /// Chunk size for multipart uploads. Configured in megabytes, but stored
    /// here converted to *bytes* by `deserialize_chunk`.
    #[serde(deserialize_with = "deserialize_chunk")]
    pub checkpoint_chunk_size_mb: u64,
}
43
44fn deserialize_chunk<'de, D>(deserializer: D) -> Result<u64, D::Error>
45where
46    D: Deserializer<'de>,
47{
48    let checkpoint_chunk_size = u64::deserialize(deserializer)? * 1024 * 1024;
49    if checkpoint_chunk_size < MIN_CHUNK_SIZE_MB {
50        return Err(serde::de::Error::custom("chunk size must be at least 5 MB"));
51    }
52    Ok(checkpoint_chunk_size)
53}
54
/// Worker that persists checkpoint blobs to a remote object store.
pub struct BlobWorker {
    /// Destination object store for the checkpoint blobs.
    remote_store: Arc<DynObjectStore>,
    /// gRPC client used to look up epoch information during epoch changes
    /// (see `process_checkpoint`).
    grpc_client: Client,
    /// NOTE(review): despite the `_mb` suffix this holds the chunk size in
    /// *bytes* — the config deserializer converts MB to bytes.
    checkpoint_chunk_size_mb: u64,
    /// Last epoch observed by this worker; behind an async mutex, presumably
    /// because checkpoints may be processed concurrently — confirm against
    /// the ingestion-core executor.
    current_epoch: Arc<Mutex<EpochId>>,
}
61
62impl BlobWorker {
63    pub fn new(
64        config: BlobTaskConfig,
65        grpc_client: Client,
66        current_epoch: EpochId,
67    ) -> anyhow::Result<Self> {
68        Ok(Self {
69            checkpoint_chunk_size_mb: config.checkpoint_chunk_size_mb,
70            remote_store: config.object_store_config.make()?,
71            current_epoch: Arc::new(Mutex::new(current_epoch)),
72            grpc_client,
73        })
74    }
75
76    /// Resets the remote object store by deleting checkpoints within the
77    /// specified range.
78    pub async fn reset_remote_store(
79        &self,
80        range: Range<CheckpointSequenceNumber>,
81    ) -> anyhow::Result<()> {
82        tracing::info!("delete checkpoints from remote store: {range:?}");
83
84        let paths = range
85            .into_iter()
86            .map(|chk_seq_num| Ok(Self::file_path(chk_seq_num)))
87            .collect::<Vec<_>>();
88
89        let paths_stream = futures::stream::iter(paths).boxed();
90
91        _ = self
92            .remote_store
93            .delete_stream(paths_stream)
94            .for_each_concurrent(MAX_CONCURRENT_DELETE_REQUESTS, |delete_result| async {
95                _ = delete_result.inspect_err(|err| tracing::warn!("deletion failed with: {err}"));
96            })
97            .await;
98
99        Ok(())
100    }
101
102    /// Uploads a Checkpoint blob to the Remote Store.
103    ///
104    /// If the blob size exceeds the configured `CHUNK_SIZE`,
105    /// it uploads the blob in parts using multipart upload.
106    /// Otherwise, it uploads the blob directly.
107    async fn upload_blob(&self, bytes: Vec<u8>, chk_seq_num: u64, location: Path) -> Result<()> {
108        if bytes.len() > self.checkpoint_chunk_size_mb as usize {
109            return self
110                .upload_blob_multipart(bytes, chk_seq_num, location)
111                .await;
112        }
113
114        self.remote_store
115            .put(&location, Bytes::from(bytes).into())
116            .await?;
117
118        Ok(())
119    }
120
121    /// Uploads a large Checkpoint blob to the Remote Store using multipart
122    /// upload.
123    ///
124    /// This function divides the input `bytes` into chunks of size `CHUNK_SIZE`
125    /// and uploads each chunk individually.
126    /// Finally, it completes the multipart upload by assembling all the
127    /// uploaded parts.
128    async fn upload_blob_multipart(
129        &self,
130        bytes: Vec<u8>,
131        chk_seq_num: u64,
132        location: Path,
133    ) -> Result<()> {
134        let mut multipart = self.remote_store.put_multipart(&location).await?;
135        let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize);
136        let total_chunks = chunks.len();
137
138        let parts_futures = chunks
139            .into_iter()
140            .map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into()))
141            .collect::<Vec<_>>();
142
143        let mut buffered_uploaded_parts = stream::iter(parts_futures)
144            .buffer_unordered(MAX_CONCURRENT_PARTS_UPLOAD)
145            .enumerate();
146
147        while let Some((uploaded_chunk_id, part_result)) = buffered_uploaded_parts.next().await {
148            match part_result {
149                Ok(()) => {
150                    tracing::info!(
151                        "uploaded checkpoint {chk_seq_num} chunk {}/{total_chunks}",
152                        uploaded_chunk_id + 1
153                    );
154                }
155                Err(err) => {
156                    tracing::error!("error uploading part: {err}");
157                    multipart.abort().await?;
158                    bail!("checkpoint {chk_seq_num} multipart upload aborted");
159                }
160            }
161        }
162
163        let start_time = std::time::Instant::now();
164        multipart.complete().await?;
165        tracing::info!(
166            "checkpoint {chk_seq_num} multipart completion request finished in {:?}",
167            start_time.elapsed()
168        );
169
170        Ok(())
171    }
172
173    /// Constructs a file path for a checkpoint file based on the checkpoint
174    /// sequence number.
175    fn file_path(chk_seq_num: CheckpointSequenceNumber) -> Path {
176        Path::from(INGESTION_DIR_NAME)
177            .child(LIVE_DIR_NAME)
178            .child(format!("{chk_seq_num}.{CHECKPOINT_FILE_SUFFIX}"))
179    }
180}
181
#[async_trait]
impl Worker for BlobWorker {
    /// No message is produced; the upload itself is the useful side effect.
    type Message = ();
    type Error = anyhow::Error;

    /// Encodes the checkpoint with BCS and uploads the resulting blob to the
    /// remote store under `ingestion/live/<seq>.chk`.
    ///
    /// When the checkpoint's epoch is newer than the tracked one, the
    /// tracked (previous) epoch's checkpoint files are first deleted from
    /// the store — from that epoch's first checkpoint up to, but excluding,
    /// the current checkpoint — before the tracked epoch is advanced.
    async fn process_checkpoint(
        &self,
        checkpoint: Arc<CheckpointData>,
    ) -> Result<Self::Message, Self::Error> {
        let chk_seq_num = checkpoint.checkpoint_summary.sequence_number;
        let epoch = checkpoint.checkpoint_summary.epoch;

        // Hold the (tokio, hence await-safe) mutex across the awaits below so
        // concurrent checkpoint processing cannot interleave the epoch check,
        // the store reset and the epoch update.
        {
            let mut current_epoch = self.current_epoch.lock().await;
            if epoch > *current_epoch {
                // NOTE(review): presumably the second tuple element is the
                // first checkpoint sequence number of the queried epoch —
                // confirm against `common::epoch_info`'s definition.
                let (_, epoch_first_checkpoint_seq_num) =
                    common::epoch_info(&self.grpc_client, Some(*current_epoch)).await?;
                self.reset_remote_store(epoch_first_checkpoint_seq_num..chk_seq_num)
                    .await?;
                // we update the epoch once we made sure that reset was successful.
                *current_epoch = epoch;
            }
        }

        let bytes = Blob::encode(&checkpoint, BlobEncoding::Bcs)?.to_bytes();
        self.upload_blob(bytes, chk_seq_num, Self::file_path(chk_seq_num))
            .await?;

        Ok(())
    }
}
212}