iota_data_ingestion_core/reader.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, ffi::OsString, fs, path::PathBuf, sync::Arc, time::Duration};

use backoff::backoff::Backoff;
use futures::StreamExt;
use iota_metrics::spawn_monitored_task;
use iota_rest_api::Client;
use iota_storage::blob::Blob;
use iota_types::{
    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
};
use notify::{RecursiveMode, Watcher};
use object_store::{ObjectStore, path::Path};
use tap::pipe::Pipe;
use tokio::{
    sync::{
        mpsc::{self, error::TryRecvError},
        oneshot,
    },
    time::timeout,
};
use tracing::{debug, error, info};

use crate::{
    IngestionError, IngestionResult, create_remote_store_client,
    executor::MAX_CHECKPOINTS_IN_PROGRESS,
};

type CheckpointResult = IngestionResult<(Arc<CheckpointData>, usize)>;

/// Implements a checkpoint reader that monitors a local directory.
/// Designed for setups where the indexer daemon is colocated with a full node
/// (FN). This implementation is push-based and utilizes the inotify API.
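///
/// A minimal usage sketch (illustrative; assumes `CheckpointReader` and
/// `ReaderOptions` are reachable from the crate root, and the checkpoint
/// directory path is just an example):
///
/// ```ignore
/// let (reader, checkpoint_recv, processed_sender, exit_sender) = CheckpointReader::initialize(
///     "/var/checkpoints".into(),  // directory the full node writes `.chk` files into
///     0,                          // starting checkpoint sequence number
///     None,                       // no remote store fallback
///     vec![],                     // remote store options
///     ReaderOptions::default(),
/// );
/// // `checkpoint_recv` feeds checkpoints to the executor, `processed_sender`
/// // reports the pruning watermark back, and `exit_sender` stops the reader.
/// tokio::spawn(reader.run());
/// ```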
pub struct CheckpointReader {
    path: PathBuf,
    remote_store_url: Option<String>,
    remote_store_options: Vec<(String, String)>,
    current_checkpoint_number: CheckpointSequenceNumber,
    last_pruned_watermark: CheckpointSequenceNumber,
    checkpoint_sender: mpsc::Sender<Arc<CheckpointData>>,
    processed_receiver: mpsc::Receiver<CheckpointSequenceNumber>,
    remote_fetcher_receiver: Option<mpsc::Receiver<CheckpointResult>>,
    exit_receiver: oneshot::Receiver<()>,
    options: ReaderOptions,
    data_limiter: DataLimiter,
}

/// Options for configuring how the checkpoint reader fetches new checkpoints.
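///
/// A configuration sketch with illustrative values (e.g. for a backfill):
///
/// ```ignore
/// let options = ReaderOptions {
///     batch_size: 50,       // more concurrent remote requests
///     data_limit: 1 << 30,  // cap in-flight checkpoint data at 1 GiB
///     ..ReaderOptions::default()
/// };
/// ```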
#[derive(Clone)]
pub struct ReaderOptions {
    /// How often to check for new checkpoints; lower values mean faster
    /// detection but higher CPU usage.
    ///
    /// Default: 100ms.
    pub tick_interval_ms: u64,
    /// Network request timeout; applies to remote store operations.
    ///
    /// Default: 5 seconds.
    pub timeout_secs: u64,
    /// Maximum number of concurrent requests to the remote store. Increase it
    /// for backfills; higher values increase throughput but use more resources.
    ///
    /// Default: 10.
    pub batch_size: usize,
    /// Maximum memory (in bytes) for batch checkpoint processing, to prevent
    /// OOM errors. Zero means no limit.
    ///
    /// Default: 0.
    pub data_limit: usize,
}

impl Default for ReaderOptions {
    fn default() -> Self {
        Self {
            tick_interval_ms: 100,
            timeout_secs: 5,
            batch_size: 10,
            data_limit: 0,
        }
    }
}

/// Backend used for fetching checkpoints that are not available locally.
enum RemoteStore {
    /// A remote object store holding serialized `.chk` checkpoint files.
    ObjectStore(Box<dyn ObjectStore>),
    /// The full node REST API.
    Rest(iota_rest_api::Client),
    /// Prefer the full node REST API and fall back to the object store.
    Hybrid(Box<dyn ObjectStore>, iota_rest_api::Client),
}

impl CheckpointReader {
    /// Represents a single read pass over the local directory: collects
    /// unprocessed checkpoint files, deserializes them in sequence order, and
    /// returns the resulting `CheckpointData`, stopping early once capacity
    /// would be exceeded.
    async fn read_local_files(&self) -> IngestionResult<Vec<Arc<CheckpointData>>> {
        let mut files = vec![];
        for entry in fs::read_dir(self.path.clone())? {
            let entry = entry?;
            let filename = entry.file_name();
            if let Some(sequence_number) = Self::checkpoint_number_from_file_path(&filename) {
                if sequence_number >= self.current_checkpoint_number {
                    files.push((sequence_number, entry.path()));
                }
            }
        }
        files.sort();
        debug!("unprocessed local files {:?}", files);
        let mut checkpoints = vec![];
        for (_, filename) in files.iter().take(MAX_CHECKPOINTS_IN_PROGRESS) {
            let checkpoint = Blob::from_bytes::<Arc<CheckpointData>>(&fs::read(filename)?)
                .map_err(|err| IngestionError::DeserializeCheckpoint(err.to_string()))?;
            if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
                break;
            }
            checkpoints.push(checkpoint);
        }
        Ok(checkpoints)
    }

    /// Returns true when accepting `checkpoint_number` would exceed either the
    /// maximum number of in-progress checkpoints past the pruning watermark or
    /// the configured data limit.
    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
            || self.data_limiter.exceeds()
    }

    async fn fetch_from_object_store(
        store: &dyn ObjectStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IngestionResult<(Arc<CheckpointData>, usize)> {
        let path = Path::from(format!("{}.chk", checkpoint_number));
        let response = store.get(&path).await?;
        let bytes = response.bytes().await?;
        Ok((
            Blob::from_bytes::<Arc<CheckpointData>>(&bytes)
                .map_err(|err| IngestionError::DeserializeCheckpoint(err.to_string()))?,
            bytes.len(),
        ))
    }

    async fn fetch_from_full_node(
        client: &Client,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IngestionResult<(Arc<CheckpointData>, usize)> {
        let checkpoint = client.get_full_checkpoint(checkpoint_number).await?;
        let size = bcs::serialized_size(&checkpoint)?;
        Ok((Arc::new(checkpoint), size))
    }

    async fn remote_fetch_checkpoint_internal(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IngestionResult<(Arc<CheckpointData>, usize)> {
        match store {
            RemoteStore::ObjectStore(store) => {
                Self::fetch_from_object_store(store, checkpoint_number).await
            }
            RemoteStore::Rest(client) => {
                Self::fetch_from_full_node(client, checkpoint_number).await
            }
            RemoteStore::Hybrid(store, client) => {
                // Prefer the full node; fall back to the object store on any error.
                match Self::fetch_from_full_node(client, checkpoint_number).await {
                    Ok(result) => Ok(result),
                    Err(_) => Self::fetch_from_object_store(store, checkpoint_number).await,
                }
            }
        }
    }

    async fn remote_fetch_checkpoint(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> IngestionResult<(Arc<CheckpointData>, usize)> {
        // Retry at a constant 100ms interval (multiplier 1.0) for at most 60 seconds.
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        backoff.multiplier = 1.0;
        loop {
            match Self::remote_fetch_checkpoint_internal(store, checkpoint_number).await {
                Ok(data) => return Ok(data),
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        // A 404 simply means the checkpoint is not available yet,
                        // so it is not worth logging.
                        if !err.to_string().contains("404") {
                            debug!(
                                "remote reader retry in {} ms. Error is {:?}",
                                duration.as_millis(),
                                err
                            );
                        }
                        tokio::time::sleep(duration).await
                    }
                    None => return Err(err),
                },
            }
        }
    }

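    /// Spawns a background task that streams checkpoints from the remote
    /// store, starting at the current checkpoint number, and returns the
    /// receiving end of its channel.
    ///
    /// The remote store URL selects the backend (placeholders shown; the
    /// concrete URLs depend on the deployment):
    ///
    /// ```text
    /// "<fullnode_url>|<object_store_url>"  -> RemoteStore::Hybrid
    /// "<url ending in /api/v1>"            -> RemoteStore::Rest
    /// "<any other url>"                    -> RemoteStore::ObjectStore
    /// ```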
    fn start_remote_fetcher(
        &mut self,
    ) -> mpsc::Receiver<IngestionResult<(Arc<CheckpointData>, usize)>> {
        let batch_size = self.options.batch_size;
        let start_checkpoint = self.current_checkpoint_number;
        let (sender, receiver) = mpsc::channel(batch_size);
        let url = self
            .remote_store_url
            .clone()
            .expect("remote store url must be set");
        let store = if let Some((fn_url, remote_url)) = url.split_once('|') {
            let object_store = create_remote_store_client(
                remote_url.to_string(),
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::Hybrid(object_store, iota_rest_api::Client::new(fn_url))
        } else if url.ends_with("/api/v1") {
            RemoteStore::Rest(iota_rest_api::Client::new(url))
        } else {
            let object_store = create_remote_store_client(
                url,
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::ObjectStore(object_store)
        };

        spawn_monitored_task!(async move {
            let mut checkpoint_stream = (start_checkpoint..u64::MAX)
                .map(|checkpoint_number| Self::remote_fetch_checkpoint(&store, checkpoint_number))
                .pipe(futures::stream::iter)
                .buffered(batch_size);

            while let Some(checkpoint) = checkpoint_stream.next().await {
                if sender.send(checkpoint).await.is_err() {
                    info!("remote reader dropped");
                    break;
                }
            }
        });
        receiver
    }

    /// Drains checkpoints that the remote fetcher has already produced,
    /// without blocking, until capacity is reached or the channel is empty.
    fn remote_fetch(&mut self) -> Vec<Arc<CheckpointData>> {
        let mut checkpoints = vec![];
        if self.remote_fetcher_receiver.is_none() {
            self.remote_fetcher_receiver = Some(self.start_remote_fetcher());
        }
        while !self.exceeds_capacity(self.current_checkpoint_number + checkpoints.len() as u64) {
            match self.remote_fetcher_receiver.as_mut().unwrap().try_recv() {
                Ok(Ok((checkpoint, size))) => {
                    self.data_limiter.add(&checkpoint, size);
                    checkpoints.push(checkpoint);
                }
                Ok(Err(err)) => {
                    error!("remote reader transient error {:?}", err);
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Disconnected) => {
                    error!("remote reader channel disconnect error");
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Empty) => break,
            }
        }
        checkpoints
    }

    /// Performs one sync iteration: reads local checkpoint files, falls back
    /// to the remote store when the local reader lags behind, and forwards
    /// checkpoints to the executor strictly in sequence order.
    async fn sync(&mut self) -> IngestionResult<()> {
        let backoff = backoff::ExponentialBackoff::default();
        let mut checkpoints = backoff::future::retry(backoff, || async {
            self.read_local_files().await.map_err(|err| {
                info!("transient local read error {:?}", err);
                backoff::Error::transient(err)
            })
        })
        .await?;

        let mut read_source: &str = "local";
        if self.remote_store_url.is_some()
            && (checkpoints.is_empty()
                || checkpoints[0].checkpoint_summary.sequence_number
                    > self.current_checkpoint_number)
        {
            checkpoints = self.remote_fetch();
            read_source = "remote";
        } else {
            // cancel remote fetcher execution because local reader has made progress
            self.remote_fetcher_receiver = None;
        }

        info!(
            "Read from {}. Current checkpoint number: {}, pruning watermark: {}, new updates: {:?}",
            read_source,
            self.current_checkpoint_number,
            self.last_pruned_watermark,
            checkpoints.len(),
        );
        for checkpoint in checkpoints {
            if read_source == "local"
                && checkpoint.checkpoint_summary.sequence_number > self.current_checkpoint_number
            {
                break;
            }
            assert_eq!(
                checkpoint.checkpoint_summary.sequence_number,
                self.current_checkpoint_number
            );
            self.checkpoint_sender.send(checkpoint).await.map_err(|_| {
                IngestionError::Channel(
                    "unable to send checkpoint to executor, receiver half closed".to_owned(),
                )
            })?;
            self.current_checkpoint_number += 1;
        }
        Ok(())
    }

    /// Cleans the local directory by removing all processed checkpoint files.
    fn gc_processed_files(&mut self, watermark: CheckpointSequenceNumber) -> IngestionResult<()> {
        info!("cleaning processed files, watermark is {}", watermark);
        self.data_limiter.gc(watermark);
        self.last_pruned_watermark = watermark;
        for entry in fs::read_dir(self.path.clone())? {
            let entry = entry?;
            let filename = entry.file_name();
            if let Some(sequence_number) = Self::checkpoint_number_from_file_path(&filename) {
                if sequence_number < watermark {
                    fs::remove_file(entry.path())?;
                }
            }
        }
        Ok(())
    }

    /// Extracts the checkpoint sequence number from a `<sequence_number>.chk`
    /// file name; returns `None` if the name does not parse.
    fn checkpoint_number_from_file_path(file_name: &OsString) -> Option<CheckpointSequenceNumber> {
        file_name
            .to_str()
            .and_then(|s| s.rfind('.').map(|pos| &s[..pos]))
            .and_then(|s| s.parse().ok())
    }

    /// Creates a new reader along with its communication channels: the
    /// checkpoint receiver consumed by the executor, the sender used to report
    /// the processed (pruning) watermark back, and the sender used to signal
    /// shutdown.
    pub fn initialize(
        path: PathBuf,
        starting_checkpoint_number: CheckpointSequenceNumber,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        options: ReaderOptions,
    ) -> (
        Self,
        mpsc::Receiver<Arc<CheckpointData>>,
        mpsc::Sender<CheckpointSequenceNumber>,
        oneshot::Sender<()>,
    ) {
        let (checkpoint_sender, checkpoint_recv) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (processed_sender, processed_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (exit_sender, exit_receiver) = oneshot::channel();
        let reader = Self {
            path,
            remote_store_url,
            remote_store_options,
            current_checkpoint_number: starting_checkpoint_number,
            last_pruned_watermark: starting_checkpoint_number,
            checkpoint_sender,
            processed_receiver,
            remote_fetcher_receiver: None,
            exit_receiver,
            data_limiter: DataLimiter::new(options.data_limit),
            options,
        };
        (reader, checkpoint_recv, processed_sender, exit_sender)
    }

    /// Runs the reader loop: watches the checkpoint directory for changes,
    /// garbage-collects files up to each reported watermark, and syncs on
    /// every file-system notification or tick until the exit signal arrives.
    pub async fn run(mut self) -> IngestionResult<()> {
        let (inotify_sender, mut inotify_recv) = mpsc::channel(1);
        std::fs::create_dir_all(self.path.clone()).expect("failed to create a directory");
        let mut watcher = notify::recommended_watcher(move |res| {
            if let Err(err) = res {
                eprintln!("watch error: {:?}", err);
            }
            inotify_sender
                .blocking_send(())
                .expect("Failed to send inotify update");
        })
        .expect("Failed to init inotify");

        watcher
            .watch(&self.path, RecursiveMode::NonRecursive)
            .expect("Inotify watcher failed");
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("Failed to clean the directory");

        loop {
            tokio::select! {
                _ = &mut self.exit_receiver => break,
                Some(gc_checkpoint_number) = self.processed_receiver.recv() => {
                    self.gc_processed_files(gc_checkpoint_number).expect("Failed to clean the directory");
                }
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.options.tick_interval_ms), inotify_recv.recv()) => {
                    self.sync().await.expect("Failed to read checkpoint files");
                }
            }
        }
        Ok(())
    }
}

/// Tracks the total size of remotely fetched checkpoints that are still being
/// processed, so the reader can stop fetching once the configured
/// `data_limit` is reached.
pub struct DataLimiter {
    limit: usize,
    queue: BTreeMap<CheckpointSequenceNumber, usize>,
    in_progress: usize,
}

impl DataLimiter {
    fn new(limit: usize) -> Self {
        Self {
            limit,
            queue: BTreeMap::new(),
            in_progress: 0,
        }
    }

    fn exceeds(&self) -> bool {
        self.limit > 0 && self.in_progress >= self.limit
    }

    fn add(&mut self, checkpoint: &CheckpointData, size: usize) {
        if self.limit == 0 {
            return;
        }
        self.in_progress += size;
        self.queue
            .insert(checkpoint.checkpoint_summary.sequence_number, size);
    }

    fn gc(&mut self, watermark: CheckpointSequenceNumber) {
        if self.limit == 0 {
            return;
        }
        self.queue = self.queue.split_off(&watermark);
        self.in_progress = self.queue.values().sum();
    }
}