iota_data_ingestion_core/reader/common.rs
1// Copyright (c) 2026 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module shares common types and logic for the checkpoint reader.
5
6use std::collections::BTreeMap;
7
8use iota_types::{
9 full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
10};
11
/// Options for configuring how the checkpoint reader fetches new checkpoints.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ReaderOptions {
    /// How often to check for new checkpoints; lower values mean faster
    /// detection but more CPU usage.
    ///
    /// Default: 100ms.
    pub tick_interval_ms: u64,
    /// Network request timeout applied to remote store operations.
    ///
    /// Default: 5 seconds.
    pub timeout_secs: u64,
    /// Maximum number of concurrent requests to the remote store. Increase it
    /// for backfills; higher values increase throughput but use more
    /// resources.
    ///
    /// Default: 10.
    pub batch_size: usize,
    /// Maximum memory (bytes) for batch checkpoint processing to prevent OOM
    /// errors. Zero indicates no limit.
    ///
    /// Default: 0.
    pub data_limit: usize,
}
35
36impl Default for ReaderOptions {
37 fn default() -> Self {
38 Self {
39 tick_interval_ms: 100,
40 timeout_secs: 5,
41 batch_size: 10,
42 data_limit: 0,
43 }
44 }
45}
46
47/// Tracks and limits the total in-progress data size for checkpoint processing.
48///
49/// `DataLimiter` is used to prevent excessive memory usage by keeping track of
50/// the cumulative size of checkpoints currently being processed. It maintains a
51/// queue of checkpoint sequence numbers and their associated sizes, and
52/// provides methods to check if the limit is exceeded, add new checkpoints, and
53/// perform garbage collection of processed checkpoints.
54pub struct DataLimiter {
55 /// The maximum allowed in-progress data size (in bytes). Zero means no
56 /// limit.
57 limit: usize,
58 /// A mapping from checkpoint sequence number to its data size (in bytes)
59 queue: BTreeMap<CheckpointSequenceNumber, usize>,
60 /// The current total in-progress data size (in bytes).
61 in_progress: usize,
62}
63
64impl DataLimiter {
65 /// Creates a new `DataLimiter` with the specified memory limit (in bytes).
66 pub fn new(limit: usize) -> Self {
67 Self {
68 limit,
69 queue: BTreeMap::new(),
70 in_progress: 0,
71 }
72 }
73
74 /// Returns `true` if the current in-progress data size exceeds the
75 /// configured limit.
76 pub fn exceeds(&self) -> bool {
77 self.limit > 0 && self.in_progress >= self.limit
78 }
79
80 /// Adds a checkpoint's data size to the in-progress queue.
81 pub fn add(&mut self, checkpoint: &CheckpointData, size: usize) {
82 if self.limit == 0 {
83 return;
84 }
85 self.in_progress += size;
86 self.queue
87 .insert(checkpoint.checkpoint_summary.sequence_number, size);
88 }
89
90 /// Performs garbage collection by removing all checkpoints with a sequence
91 /// number less than the given `watermark`, and recalculates the total
92 /// in-progress size.
93 pub fn gc(&mut self, watermark: CheckpointSequenceNumber) {
94 if self.limit == 0 {
95 return;
96 }
97 self.queue = self.queue.split_off(&watermark);
98 self.in_progress = self.queue.values().sum();
99 }
100}