// iota_data_ingestion_core/reader.rs
use std::{collections::BTreeMap, ffi::OsString, fs, path::PathBuf, sync::Arc, time::Duration};
7use backoff::backoff::Backoff;
8use futures::StreamExt;
9use iota_metrics::spawn_monitored_task;
10use iota_rest_api::Client;
11use iota_storage::blob::Blob;
12use iota_types::{
13 full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
14};
15use notify::{RecursiveMode, Watcher};
16use object_store::{ObjectStore, path::Path};
17use tap::pipe::Pipe;
18use tokio::{
19 sync::{
20 mpsc::{self, error::TryRecvError},
21 oneshot,
22 },
23 time::timeout,
24};
25use tracing::{debug, error, info};
26
27use crate::{
28 IngestionError, IngestionResult, create_remote_store_client,
29 executor::MAX_CHECKPOINTS_IN_PROGRESS,
30};
31
/// A fetched checkpoint paired with its serialized size in bytes.
type CheckpointResult = IngestionResult<(Arc<CheckpointData>, usize)>;
33
/// Reads checkpoint files from a local directory (populated by a co-located
/// source) and, when the local data lags, from a remote store, streaming them
/// in sequence order to the executor.
pub struct CheckpointReader {
    // Local directory watched for `<sequence_number>.<ext>` checkpoint files.
    path: PathBuf,
    // Optional remote source; may encode "<fullnode_url>|<object_store_url>"
    // for the hybrid backend, or end in "/api/v1" for a REST-only backend.
    remote_store_url: Option<String>,
    // Key/value options forwarded to `create_remote_store_client`.
    remote_store_options: Vec<(String, String)>,
    // Next checkpoint sequence number expected to be sent to the executor.
    current_checkpoint_number: CheckpointSequenceNumber,
    // Everything below this watermark has been processed and pruned.
    last_pruned_watermark: CheckpointSequenceNumber,
    // Outbound channel delivering checkpoints to the executor.
    checkpoint_sender: mpsc::Sender<Arc<CheckpointData>>,
    // Inbound watermarks reporting which checkpoints have been processed.
    processed_receiver: mpsc::Receiver<CheckpointSequenceNumber>,
    // Receiver from the background remote-fetch task; None when not running.
    remote_fetcher_receiver: Option<mpsc::Receiver<CheckpointResult>>,
    // Fires once to terminate `run`.
    exit_receiver: oneshot::Receiver<()>,
    options: ReaderOptions,
    // Enforces the optional byte budget on in-flight checkpoint data.
    data_limiter: DataLimiter,
}
50
/// Tuning knobs for [`CheckpointReader`].
#[derive(Clone)]
pub struct ReaderOptions {
    /// Interval in milliseconds between forced sync ticks when no filesystem
    /// event arrives.
    pub tick_interval_ms: u64,
    /// Timeout in seconds for the remote object-store client.
    pub timeout_secs: u64,
    /// Number of checkpoints fetched concurrently from the remote store.
    pub batch_size: usize,
    /// Soft cap in bytes on in-flight checkpoint data; 0 disables the limit.
    pub data_limit: usize,
}
74
75impl Default for ReaderOptions {
76 fn default() -> Self {
77 Self {
78 tick_interval_ms: 100,
79 timeout_secs: 5,
80 batch_size: 10,
81 data_limit: 0,
82 }
83 }
84}
85
/// Backend used when fetching checkpoints remotely.
enum RemoteStore {
    /// Object store serving `<sequence_number>.chk` blobs.
    ObjectStore(Box<dyn ObjectStore>),
    /// Full-node REST API client.
    Rest(iota_rest_api::Client),
    /// Tries the full-node REST API first, falling back to the object store.
    Hybrid(Box<dyn ObjectStore>, iota_rest_api::Client),
}
91
impl CheckpointReader {
    /// Scans the local ingestion directory and deserializes checkpoint files
    /// whose sequence number is at or past `current_checkpoint_number`, in
    /// ascending order, stopping after `MAX_CHECKPOINTS_IN_PROGRESS` files or
    /// once capacity is exceeded.
    async fn read_local_files(&self) -> IngestionResult<Vec<Arc<CheckpointData>>> {
        let mut files = vec![];
        for entry in fs::read_dir(self.path.clone())? {
            let entry = entry?;
            let filename = entry.file_name();
            // Files are named `<sequence_number>.<ext>`; anything else is skipped.
            if let Some(sequence_number) = Self::checkpoint_number_from_file_path(&filename) {
                if sequence_number >= self.current_checkpoint_number {
                    files.push((sequence_number, entry.path()));
                }
            }
        }
        // Sort by sequence number so checkpoints are produced in order.
        files.sort();
        debug!("unprocessed local files {:?}", files);
        let mut checkpoints = vec![];
        for (_, filename) in files.iter().take(MAX_CHECKPOINTS_IN_PROGRESS) {
            let checkpoint = Blob::from_bytes::<Arc<CheckpointData>>(&fs::read(filename)?)
                .map_err(|err| IngestionError::DeserializeCheckpoint(err.to_string()))?;
            if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
                break;
            }
            checkpoints.push(checkpoint);
        }
        Ok(checkpoints)
    }

    /// True when `checkpoint_number` would run ahead of the pruning watermark
    /// by `MAX_CHECKPOINTS_IN_PROGRESS` or more, or when the data limiter's
    /// byte budget is exhausted.
    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
            || self.data_limiter.exceeds()
    }

    /// Downloads and deserializes `<checkpoint_number>.chk` from the object
    /// store, returning the checkpoint together with its size in bytes.
    async fn fetch_from_object_store(
        store: &dyn ObjectStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IngestionResult<(Arc<CheckpointData>, usize)> {
        let path = Path::from(format!("{}.chk", checkpoint_number));
        let response = store.get(&path).await?;
        let bytes = response.bytes().await?;
        Ok((
            Blob::from_bytes::<Arc<CheckpointData>>(&bytes)
                .map_err(|err| IngestionError::DeserializeCheckpoint(err.to_string()))?,
            bytes.len(),
        ))
    }

    /// Fetches a full checkpoint from a full node's REST API; the reported
    /// size is the checkpoint's BCS-serialized length.
    async fn fetch_from_full_node(
        client: &Client,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IngestionResult<(Arc<CheckpointData>, usize)> {
        let checkpoint = client.get_full_checkpoint(checkpoint_number).await?;
        let size = bcs::serialized_size(&checkpoint)?;
        Ok((Arc::new(checkpoint), size))
    }

    /// Dispatches one fetch to the configured remote backend. The hybrid
    /// backend tries the full node first, then falls back to the object store
    /// on any error.
    async fn remote_fetch_checkpoint_internal(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IngestionResult<(Arc<CheckpointData>, usize)> {
        match store {
            RemoteStore::ObjectStore(store) => {
                Self::fetch_from_object_store(store, checkpoint_number).await
            }
            RemoteStore::Rest(client) => {
                Self::fetch_from_full_node(client, checkpoint_number).await
            }
            RemoteStore::Hybrid(store, client) => {
                match Self::fetch_from_full_node(client, checkpoint_number).await {
                    Ok(result) => Ok(result),
                    Err(_) => Self::fetch_from_object_store(store, checkpoint_number).await,
                }
            }
        }
    }

    /// Fetches one checkpoint, retrying at a constant 100 ms interval
    /// (multiplier 1.0) for up to 60 s before returning the last error.
    async fn remote_fetch_checkpoint(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IngestionResult<(Arc<CheckpointData>, usize)> {
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        // Multiplier of 1.0 keeps the retry interval constant rather than
        // exponentially growing.
        backoff.multiplier = 1.0;
        loop {
            match Self::remote_fetch_checkpoint_internal(store, checkpoint_number).await {
                Ok(data) => return Ok(data),
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        // Not-found responses are expected while waiting for a
                        // checkpoint to be produced, so skip logging those.
                        if !err.to_string().contains("404") {
                            debug!(
                                "remote reader retry in {} ms. Error is {:?}",
                                duration.as_millis(),
                                err
                            );
                        }
                        tokio::time::sleep(duration).await
                    }
                    None => return Err(err),
                },
            }
        }
    }

    /// Spawns a background task that streams checkpoints from the remote
    /// store starting at the current checkpoint number, with `batch_size`
    /// concurrent fetches, and returns the receiving end of the stream.
    ///
    /// The URL selects the backend: `"<fullnode>|<object store>"` yields the
    /// hybrid store, a `/api/v1` suffix a REST client, anything else an
    /// object store.
    fn start_remote_fetcher(
        &mut self,
    ) -> mpsc::Receiver<IngestionResult<(Arc<CheckpointData>, usize)>> {
        let batch_size = self.options.batch_size;
        let start_checkpoint = self.current_checkpoint_number;
        let (sender, receiver) = mpsc::channel(batch_size);
        let url = self
            .remote_store_url
            .clone()
            .expect("remote store url must be set");
        let store = if let Some((fn_url, remote_url)) = url.split_once('|') {
            let object_store = create_remote_store_client(
                remote_url.to_string(),
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::Hybrid(object_store, iota_rest_api::Client::new(fn_url))
        } else if url.ends_with("/api/v1") {
            RemoteStore::Rest(iota_rest_api::Client::new(url))
        } else {
            let object_store = create_remote_store_client(
                url,
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::ObjectStore(object_store)
        };

        spawn_monitored_task!(async move {
            // `buffered` runs up to `batch_size` fetches concurrently while
            // yielding results in sequence order.
            let mut checkpoint_stream = (start_checkpoint..u64::MAX)
                .map(|checkpoint_number| Self::remote_fetch_checkpoint(&store, checkpoint_number))
                .pipe(futures::stream::iter)
                .buffered(batch_size);

            while let Some(checkpoint) = checkpoint_stream.next().await {
                if sender.send(checkpoint).await.is_err() {
                    info!("remote reader dropped");
                    break;
                }
            }
        });
        receiver
    }

    /// Drains as many already-fetched checkpoints from the remote fetcher as
    /// capacity allows, (re)starting the fetcher task if needed. On any error
    /// the fetcher is torn down and will be restarted on the next call.
    fn remote_fetch(&mut self) -> Vec<Arc<CheckpointData>> {
        let mut checkpoints = vec![];
        if self.remote_fetcher_receiver.is_none() {
            self.remote_fetcher_receiver = Some(self.start_remote_fetcher());
        }
        while !self.exceeds_capacity(self.current_checkpoint_number + checkpoints.len() as u64) {
            match self.remote_fetcher_receiver.as_mut().unwrap().try_recv() {
                Ok(Ok((checkpoint, size))) => {
                    self.data_limiter.add(&checkpoint, size);
                    checkpoints.push(checkpoint);
                }
                Ok(Err(err)) => {
                    error!("remote reader transient error {:?}", err);
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Disconnected) => {
                    error!("remote reader channel disconnect error");
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Empty) => break,
            }
        }
        checkpoints
    }

    /// One synchronization pass: reads new checkpoints from the local
    /// directory (falling back to the remote store when local data is missing
    /// or skips ahead) and forwards them, strictly in sequence, to the
    /// executor channel.
    async fn sync(&mut self) -> IngestionResult<()> {
        let backoff = backoff::ExponentialBackoff::default();
        // Local reads can fail transiently (e.g. files mid-write); retry.
        let mut checkpoints = backoff::future::retry(backoff, || async {
            self.read_local_files().await.map_err(|err| {
                info!("transient local read error {:?}", err);
                backoff::Error::transient(err)
            })
        })
        .await?;

        let mut read_source: &str = "local";
        if self.remote_store_url.is_some()
            && (checkpoints.is_empty()
                || checkpoints[0].checkpoint_summary.sequence_number
                    > self.current_checkpoint_number)
        {
            checkpoints = self.remote_fetch();
            read_source = "remote";
        } else {
            // Local data has caught up; tear down the remote fetcher task.
            self.remote_fetcher_receiver = None;
        }

        info!(
            "Read from {}. Current checkpoint number: {}, pruning watermark: {}, new updates: {:?}",
            read_source,
            self.current_checkpoint_number,
            self.last_pruned_watermark,
            checkpoints.len(),
        );
        for checkpoint in checkpoints {
            // Local files may contain a gap; stop at the first checkpoint
            // that is not the one expected next.
            if read_source == "local"
                && checkpoint.checkpoint_summary.sequence_number > self.current_checkpoint_number
            {
                break;
            }
            assert_eq!(
                checkpoint.checkpoint_summary.sequence_number,
                self.current_checkpoint_number
            );
            self.checkpoint_sender.send(checkpoint).await.map_err(|_| {
                IngestionError::Channel(
                    "unable to send checkpoint to executor, receiver half closed".to_owned(),
                )
            })?;
            self.current_checkpoint_number += 1;
        }
        Ok(())
    }

    /// Advances the pruning watermark and deletes local checkpoint files with
    /// sequence numbers below it.
    fn gc_processed_files(&mut self, watermark: CheckpointSequenceNumber) -> IngestionResult<()> {
        info!("cleaning processed files, watermark is {}", watermark);
        self.data_limiter.gc(watermark);
        self.last_pruned_watermark = watermark;
        for entry in fs::read_dir(self.path.clone())? {
            let entry = entry?;
            let filename = entry.file_name();
            if let Some(sequence_number) = Self::checkpoint_number_from_file_path(&filename) {
                if sequence_number < watermark {
                    fs::remove_file(entry.path())?;
                }
            }
        }
        Ok(())
    }

    /// Parses a checkpoint sequence number from a file name of the form
    /// `<number>.<ext>` (everything before the last '.'); returns `None` for
    /// any other name.
    fn checkpoint_number_from_file_path(file_name: &OsString) -> Option<CheckpointSequenceNumber> {
        file_name
            .to_str()
            .and_then(|s| s.rfind('.').map(|pos| &s[..pos]))
            .and_then(|s| s.parse().ok())
    }

    /// Creates a reader starting at `starting_checkpoint_number` and returns
    /// it together with the checkpoint receiver, the processed-watermark
    /// sender, and the exit-signal sender consumed by [`Self::run`].
    pub fn initialize(
        path: PathBuf,
        starting_checkpoint_number: CheckpointSequenceNumber,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        options: ReaderOptions,
    ) -> (
        Self,
        mpsc::Receiver<Arc<CheckpointData>>,
        mpsc::Sender<CheckpointSequenceNumber>,
        oneshot::Sender<()>,
    ) {
        let (checkpoint_sender, checkpoint_recv) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (processed_sender, processed_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (exit_sender, exit_receiver) = oneshot::channel();
        let reader = Self {
            path,
            remote_store_url,
            remote_store_options,
            current_checkpoint_number: starting_checkpoint_number,
            last_pruned_watermark: starting_checkpoint_number,
            checkpoint_sender,
            processed_receiver,
            remote_fetcher_receiver: None,
            exit_receiver,
            data_limiter: DataLimiter::new(options.data_limit),
            options,
        };
        (reader, checkpoint_recv, processed_sender, exit_sender)
    }

    /// Main loop: watches the local directory and runs [`Self::sync`] on each
    /// filesystem event or every `tick_interval_ms` milliseconds, pruning
    /// processed files as watermarks arrive, until the exit signal fires.
    pub async fn run(mut self) -> IngestionResult<()> {
        let (inotify_sender, mut inotify_recv) = mpsc::channel(1);
        std::fs::create_dir_all(self.path.clone()).expect("failed to create a directory");
        let mut watcher = notify::recommended_watcher(move |res| {
            if let Err(err) = res {
                eprintln!("watch error: {:?}", err);
            }
            inotify_sender
                .blocking_send(())
                .expect("Failed to send inotify update");
        })
        .expect("Failed to init inotify");

        watcher
            .watch(&self.path, RecursiveMode::NonRecursive)
            .expect("Inotify watcher failed");
        // Clean out anything below the starting watermark before entering
        // the loop.
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("Failed to clean the directory");

        loop {
            tokio::select! {
                _ = &mut self.exit_receiver => break,
                Some(gc_checkpoint_number) = self.processed_receiver.recv() => {
                    self.gc_processed_files(gc_checkpoint_number).expect("Failed to clean the directory");
                }
                // Sync on a filesystem event, or unconditionally when the
                // tick timeout elapses.
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.options.tick_interval_ms), inotify_recv.recv()) => {
                    self.sync().await.expect("Failed to read checkpoint files");
                }
            }
        }
        Ok(())
    }
}
409
/// Tracks the total serialized size of checkpoints that have been handed out
/// but not yet pruned, enforcing an optional soft byte budget.
pub struct DataLimiter {
    // Byte budget; 0 means the limiter is disabled.
    limit: usize,
    // Per-checkpoint sizes keyed by sequence number, pruned by `gc`.
    queue: BTreeMap<CheckpointSequenceNumber, usize>,
    // Sum of the sizes currently tracked in `queue`.
    in_progress: usize,
}
415
416impl DataLimiter {
417 fn new(limit: usize) -> Self {
418 Self {
419 limit,
420 queue: BTreeMap::new(),
421 in_progress: 0,
422 }
423 }
424
425 fn exceeds(&self) -> bool {
426 self.limit > 0 && self.in_progress >= self.limit
427 }
428
429 fn add(&mut self, checkpoint: &CheckpointData, size: usize) {
430 if self.limit == 0 {
431 return;
432 }
433 self.in_progress += size;
434 self.queue
435 .insert(checkpoint.checkpoint_summary.sequence_number, size);
436 }
437
438 fn gc(&mut self, watermark: CheckpointSequenceNumber) {
439 if self.limit == 0 {
440 return;
441 }
442 self.queue = self.queue.split_off(&watermark);
443 self.in_progress = self.queue.values().sum();
444 }
445}