iota_data_ingestion_core/reader/v1.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::BTreeMap,
7    fs,
8    path::{Path, PathBuf},
9    sync::Arc,
10    time::Duration,
11};
12
13use backoff::backoff::Backoff;
14use futures::StreamExt;
15use iota_metrics::spawn_monitored_task;
16use iota_types::{
17    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
18};
19use object_store::ObjectStore;
20use tap::pipe::Pipe;
21use tokio::{
22    sync::{
23        mpsc::{self, error::TryRecvError},
24        oneshot,
25    },
26    time::timeout,
27};
28use tracing::{debug, error, info};
29
30#[cfg(not(target_os = "macos"))]
31use crate::reader::fetch::init_watcher;
32use crate::{
33    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
34    reader::fetch::{
35        CheckpointResult, LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store,
36    },
37};
38
/// Implements a checkpoint reader that monitors a local directory.
/// Designed for setups where the indexer daemon is colocated with FN.
/// This implementation is push-based and utilizes the inotify API.
pub struct CheckpointReader {
    // Local directory monitored for new checkpoint files.
    path: PathBuf,
    // Optional remote source used when local files are missing or ahead.
    // May hold a fullnode REST url, an object-store url, or both separated
    // by `|` (see `start_remote_fetcher`). `None` disables remote fetching.
    remote_store_url: Option<String>,
    // Key/value options forwarded to `create_remote_store_client`.
    remote_store_options: Vec<(String, String)>,
    // Sequence number of the next checkpoint expected to be forwarded.
    current_checkpoint_number: CheckpointSequenceNumber,
    // Watermark up to which checkpoints have been garbage-collected; anchors
    // the in-progress capacity window in `exceeds_capacity`.
    last_pruned_watermark: CheckpointSequenceNumber,
    // Sends fully read checkpoints downstream to the executor.
    checkpoint_sender: mpsc::Sender<Arc<CheckpointData>>,
    // Receives sequence numbers of processed checkpoints eligible for GC.
    processed_receiver: mpsc::Receiver<CheckpointSequenceNumber>,
    // Channel from the background remote fetcher task; `None` while no
    // fetcher is running (it is started lazily and dropped on errors).
    remote_fetcher_receiver: Option<mpsc::Receiver<CheckpointResult>>,
    // Fires when the reader should shut down.
    exit_receiver: oneshot::Receiver<()>,
    // Reader configuration (tick interval, timeouts, batch size, data limit).
    options: ReaderOptions,
    // Bounds the total in-progress data size to avoid OOM.
    data_limiter: DataLimiter,
}
55
56impl LocalRead for CheckpointReader {
57    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
58        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
59            || self.data_limiter.exceeds()
60    }
61
62    fn path(&self) -> &Path {
63        &self.path
64    }
65
66    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
67        self.current_checkpoint_number
68    }
69
70    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
71        self.last_pruned_watermark = watermark;
72    }
73}
74
/// Options for configuring how the checkpoint reader fetches new checkpoints.
#[derive(Clone)]
pub struct ReaderOptions {
    /// How often to check for new checkpoints; lower values mean faster
    /// detection but more CPU usage.
    ///
    /// Default: 100ms.
    pub tick_interval_ms: u64,
    /// Network request timeout; applies to remote store operations.
    ///
    /// Default: 5 seconds.
    pub timeout_secs: u64,
    /// Number of maximum concurrent requests to the remote store. Increase it
    /// for backfills; higher values increase throughput but use more
    /// resources.
    ///
    /// Default: 10.
    pub batch_size: usize,
    /// Maximum memory (bytes) for batch checkpoint processing to prevent OOM
    /// errors. Zero indicates no limit.
    ///
    /// Default: 0.
    pub data_limit: usize,
}
98
99impl Default for ReaderOptions {
100    fn default() -> Self {
101        Self {
102            tick_interval_ms: 100,
103            timeout_secs: 5,
104            batch_size: 10,
105            data_limit: 0,
106        }
107    }
108}
109
// The remote source the background fetcher task pulls checkpoints from.
enum RemoteStore {
    // An `object_store`-backed store holding checkpoint blobs.
    ObjectStore(Box<dyn ObjectStore>),
    // A fullnode REST API client.
    Fullnode(iota_rest_api::Client),
    // Both sources: the fullnode is tried first, the object store is the
    // fallback (see `remote_fetch_checkpoint_internal`).
    Hybrid(Box<dyn ObjectStore>, iota_rest_api::Client),
}
115
impl CheckpointReader {
    /// Fetches a single checkpoint from the given remote source.
    ///
    /// For `Hybrid`, the fullnode is tried first and the object store is
    /// used as a fallback on any error.
    async fn remote_fetch_checkpoint_internal(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> CheckpointResult {
        match store {
            RemoteStore::ObjectStore(store) => {
                fetch_from_object_store(store, checkpoint_number).await
            }
            RemoteStore::Fullnode(client) => fetch_from_full_node(client, checkpoint_number).await,
            RemoteStore::Hybrid(store, client) => {
                match fetch_from_full_node(client, checkpoint_number).await {
                    Ok(result) => Ok(result),
                    Err(_) => fetch_from_object_store(store, checkpoint_number).await,
                }
            }
        }
    }

    /// Fetches a checkpoint, retrying failures at a fixed 100 ms interval
    /// (multiplier 1.0 disables exponential growth) for up to 60 seconds
    /// before returning the last error.
    async fn remote_fetch_checkpoint(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> CheckpointResult {
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        backoff.multiplier = 1.0;
        loop {
            match Self::remote_fetch_checkpoint_internal(store, checkpoint_number).await {
                Ok(data) => return Ok(data),
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        // Errors mentioning "404" are not logged — presumably
                        // they just mean the checkpoint is not available yet.
                        // The retry itself still happens.
                        if !err.to_string().contains("404") {
                            debug!(
                                "remote reader retry in {} ms. Error is {err:?}",
                                duration.as_millis(),
                            );
                        }
                        tokio::time::sleep(duration).await
                    }
                    None => return Err(err),
                },
            }
        }
    }

