// iota_data_ingestion/workers/blob.rs

use std::{ops::Range, sync::Arc};

use anyhow::{Result, bail};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{StreamExt, stream};
use iota_config::object_storage_config::ObjectStoreConfig;
use iota_data_ingestion_core::Worker;
use iota_grpc_client::Client;
use iota_storage::blob::{Blob, BlobEncoding};
use iota_types::{
    committee::EpochId, full_checkpoint_content::CheckpointData,
    messages_checkpoint::CheckpointSequenceNumber,
};
use object_store::{DynObjectStore, MultipartUpload, ObjectStore, ObjectStoreExt, path::Path};
use serde::{Deserialize, Deserializer, Serialize};
use tokio::sync::Mutex;

use crate::common;
/// Minimum allowed checkpoint chunk size: 5 MiB.
/// NOTE(review): despite the `_MB` suffix this value is in *bytes* —
/// `deserialize_chunk` converts the configured MB count to bytes before
/// comparing against it.
const MIN_CHUNK_SIZE_MB: u64 = 5 * 1024 * 1024;
/// Maximum number of multipart-upload parts kept in flight at once.
const MAX_CONCURRENT_PARTS_UPLOAD: usize = 50;
/// Maximum number of delete requests kept in flight during a store reset.
const MAX_CONCURRENT_DELETE_REQUESTS: usize = 10;

/// File extension used for serialized checkpoint blobs.
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// Directory holding checkpoints of the current (live) epoch.
const LIVE_DIR_NAME: &str = "live";
/// Root directory for ingestion output in the remote store.
const INGESTION_DIR_NAME: &str = "ingestion";

