iota_data_ingestion_core/reader/
common.rs

1// Copyright (c) 2026 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module shares common types and logic for the checkpoint reader.
5
6use std::collections::BTreeMap;
7
8use iota_types::{
9    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
10};
11
/// Options for configuring how the checkpoint reader fetches new checkpoints.
#[derive(Clone, Debug)]
pub struct ReaderOptions {
    /// How often (in milliseconds) to check for new checkpoints. Lower values
    /// mean faster detection but more CPU usage.
    ///
    /// Default: 100ms.
    pub tick_interval_ms: u64,
    /// Network request timeout (in seconds) applied to remote store
    /// operations.
    ///
    /// Default: 5 seconds.
    pub timeout_secs: u64,
    /// Maximum number of concurrent requests to the remote store. Increase it
    /// for backfills: higher values increase throughput but use more
    /// resources.
    ///
    /// Default: 10.
    pub batch_size: usize,
    /// Maximum memory (in bytes) used for batch checkpoint processing, to
    /// prevent OOM errors. Zero means no limit.
    ///
    /// Default: 0.
    pub data_limit: usize,
}

impl Default for ReaderOptions {
    /// Returns the documented defaults: 100ms tick interval, 5s timeout,
    /// batch size of 10 and no data limit.
    fn default() -> Self {
        Self {
            tick_interval_ms: 100,
            timeout_secs: 5,
            batch_size: 10,
            data_limit: 0,
        }
    }
}
46
47/// Tracks and limits the total in-progress data size for checkpoint processing.
48///
49/// `DataLimiter` is used to prevent excessive memory usage by keeping track of
50/// the cumulative size of checkpoints currently being processed. It maintains a
51/// queue of checkpoint sequence numbers and their associated sizes, and
52/// provides methods to check if the limit is exceeded, add new checkpoints, and
53/// perform garbage collection of processed checkpoints.
54pub struct DataLimiter {
55    /// The maximum allowed in-progress data size (in bytes). Zero means no
56    /// limit.
57    limit: usize,
58    /// A mapping from checkpoint sequence number to its data size (in bytes)
59    queue: BTreeMap<CheckpointSequenceNumber, usize>,
60    /// The current total in-progress data size (in bytes).
61    in_progress: usize,
62}
63
64impl DataLimiter {
65    /// Creates a new `DataLimiter` with the specified memory limit (in bytes).
66    pub fn new(limit: usize) -> Self {
67        Self {
68            limit,
69            queue: BTreeMap::new(),
70            in_progress: 0,
71        }
72    }
73
74    /// Returns `true` if the current in-progress data size exceeds the
75    /// configured limit.
76    pub fn exceeds(&self) -> bool {
77        self.limit > 0 && self.in_progress >= self.limit
78    }
79
80    /// Adds a checkpoint's data size to the in-progress queue.
81    pub fn add(&mut self, checkpoint: &CheckpointData, size: usize) {
82        if self.limit == 0 {
83            return;
84        }
85        self.in_progress += size;
86        self.queue
87            .insert(checkpoint.checkpoint_summary.sequence_number, size);
88    }
89
90    /// Performs garbage collection by removing all checkpoints with a sequence
91    /// number less than the given `watermark`, and recalculates the total
92    /// in-progress size.
93    pub fn gc(&mut self, watermark: CheckpointSequenceNumber) {
94        if self.limit == 0 {
95            return;
96        }
97        self.queue = self.queue.split_off(&watermark);
98        self.in_progress = self.queue.values().sum();
99    }
100}