iota_data_ingestion/workers/blob.rs

use std::{ops::Range, sync::Arc};

use anyhow::{Result, bail};
use async_trait::async_trait;
use bytes::Bytes;
use futures::{StreamExt, stream};
use iota_config::object_storage_config::ObjectStoreConfig;
use iota_data_ingestion_core::Worker;
use iota_rest_api::Client;
use iota_storage::blob::{Blob, BlobEncoding};
use iota_types::{
    committee::EpochId, full_checkpoint_content::CheckpointData,
    messages_checkpoint::CheckpointSequenceNumber,
};
use object_store::{DynObjectStore, MultipartUpload, ObjectStore, path::Path};
use serde::{Deserialize, Deserializer, Serialize};
use tokio::sync::Mutex;

use crate::common;

/// Minimum allowed chunk size (5 MiB), expressed in bytes.
const MIN_CHUNK_SIZE_MB: u64 = 5 * 1024 * 1024;
/// Maximum number of parts uploaded concurrently during a multipart upload.
const MAX_CONCURRENT_PARTS_UPLOAD: usize = 50;
/// Maximum number of concurrent delete requests sent to the remote store.
const MAX_CONCURRENT_DELETE_REQUESTS: usize = 10;

const CHECKPOINT_FILE_SUFFIX: &str = "chk";
const LIVE_DIR_NAME: &str = "live";
const INGESTION_DIR_NAME: &str = "ingestion";

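/// Configuration of the blob worker task: the target object store, the
/// checkpoint chunk size (given in MB, stored in bytes after deserialization)
/// and the node REST API URL.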
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct BlobTaskConfig {
    pub object_store_config: ObjectStoreConfig,
    #[serde(deserialize_with = "deserialize_chunk")]
    pub checkpoint_chunk_size_mb: u64,
    pub node_rest_api_url: String,
}

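/// Converts the configured chunk size from MB to bytes and rejects values
/// below [`MIN_CHUNK_SIZE_MB`].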
fn deserialize_chunk<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
    D: Deserializer<'de>,
{
    let checkpoint_chunk_size = u64::deserialize(deserializer)? * 1024 * 1024;
    if checkpoint_chunk_size < MIN_CHUNK_SIZE_MB {
        return Err(serde::de::Error::custom("Chunk size must be at least 5 MB"));
    }
    Ok(checkpoint_chunk_size)
}

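/// Worker that uploads every checkpoint as a BCS-encoded blob to the remote
/// object store and prunes the previous epoch's checkpoints on epoch change.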
pub struct BlobWorker {
    remote_store: Arc<DynObjectStore>,
    rest_client: Client,
    checkpoint_chunk_size_mb: u64,
    current_epoch: Arc<Mutex<EpochId>>,
}

impl BlobWorker {
    pub fn new(
        config: BlobTaskConfig,
        rest_client: Client,
        current_epoch: EpochId,
    ) -> anyhow::Result<Self> {
        Ok(Self {
            checkpoint_chunk_size_mb: config.checkpoint_chunk_size_mb,
            remote_store: config.object_store_config.make()?,
            current_epoch: Arc::new(Mutex::new(current_epoch)),
            rest_client,
        })
    }

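    /// Deletes the checkpoint files for the given sequence-number range from
    /// the remote store; individual deletion failures are logged, not propagated.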
    pub async fn reset_remote_store(
        &self,
        range: Range<CheckpointSequenceNumber>,
    ) -> anyhow::Result<()> {
        tracing::info!("delete checkpoints from remote store: {range:?}");

        let paths = range
            .into_iter()
            .map(|chk_seq_num| Ok(Self::file_path(chk_seq_num)))
            .collect::<Vec<_>>();

        let paths_stream = futures::stream::iter(paths).boxed();

        _ = self
            .remote_store
            .delete_stream(paths_stream)
            .for_each_concurrent(MAX_CONCURRENT_DELETE_REQUESTS, |delete_result| async {
                _ = delete_result.inspect_err(|err| tracing::warn!("deletion failed with: {err}"));
            })
            .await;

        Ok(())
    }

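    /// Uploads the encoded checkpoint with a single `put` request, or falls back
    /// to a multipart upload when the payload exceeds the configured chunk size.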
    async fn upload_blob(&self, bytes: Vec<u8>, chk_seq_num: u64, location: Path) -> Result<()> {
        if bytes.len() > self.checkpoint_chunk_size_mb as usize {
            return self
                .upload_blob_multipart(bytes, chk_seq_num, location)
                .await;
        }

        self.remote_store
            .put(&location, Bytes::from(bytes).into())
            .await?;

        Ok(())
    }

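    /// Splits the payload into chunks of the configured size and uploads them
    /// concurrently as a multipart upload, aborting the upload if any part fails.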
    async fn upload_blob_multipart(
        &self,
        bytes: Vec<u8>,
        chk_seq_num: u64,
        location: Path,
    ) -> Result<()> {
        let mut multipart = self.remote_store.put_multipart(&location).await?;
        let chunks = bytes.chunks(self.checkpoint_chunk_size_mb as usize);
        let total_chunks = chunks.len();

        let parts_futures = chunks
            .into_iter()
            .map(|chunk| multipart.put_part(Bytes::copy_from_slice(chunk).into()))
            .collect::<Vec<_>>();

        let mut buffered_uploaded_parts = stream::iter(parts_futures)
            .buffer_unordered(MAX_CONCURRENT_PARTS_UPLOAD)
            .enumerate();

        while let Some((uploaded_chunk_id, part_result)) = buffered_uploaded_parts.next().await {
            match part_result {
                Ok(()) => {
                    tracing::info!(
                        "uploaded checkpoint {chk_seq_num} chunk {}/{total_chunks}",
                        uploaded_chunk_id + 1
                    );
                }
                Err(err) => {
                    tracing::error!("error uploading part: {err}");
                    multipart.abort().await?;
                    bail!("checkpoint {chk_seq_num} multipart upload aborted");
                }
            }
        }

        let start_time = std::time::Instant::now();
        multipart.complete().await?;
        tracing::info!(
            "checkpoint {chk_seq_num} multipart completion request finished in {:?}",
            start_time.elapsed()
        );

        Ok(())
    }

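    /// Builds the object store path `ingestion/live/<sequence>.chk` for a
    /// checkpoint sequence number.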
    fn file_path(chk_seq_num: CheckpointSequenceNumber) -> Path {
        Path::from(INGESTION_DIR_NAME)
            .child(LIVE_DIR_NAME)
            .child(format!("{chk_seq_num}.{CHECKPOINT_FILE_SUFFIX}"))
    }
}

#[async_trait]
impl Worker for BlobWorker {
    type Message = ();
    type Error = anyhow::Error;

    async fn process_checkpoint(
        &self,
        checkpoint: Arc<CheckpointData>,
    ) -> Result<Self::Message, Self::Error> {
        let chk_seq_num = checkpoint.checkpoint_summary.sequence_number;
        let epoch = checkpoint.checkpoint_summary.epoch;

        {
            let mut current_epoch = self.current_epoch.lock().await;
            if epoch > *current_epoch {
                // On epoch change, drop the previous epoch's checkpoints from the
                // live directory before uploading data for the new epoch.
                let delete_start = common::epoch_first_checkpoint_sequence_number(
                    &self.rest_client,
                    *current_epoch,
                )
                .await?;
                self.reset_remote_store(delete_start..chk_seq_num).await?;
                *current_epoch = epoch;
            }
        }

        let bytes = Blob::encode(&checkpoint, BlobEncoding::Bcs)?.to_bytes();
        self.upload_blob(bytes, chk_seq_num, Self::file_path(chk_seq_num))
            .await?;

        Ok(())
    }
}
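
// A minimal test sketch, not part of the original file, exercising the path
// layout and the chunk-size validation above. It assumes `serde_json` is
// available as a dev-dependency to drive `deserialize_chunk` through a
// concrete `Deserializer`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn file_path_uses_the_ingestion_live_layout() {
        assert_eq!(
            BlobWorker::file_path(42).to_string(),
            "ingestion/live/42.chk"
        );
    }

    #[test]
    fn chunk_sizes_below_five_mib_are_rejected() {
        // 4 MB is below MIN_CHUNK_SIZE_MB and must be refused.
        assert!(deserialize_chunk(serde_json::json!(4)).is_err());
        // 10 MB is accepted and returned converted to bytes.
        assert_eq!(
            deserialize_chunk(serde_json::json!(10)).unwrap(),
            10 * 1024 * 1024
        );
    }
}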