/// Configuration for the blob worker task.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct BlobTaskConfig {
    /// Object store (bucket/container) the checkpoint blobs are written to.
    pub object_store_config: ObjectStoreConfig,
    /// Chunk size threshold. Configured in MB, but stored in *bytes*:
    /// `deserialize_chunk` multiplies by 1024 * 1024 and enforces a 5 MiB
    /// minimum.
    #[serde(deserialize_with = "deserialize_chunk")]
    pub checkpoint_chunk_size_mb: u64,
}
43
44fn deserialize_chunk<'de, D>(deserializer: D) -> Result<u64, D::Error>
45where
46 D: Deserializer<'de>,
47{
48 let checkpoint_chunk_size = u64::deserialize(deserializer)? * 1024 * 1024;
49 if checkpoint_chunk_size < MIN_CHUNK_SIZE_MB {
50 return Err(serde::de::Error::custom("chunk size must be at least 5 MB"));
51 }
52 Ok(checkpoint_chunk_size)
53}
54
/// Worker that serializes checkpoints and uploads them as blobs to a remote
/// object store, clearing the previous epoch's files when a new epoch starts.
pub struct BlobWorker {
    /// Destination object store for checkpoint blobs.
    remote_store: Arc<DynObjectStore>,
    /// gRPC client used to look up epoch information on epoch change.
    grpc_client: Client,
    /// Multipart threshold / chunk size in *bytes* (despite the `_mb`
    /// suffix — the config deserializer converts MB to bytes).
    checkpoint_chunk_size_mb: u64,
    /// Last epoch observed by `process_checkpoint`; behind an async mutex so
    /// the epoch-change reset runs at most once.
    current_epoch: Arc<Mutex<EpochId>>,
}
61
impl BlobWorker {
    /// Creates a new [`BlobWorker`].
    ///
    /// # Errors
    /// Fails if the remote object store cannot be built from
    /// `config.object_store_config`.
    pub fn new(
        config: BlobTaskConfig,
        grpc_client: Client,
        current_epoch: EpochId,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            checkpoint_chunk_size_mb: config.checkpoint_chunk_size_mb,
            remote_store: config.object_store_config.make()?,
            current_epoch: Arc::new(Mutex::new(current_epoch)),
            grpc_client,
        })
    }

    /// Deletes the checkpoint files for the given sequence-number range from
    /// the remote store.
    ///
    /// Deletions run with at most [`MAX_CONCURRENT_DELETE_REQUESTS`] in
    /// flight. Individual failures are logged and ignored (best effort), so
    /// this currently always returns `Ok(())`.
    pub async fn reset_remote_store(
        &self,
        range: Range<CheckpointSequenceNumber>,
    ) -> anyhow::Result<()> {
        tracing::info!("delete checkpoints from remote store: {range:?}");

        // `delete_stream` consumes a stream of `Result<Path>`, hence the
        // `Ok(...)` wrapping here.
        let paths = range
            .into_iter()
            .map(|chk_seq_num| Ok(Self::file_path(chk_seq_num)))
            .collect::<Vec<_>>();

        let paths_stream = futures::stream::iter(paths).boxed();

        _ = self
            .remote_store
            .delete_stream(paths_stream)
            .for_each_concurrent(MAX_CONCURRENT_DELETE_REQUESTS, |delete_result| async {
                // Best effort: log a failed delete and keep going.
                _ = delete_result.inspect_err(|err| tracing::warn!("deletion failed with: {err}"));
            })
            .await;

        Ok(())
    }

    /// Uploads a serialized checkpoint blob to `location`.
    ///
    /// Payloads larger than the configured chunk size (in bytes) take the
    /// multipart path; smaller ones are uploaded with a single `put`.
    async fn upload_blob(&self, bytes: Vec<u8>, chk_seq_num: u64, location: Path) -> Result<()> {
        if bytes.len() > self.checkpoint_chunk_size_mb as usize {
            return self
                .upload_blob_multipart(bytes, chk_seq_num, location)
                .await;
        }

        self.remote_store
            .put(&location, Bytes::from(bytes).into())
            .await?;

        Ok(())
    }

    /// Uploads a blob via the object store's multipart API.
    ///
    /// The payload is split into chunks of `checkpoint_chunk_size_mb` bytes
    /// and parts are uploaded with at most [`MAX_CONCURRENT_PARTS_UPLOAD`]
    /// in flight. On any part failure the upload is aborted (letting the
    /// store reclaim already-uploaded parts) and an error is returned.
    ///
    /// NOTE(review): correctness assumes `object_store` assigns part indices
    /// in `put_part` call order, so that `buffer_unordered` only reorders
    /// completion (used here purely for progress logging) — confirm against
    /// the object_store `MultipartUpload` docs.
    async fn upload_blob_multipart(
        &self,
        bytes: Vec<u8>,
        chk_seq_num: u64,
        location: Path,
    ) -> Result<()> {
        let mut multipart = self.remote_store.put_multipart(&location).await?;
        let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize);
        let total_chunks = chunks.len();

        let parts_futures = chunks
            .into_iter()
            .map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into()))
            .collect::<Vec<_>>();

        let mut buffered_uploaded_parts = stream::iter(parts_futures)
            .buffer_unordered(MAX_CONCURRENT_PARTS_UPLOAD)
            .enumerate();

        while let Some((uploaded_chunk_id, part_result)) = buffered_uploaded_parts.next().await {
            match part_result {
                Ok(()) => {
                    // `uploaded_chunk_id` is the completion index, not the
                    // part number — sufficient for a progress counter.
                    tracing::info!(
                        "uploaded checkpoint {chk_seq_num} chunk {}/{total_chunks}",
                        uploaded_chunk_id + 1
                    );
                }
                Err(err) => {
                    tracing::error!("error uploading part: {err}");
                    multipart.abort().await?;
                    bail!("checkpoint {chk_seq_num} multipart upload aborted");
                }
            }
        }

        let start_time = std::time::Instant::now();
        multipart.complete().await?;
        tracing::info!(
            "checkpoint {chk_seq_num} multipart completion request finished in {:?}",
            start_time.elapsed()
        );

        Ok(())
    }

    /// Builds the remote path `ingestion/live/{seq}.chk` for a checkpoint
    /// sequence number.
    fn file_path(chk_seq_num: CheckpointSequenceNumber) -> Path {
        Path::from(INGESTION_DIR_NAME)
            .child(LIVE_DIR_NAME)
            .child(format!("{chk_seq_num}.{CHECKPOINT_FILE_SUFFIX}"))
    }
}
181
#[async_trait]
impl Worker for BlobWorker {
    // Nothing is propagated downstream; the upload is the side effect.
    type Message = ();
    type Error = anyhow::Error;

    /// BCS-encodes the checkpoint and uploads it to the live directory.
    ///
    /// When the checkpoint's epoch is newer than the last one seen, the
    /// previous epoch's files are first deleted from the remote store.
    async fn process_checkpoint(
        &self,
        checkpoint: Arc<CheckpointData>,
    ) -> Result<Self::Message, Self::Error> {
        let chk_seq_num = checkpoint.checkpoint_summary.sequence_number;
        let epoch = checkpoint.checkpoint_summary.epoch;

        {
            // Keep the lock scope tight: it is released before the
            // (potentially slow) blob upload below.
            let mut current_epoch = self.current_epoch.lock().await;
            if epoch > *current_epoch {
                // Delete the old epoch's checkpoints: from the first
                // checkpoint of the *previous* epoch up to (excluding) the
                // current one.
                // NOTE(review): assumes `common::epoch_info` returns the
                // first checkpoint sequence number of the queried epoch as
                // its second tuple element — verify against `common`.
                let (_, epoch_first_checkpoint_seq_num) =
                    common::epoch_info(&self.grpc_client, Some(*current_epoch)).await?;
                self.reset_remote_store(epoch_first_checkpoint_seq_num..chk_seq_num)
                    .await?;
                *current_epoch = epoch;
            }
        }

        let bytes = Blob::encode(&checkpoint, BlobEncoding::Bcs)?.to_bytes();
        self.upload_blob(bytes, chk_seq_num, Self::file_path(chk_seq_num))
            .await?;

        Ok(())
    }
}