    /// Spawns a background task that streams checkpoints from the remote
    /// store starting at the current checkpoint number, and returns the
    /// receiving end of its channel (bounded by `batch_size`).
    ///
    /// The remote URL selects the store kind:
    /// - `"<fullnode-url>|<object-store-url>"` -> `Hybrid`
    /// - a URL ending in `/api/v1`             -> `Fullnode`
    /// - anything else                         -> `ObjectStore`
    fn start_remote_fetcher(&mut self) -> mpsc::Receiver<CheckpointResult> {
        let batch_size = self.options.batch_size;
        let start_checkpoint = self.current_checkpoint_number;
        let (sender, receiver) = mpsc::channel(batch_size);
        let url = self
            .remote_store_url
            .clone()
            .expect("remote store url must be set");
        let store = if let Some((fn_url, remote_url)) = url.split_once('|') {
            let object_store = create_remote_store_client(
                remote_url.to_string(),
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::Hybrid(object_store, iota_rest_api::Client::new(fn_url))
        } else if url.ends_with("/api/v1") {
            RemoteStore::Fullnode(iota_rest_api::Client::new(url))
        } else {
            let object_store = create_remote_store_client(
                url,
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::ObjectStore(object_store)
        };

        spawn_monitored_task!(async move {
            // `buffered` keeps up to `batch_size` fetches in flight while
            // yielding results in checkpoint order.
            let mut checkpoint_stream = (start_checkpoint..u64::MAX)
                .map(|checkpoint_number| Self::remote_fetch_checkpoint(&store, checkpoint_number))
                .pipe(futures::stream::iter)
                .buffered(batch_size);

            while let Some(checkpoint) = checkpoint_stream.next().await {
                // Send fails only when the receiver was dropped (reader shut
                // down or restarted the fetcher); stop this task then.
                if sender.send(checkpoint).await.is_err() {
                    info!("remote reader dropped");
                    break;
                }
            }
        });
        receiver
    }

    /// Drains as many checkpoints as capacity allows from the remote fetcher
    /// channel, lazily starting the fetcher on first use.
    ///
    /// On any fetch or channel error the receiver is dropped, so the next
    /// call restarts the fetcher from the current checkpoint number.
    fn remote_fetch(&mut self) -> Vec<Arc<CheckpointData>> {
        let mut checkpoints = vec![];
        if self.remote_fetcher_receiver.is_none() {
            self.remote_fetcher_receiver = Some(self.start_remote_fetcher());
        }
        // Account for checkpoints already collected in this call when
        // checking the capacity window.
        while !self.exceeds_capacity(self.current_checkpoint_number + checkpoints.len() as u64) {
            match self.remote_fetcher_receiver.as_mut().unwrap().try_recv() {
                Ok(Ok((checkpoint, size))) => {
                    self.data_limiter.add(&checkpoint, size);
                    checkpoints.push(checkpoint);
                }
                Ok(Err(err)) => {
                    error!("remote reader transient error {:?}", err);
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Disconnected) => {
                    error!("remote reader channel disconnect error");
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Empty) => break,
            }
        }
        checkpoints
    }

    /// Reads checkpoints from the local directory, falling back to the
    /// remote store when local files are absent or ahead of the expected
    /// sequence number, and forwards them in order to the executor channel.
    ///
    /// Returns an error if local reads fail or the executor receiver is
    /// closed.
    async fn sync(&mut self) -> IngestionResult<()> {
        let mut checkpoints = self.read_local_files_with_retry().await?;

        let mut read_source = ReadSource::Local;
        if self.remote_store_url.is_some()
            && (checkpoints.is_empty()
                || checkpoints[0].checkpoint_summary.sequence_number
                    > self.current_checkpoint_number)
        {
            checkpoints = self.remote_fetch();
            read_source = ReadSource::Remote;
        } else {
            // cancel remote fetcher execution because local reader has made progress
            self.remote_fetcher_receiver = None;
        }

        info!(
            "Read from {read_source}. Current checkpoint number: {}, pruning watermark: {}, new updates: {:?}",
            self.current_checkpoint_number,
            self.last_pruned_watermark,
            checkpoints.len(),
        );
        for checkpoint in checkpoints {
            // A local file ahead of the expected number means a gap; stop
            // and wait instead of skipping checkpoints.
            // NOTE(review): `is_checkpoint_ahead` is presumably provided by
            // the `LocalRead` trait (reader::fetch) — confirm there.
            if matches!(read_source, ReadSource::Local)
                && self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number)
            {
                break;
            }
            // Invariant: checkpoints must be forwarded strictly in sequence.
            assert_eq!(
                checkpoint.checkpoint_summary.sequence_number,
                self.current_checkpoint_number
            );
            self.checkpoint_sender.send(checkpoint).await.map_err(|_| {
                IngestionError::Channel(
                    "unable to send checkpoint to executor, receiver half closed".to_owned(),
                )
            })?;
            self.current_checkpoint_number += 1;
        }
        Ok(())
    }

    /// Creates a `CheckpointReader` together with the channel endpoints used
    /// to communicate with it: the receiver of checkpoint data, the sender
    /// for processed (garbage-collectable) sequence numbers, and the exit
    /// signal sender.
    pub fn initialize(
        path: PathBuf,
        starting_checkpoint_number: CheckpointSequenceNumber,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        options: ReaderOptions,
    ) -> (
        Self,
        mpsc::Receiver<Arc<CheckpointData>>,
        mpsc::Sender<CheckpointSequenceNumber>,
        oneshot::Sender<()>,
    ) {
        let (checkpoint_sender, checkpoint_recv) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (processed_sender, processed_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (exit_sender, exit_receiver) = oneshot::channel();
        let reader = Self {
            path,
            remote_store_url,
            remote_store_options,
            current_checkpoint_number: starting_checkpoint_number,
            last_pruned_watermark: starting_checkpoint_number,
            checkpoint_sender,
            processed_receiver,
            remote_fetcher_receiver: None,
            exit_receiver,
            data_limiter: DataLimiter::new(options.data_limit),
            options,
        };
        (reader, checkpoint_recv, processed_sender, exit_sender)
    }

    /// Runs the reader event loop until the exit signal fires.
    ///
    /// Each iteration either garbage-collects processed checkpoints or syncs
    /// new ones; a sync is triggered by a filesystem notification (non-macOS
    /// only) or by the tick-interval timeout.
    pub async fn run(mut self) -> IngestionResult<()> {
        let (_inotify_sender, mut inotify_recv) = mpsc::channel::<()>(1);
        fs::create_dir_all(self.path()).expect("failed to create a directory");

        // On macOS no watcher is installed, so the loop below relies solely
        // on the tick-interval timeout for syncing.
        #[cfg(not(target_os = "macos"))]
        let _watcher = init_watcher(_inotify_sender, self.path());

        // Initial cleanup up to the starting watermark before entering the loop.
        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("failed to clean the directory");
        loop {
            tokio::select! {
                _ = &mut self.exit_receiver => break,
                Some(gc_checkpoint_number) = self.processed_receiver.recv() => {
                    self.data_limiter.gc(gc_checkpoint_number);
                    self.gc_processed_files(gc_checkpoint_number).expect("failed to clean the directory");
                }
                // Sync on a watcher event (Ok(Some(_))) or when the tick
                // interval elapses without one (Err(_)).
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.options.tick_interval_ms), inotify_recv.recv())  => {
                    self.sync().await.expect("failed to read checkpoint files");
                }
            }
        }
        Ok(())
    }
}
332
/// Tracks and limits the total in-progress data size for checkpoint processing.
///
/// `DataLimiter` is used to prevent excessive memory usage by keeping track of
/// the cumulative size of checkpoints currently being processed. It maintains a
/// queue of checkpoint sequence numbers and their associated sizes, and
/// provides methods to check if the limit is exceeded, add new checkpoints, and
/// perform garbage collection of processed checkpoints.
pub struct DataLimiter {
    /// The maximum allowed in-progress data size (in bytes). Zero means no
    /// limit (all tracking is skipped).
    limit: usize,
    /// A mapping from checkpoint sequence number to its data size (in bytes).
    queue: BTreeMap<CheckpointSequenceNumber, usize>,
    /// The current total in-progress data size (in bytes); updated
    /// incrementally by `add` and recomputed from `queue` by `gc`.
    in_progress: usize,
}
349
350impl DataLimiter {
351    /// Creates a new `DataLimiter` with the specified memory limit (in bytes).
352    pub fn new(limit: usize) -> Self {
353        Self {
354            limit,
355            queue: BTreeMap::new(),
356            in_progress: 0,
357        }
358    }
359
360    /// Returns `true` if the current in-progress data size exceeds the
361    /// configured limit.
362    pub fn exceeds(&self) -> bool {
363        self.limit > 0 && self.in_progress >= self.limit
364    }
365
366    /// Adds a checkpoint's data size to the in-progress queue.
367    pub fn add(&mut self, checkpoint: &CheckpointData, size: usize) {
368        if self.limit == 0 {
369            return;
370        }
371        self.in_progress += size;
372        self.queue
373            .insert(checkpoint.checkpoint_summary.sequence_number, size);
374    }
375
376    /// Performs garbage collection by removing all checkpoints with a sequence
377    /// number less than the given `watermark`, and recalculates the total
378    /// in-progress size.
379    pub fn gc(&mut self, watermark: CheckpointSequenceNumber) {
380        if self.limit == 0 {
381            return;
382        }
383        self.queue = self.queue.split_off(&watermark);
384        self.in_progress = self.queue.values().sum();
385    }
386}