iota_data_ingestion_core/reader/v1.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    collections::BTreeMap,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use backoff::backoff::Backoff;
use futures::StreamExt;
use iota_metrics::spawn_monitored_task;
use iota_types::{
    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
};
use object_store::ObjectStore;
use tap::pipe::Pipe;
use tokio::{
    sync::{
        mpsc::{self, error::TryRecvError},
        oneshot,
    },
    time::timeout,
};
use tracing::{debug, error, info};

use crate::{
    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
    reader::fetch::{
        CheckpointResult, LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store,
    },
};

/// Implements a checkpoint reader that monitors a local directory.
/// Designed for setups where the indexer daemon is colocated with a full
/// node. This implementation is push-based and utilizes the inotify API.
pub struct CheckpointReader {
    path: PathBuf,
    remote_store_url: Option<String>,
    remote_store_options: Vec<(String, String)>,
    current_checkpoint_number: CheckpointSequenceNumber,
    last_pruned_watermark: CheckpointSequenceNumber,
    checkpoint_sender: mpsc::Sender<Arc<CheckpointData>>,
    processed_receiver: mpsc::Receiver<CheckpointSequenceNumber>,
    remote_fetcher_receiver: Option<mpsc::Receiver<CheckpointResult>>,
    exit_receiver: oneshot::Receiver<()>,
    options: ReaderOptions,
    data_limiter: DataLimiter,
}

impl LocalRead for CheckpointReader {
    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
        // Stop reading ahead once the in-flight window (relative to the
        // pruning watermark) is full or the in-progress data budget is spent.
        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
            || self.data_limiter.exceeds()
    }

    fn path(&self) -> &Path {
        &self.path
    }

    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
        self.current_checkpoint_number
    }

    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
        self.last_pruned_watermark = watermark;
    }
}

/// Options for configuring how the checkpoint reader fetches new checkpoints.
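///
/// # Example
///
/// A minimal sketch of tuning the reader for a backfill; the values below are
/// illustrative, not recommendations:
///
/// ```ignore
/// let options = ReaderOptions {
///     batch_size: 50,      // more concurrent remote requests
///     data_limit: 1 << 30, // cap in-progress data at ~1 GiB
///     ..Default::default()
/// };
/// ```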
#[derive(Clone)]
pub struct ReaderOptions {
    /// How often to check for new checkpoints; lower values mean faster
    /// detection at the cost of more CPU usage.
    ///
    /// Default: 100ms.
    pub tick_interval_ms: u64,
    /// Network request timeout, applied to remote store operations.
    ///
    /// Default: 5 seconds.
    pub timeout_secs: u64,
    /// Maximum number of concurrent requests to the remote store. Increase it
    /// for backfills; higher values increase throughput but use more
    /// resources.
    ///
    /// Default: 10.
    pub batch_size: usize,
    /// Maximum memory (in bytes) for batch checkpoint processing, to prevent
    /// OOM errors. Zero means no limit.
    ///
    /// Default: 0.
    pub data_limit: usize,
}

impl Default for ReaderOptions {
    fn default() -> Self {
        Self {
            tick_interval_ms: 100,
            timeout_secs: 5,
            batch_size: 10,
            data_limit: 0,
        }
    }
}

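/// The remote source backing the fetcher, selected from the configured
/// `remote_store_url` (see `start_remote_fetcher`):
/// - `"<fullnode_url>|<object_store_url>"` picks `Hybrid` (full node first,
///   object store as a fallback),
/// - a URL ending in `/api/v1` picks `Fullnode`,
/// - anything else picks `ObjectStore`.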
enum RemoteStore {
    ObjectStore(Box<dyn ObjectStore>),
    Fullnode(iota_rest_api::Client),
    Hybrid(Box<dyn ObjectStore>, iota_rest_api::Client),
}

impl CheckpointReader {
    async fn remote_fetch_checkpoint_internal(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> CheckpointResult {
        match store {
            RemoteStore::ObjectStore(store) => {
                fetch_from_object_store(store, checkpoint_number).await
            }
            RemoteStore::Fullnode(client) => fetch_from_full_node(client, checkpoint_number).await,
            RemoteStore::Hybrid(store, client) => {
                // Try the full node first and fall back to the object store
                // on any error.
                match fetch_from_full_node(client, checkpoint_number).await {
                    Ok(result) => Ok(result),
                    Err(_) => fetch_from_object_store(store, checkpoint_number).await,
                }
            }
        }
    }

    async fn remote_fetch_checkpoint(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> CheckpointResult {
        // With a multiplier of 1.0 the "exponential" backoff degenerates into
        // a roughly constant 100ms retry interval, bounded by a total retry
        // budget of 60 seconds.
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        backoff.multiplier = 1.0;
        loop {
            match Self::remote_fetch_checkpoint_internal(store, checkpoint_number).await {
                Ok(data) => return Ok(data),
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        // A 404 usually means the checkpoint is not available
                        // yet, so it is not worth logging.
                        if !err.to_string().contains("404") {
                            debug!(
                                "remote reader retry in {} ms. Error is {:?}",
                                duration.as_millis(),
                                err
                            );
                        }
                        tokio::time::sleep(duration).await
                    }
                    None => return Err(err),
                },
            }
        }
    }

    fn start_remote_fetcher(&mut self) -> mpsc::Receiver<CheckpointResult> {
        let batch_size = self.options.batch_size;
        let start_checkpoint = self.current_checkpoint_number;
        let (sender, receiver) = mpsc::channel(batch_size);
        let url = self
            .remote_store_url
            .clone()
            .expect("remote store url must be set");
        // A "<fullnode_url>|<object_store_url>" pair selects the hybrid
        // source; see `RemoteStore`.
        let store = if let Some((fn_url, remote_url)) = url.split_once('|') {
            let object_store = create_remote_store_client(
                remote_url.to_string(),
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::Hybrid(object_store, iota_rest_api::Client::new(fn_url))
        } else if url.ends_with("/api/v1") {
            RemoteStore::Fullnode(iota_rest_api::Client::new(url))
        } else {
            let object_store = create_remote_store_client(
                url,
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::ObjectStore(object_store)
        };

        spawn_monitored_task!(async move {
            // Fetch checkpoints sequentially starting from `start_checkpoint`,
            // with up to `batch_size` requests in flight at a time.
            let mut checkpoint_stream = (start_checkpoint..u64::MAX)
                .map(|checkpoint_number| Self::remote_fetch_checkpoint(&store, checkpoint_number))
                .pipe(futures::stream::iter)
                .buffered(batch_size);

            while let Some(checkpoint) = checkpoint_stream.next().await {
                if sender.send(checkpoint).await.is_err() {
                    info!("remote reader dropped");
                    break;
                }
            }
        });
        receiver
    }

    fn remote_fetch(&mut self) -> Vec<Arc<CheckpointData>> {
        let mut checkpoints = vec![];
        if self.remote_fetcher_receiver.is_none() {
            self.remote_fetcher_receiver = Some(self.start_remote_fetcher());
        }
        while !self.exceeds_capacity(self.current_checkpoint_number + checkpoints.len() as u64) {
            match self.remote_fetcher_receiver.as_mut().unwrap().try_recv() {
                Ok(Ok((checkpoint, size))) => {
                    self.data_limiter.add(&checkpoint, size);
                    checkpoints.push(checkpoint);
                }
                Ok(Err(err)) => {
                    error!("remote reader transient error {:?}", err);
                    // Dropping the receiver restarts the fetcher from the
                    // current checkpoint on the next call.
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Disconnected) => {
                    error!("remote reader channel disconnect error");
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Empty) => break,
            }
        }
        checkpoints
    }

    async fn sync(&mut self) -> IngestionResult<()> {
        let mut checkpoints = self.read_local_files_with_retry().await?;

        let mut read_source = ReadSource::Local;
        if self.remote_store_url.is_some()
            && (checkpoints.is_empty()
                || checkpoints[0].checkpoint_summary.sequence_number
                    > self.current_checkpoint_number)
        {
            checkpoints = self.remote_fetch();
            read_source = ReadSource::Remote;
        } else {
            // cancel remote fetcher execution because the local reader has
            // made progress
            self.remote_fetcher_receiver = None;
        }

        info!(
            "Read from {read_source}. Current checkpoint number: {}, pruning watermark: {}, new updates: {}",
            self.current_checkpoint_number,
            self.last_pruned_watermark,
            checkpoints.len(),
        );
        for checkpoint in checkpoints {
            // Local reads may run ahead of the expected sequence number; stop
            // at the first gap and wait for the missing checkpoint.
            if matches!(read_source, ReadSource::Local)
                && self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number)
            {
                break;
            }
            assert_eq!(
                checkpoint.checkpoint_summary.sequence_number,
                self.current_checkpoint_number
            );
            self.checkpoint_sender.send(checkpoint).await.map_err(|_| {
                IngestionError::Channel(
                    "unable to send checkpoint to executor, receiver half closed".to_owned(),
                )
            })?;
            self.current_checkpoint_number += 1;
        }
        Ok(())
    }

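    /// Creates a reader along with its communication channels: the checkpoint
    /// receiver (consumed by the executor), the processed-watermark sender
    /// (used to acknowledge checkpoints so their files can be pruned), and the
    /// exit sender (used to request shutdown).
    ///
    /// # Example
    ///
    /// A minimal sketch of wiring the reader into a task; identifiers such as
    /// `ingestion_dir` and the URL are illustrative:
    ///
    /// ```ignore
    /// let (reader, mut checkpoint_recv, processed_sender, exit_sender) =
    ///     CheckpointReader::initialize(
    ///         ingestion_dir, // local directory with checkpoint files
    ///         0,             // starting sequence number
    ///         Some("https://example.org/api/v1".to_string()),
    ///         vec![],
    ///         ReaderOptions::default(),
    ///     );
    /// tokio::spawn(reader.run());
    /// while let Some(checkpoint) = checkpoint_recv.recv().await {
    ///     // ... process the checkpoint, then acknowledge it ...
    ///     processed_sender
    ///         .send(checkpoint.checkpoint_summary.sequence_number)
    ///         .await?;
    /// }
    /// let _ = exit_sender.send(()); // request shutdown
    /// ```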
    pub fn initialize(
        path: PathBuf,
        starting_checkpoint_number: CheckpointSequenceNumber,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        options: ReaderOptions,
    ) -> (
        Self,
        mpsc::Receiver<Arc<CheckpointData>>,
        mpsc::Sender<CheckpointSequenceNumber>,
        oneshot::Sender<()>,
    ) {
        let (checkpoint_sender, checkpoint_recv) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (processed_sender, processed_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (exit_sender, exit_receiver) = oneshot::channel();
        let reader = Self {
            path,
            remote_store_url,
            remote_store_options,
            current_checkpoint_number: starting_checkpoint_number,
            last_pruned_watermark: starting_checkpoint_number,
            checkpoint_sender,
            processed_receiver,
            remote_fetcher_receiver: None,
            exit_receiver,
            data_limiter: DataLimiter::new(options.data_limit),
            options,
        };
        (reader, checkpoint_recv, processed_sender, exit_sender)
    }

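    /// Runs the reader loop: watches the local directory for new checkpoint
    /// files, garbage-collects processed files on acknowledgement, and syncs
    /// on every file event or tick, whichever comes first.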
    pub async fn run(mut self) -> IngestionResult<()> {
        let (_watcher, mut inotify_recv) = self.setup_directory_watcher();
        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("Failed to clean the directory");
        loop {
            tokio::select! {
                _ = &mut self.exit_receiver => break,
                Some(gc_checkpoint_number) = self.processed_receiver.recv() => {
                    self.data_limiter.gc(gc_checkpoint_number);
                    self.gc_processed_files(gc_checkpoint_number).expect("Failed to clean the directory");
                }
                // Wake up on a file event or after the tick interval elapses,
                // whichever happens first.
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.options.tick_interval_ms), inotify_recv.recv()) => {
                    self.sync().await.expect("Failed to read checkpoint files");
                }
            }
        }
        Ok(())
    }
}

/// Tracks and limits the total in-progress data size for checkpoint processing.
///
/// `DataLimiter` is used to prevent excessive memory usage by keeping track of
/// the cumulative size of checkpoints currently being processed. It maintains
/// an ordered map from checkpoint sequence numbers to their sizes, and
/// provides methods to check whether the limit is exceeded, add new
/// checkpoints, and garbage-collect processed checkpoints.
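///
/// # Example
///
/// A sketch of the intended call pattern; `checkpoint` is assumed to be a
/// `CheckpointData` and `size` its size in bytes:
///
/// ```ignore
/// let mut limiter = DataLimiter::new(512 * 1024 * 1024); // 512 MiB budget
/// if !limiter.exceeds() {
///     limiter.add(&checkpoint, size);
/// }
/// // Once every checkpoint below `watermark` has been processed:
/// limiter.gc(watermark);
/// ```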
pub struct DataLimiter {
    /// The maximum allowed in-progress data size (in bytes). Zero means no
    /// limit.
    limit: usize,
    /// A mapping from checkpoint sequence number to its data size (in bytes).
    queue: BTreeMap<CheckpointSequenceNumber, usize>,
    /// The current total in-progress data size (in bytes).
    in_progress: usize,
}

impl DataLimiter {
    /// Creates a new `DataLimiter` with the specified memory limit (in bytes).
    pub fn new(limit: usize) -> Self {
        Self {
            limit,
            queue: BTreeMap::new(),
            in_progress: 0,
        }
    }

    /// Returns `true` if the current in-progress data size meets or exceeds
    /// the configured limit.
    pub fn exceeds(&self) -> bool {
        self.limit > 0 && self.in_progress >= self.limit
    }

    /// Adds a checkpoint's data size to the in-progress queue.
    pub fn add(&mut self, checkpoint: &CheckpointData, size: usize) {
        if self.limit == 0 {
            return;
        }
        self.in_progress += size;
        self.queue
            .insert(checkpoint.checkpoint_summary.sequence_number, size);
    }

    /// Performs garbage collection by removing all checkpoints with a sequence
    /// number below the given `watermark` and recalculating the total
    /// in-progress size.
    pub fn gc(&mut self, watermark: CheckpointSequenceNumber) {
        if self.limit == 0 {
            return;
        }
        self.queue = self.queue.split_off(&watermark);
        self.in_progress = self.queue.values().sum();
    }
}