Skip to main content

iota_data_ingestion_core/history/
epoch_boundaries.rs

1// Copyright (c) 2026 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! Maintain the sequence number of the last checkpoint of each epoch.
5
6use std::{collections::BTreeMap, ops::RangeBounds, time::Duration};
7
8use bytes::Bytes;
9use iota_storage::object_store::{ObjectStoreGetExt, util::get};
10use iota_types::{committee::EpochId, messages_checkpoint::CheckpointSequenceNumber};
11use object_store::{ObjectStore, PutMode, path::Path};
12use serde::{Deserialize, Serialize};
13
14use crate::{
15    IngestionError,
16    errors::IngestionResult as Result,
17    history::{
18        EPOCH_BOUNDARIES_FILE_MAGIC, EPOCH_BOUNDARIES_FILENAME, finalize_magic_blob,
19        read_magic_blob,
20    },
21};
22
/// Upper bound, in seconds, on how long fetching the epoch boundaries file
/// from the remote store may take before the read is abandoned.
const GET_TIMEOUT_SECS: u64 = 5;
24
25/// Stores the epoch boundaries.
26///
27/// The representation stored is a map between the epoch and the sequence number
28/// of the respective last checkpoint.
29///
30/// # Examples
31///
32/// ```
33/// # use iota_data_ingestion_core::history::epoch_boundaries::EpochBoundaries;
34/// let boundaries: EpochBoundaries = [(0, 5), (1, 100), (2, 1000)].into_iter().collect();
35/// assert_eq!(boundaries.get(1), Some(100));
36/// // The last checkpoints of a range of epochs, in epoch order.
37/// assert_eq!(
38///     boundaries.range(..2).collect::<Vec<_>>(),
39///     vec![(0, 5), (1, 100)]
40/// );
41/// ```
42#[derive(Debug, Clone, Default, Serialize, Deserialize, Eq, PartialEq)]
43pub struct EpochBoundaries(BTreeMap<EpochId, CheckpointSequenceNumber>);
44
45impl FromIterator<(EpochId, CheckpointSequenceNumber)> for EpochBoundaries {
46    fn from_iter<T>(iter: T) -> Self
47    where
48        T: IntoIterator<Item = (EpochId, CheckpointSequenceNumber)>,
49    {
50        Self(iter.into_iter().collect())
51    }
52}
53
54impl EpochBoundaries {
55    /// Returns the boundary of the given epoch.
56    pub fn get(&self, epoch: EpochId) -> Option<CheckpointSequenceNumber> {
57        self.0.get(&epoch).copied()
58    }
59
60    /// Returns the recorded `(epoch, last checkpoint)` pairs for the epochs in
61    /// `range`, in epoch order.
62    pub fn range(
63        &self,
64        range: impl RangeBounds<EpochId>,
65    ) -> impl Iterator<Item = (EpochId, CheckpointSequenceNumber)> + '_ {
66        self.0
67            .range(range)
68            .map(|(&epoch, &boundary)| (epoch, boundary))
69    }
70
71    /// Returns whether the given epoch has a recorded boundary.
72    pub fn contains(&self, epoch: EpochId) -> bool {
73        self.0.contains_key(&epoch)
74    }
75
76    /// Inserts a new epoch boundary, keeping the recorded epochs contiguous.
77    /// Any existing boundary for the same epoch is overwritten.
78    ///
79    /// # Errors
80    ///
81    /// Fails if the previous epoch has not been already recorded.
82    pub fn insert_next(
83        &mut self,
84        epoch: EpochId,
85        boundary: CheckpointSequenceNumber,
86    ) -> Result<()> {
87        if epoch > 0 && !self.contains(epoch - 1) {
88            return Err(IngestionError::EpochBoundary(format!(
89                "epoch {epoch} just ended but its predecessor is not recorded"
90            )));
91        }
92        self.0.insert(epoch, boundary);
93        Ok(())
94    }
95
96    /// The relative path of the file with the serialized boundaries.
97    pub fn file_path() -> Path {
98        Path::from(EPOCH_BOUNDARIES_FILENAME)
99    }
100}
101
102/// Reads the epoch boundaries from the store.
103///
104/// # Errors
105///
106/// Fails if the file cannot be fetched, of if it fails to decode.
107pub async fn read_epoch_boundaries<S: ObjectStoreGetExt>(
108    remote_store: S,
109) -> Result<EpochBoundaries> {
110    let bytes = tokio::time::timeout(
111        Duration::from_secs(GET_TIMEOUT_SECS),
112        get(&remote_store, &EpochBoundaries::file_path()),
113    )
114    .await
115    .map_err(|e| IngestionError::EpochBoundary(e.to_string()))??;
116    read_epoch_boundaries_from_bytes(bytes.to_vec())
117}
118
119/// Decodes epoch boundaries from the given byte vector and verifies their
120/// integrity.
121///
122/// # Errors
123///
124/// Fails if the magic byte or the trailing SHA3-256 checksum does not match.
125pub fn read_epoch_boundaries_from_bytes(vec: Vec<u8>) -> Result<EpochBoundaries> {
126    read_magic_blob(vec, EPOCH_BOUNDARIES_FILE_MAGIC, EPOCH_BOUNDARIES_FILENAME)
127}
128
129/// Encodes the epoch boundaries with its magic byte and a trailing SHA3-256
130/// checksum.
131pub fn finalize_epoch_boundaries(boundaries: &EpochBoundaries) -> Result<Bytes> {
132    finalize_magic_blob(boundaries, EPOCH_BOUNDARIES_FILE_MAGIC)
133}
134
135/// Writes the epoch boundaries to the store atomically.
136///
137///
138///
139/// # Errors
140///
141/// Fails if the encoding fails, if the [`PutMode`] invariants are not upheld,
142/// or for any other reason the upload might fail.
143pub async fn write_epoch_boundaries<S: ObjectStore>(
144    boundaries: &EpochBoundaries,
145    remote_store: S,
146    put_mode: PutMode,
147) -> Result<()> {
148    let bytes = finalize_epoch_boundaries(boundaries)?;
149    remote_store
150        .put_opts(&EpochBoundaries::file_path(), bytes.into(), put_mode.into())
151        .await?;
152    Ok(())
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{IngestionError, history::MAGIC_BYTES};

    /// Fixture with three contiguous epochs.
    fn sample() -> EpochBoundaries {
        EpochBoundaries::from_iter([(0, 5), (1, 100), (2, 1000)])
    }

    /// The fixture in its finalized (encoded) form.
    fn encoded_sample() -> Vec<u8> {
        finalize_epoch_boundaries(&sample()).unwrap().to_vec()
    }

    /// Asserts that decoding `bytes` is refused with a history read error.
    fn assert_history_read_error(bytes: Vec<u8>) {
        let outcome = read_epoch_boundaries_from_bytes(bytes);
        assert!(matches!(outcome, Err(IngestionError::HistoryRead(_))));
    }

    #[test]
    fn insert_next_enforces_contiguity() {
        let mut boundaries = EpochBoundaries::default();

        // Nothing is recorded yet, so epoch 1 has no predecessor: the first
        // recorded epoch must be 0.
        let premature = boundaries.insert_next(1, 50);
        assert!(matches!(premature, Err(IngestionError::EpochBoundary(_))));

        // Epochs 0 and 1 follow each other and are both accepted.
        for (epoch, boundary) in [(0, 5), (1, 100)] {
            boundaries.insert_next(epoch, boundary).unwrap();
        }

        // Epoch 2 was never recorded, so epoch 3 would leave a gap.
        let gap = boundaries.insert_next(3, 200);
        assert!(gap.is_err());
    }

    #[test]
    fn write_read() {
        // Round-trip both the empty and a populated set of boundaries.
        for original in [EpochBoundaries::default(), sample()] {
            let encoded = finalize_epoch_boundaries(&original).unwrap();
            let decoded = read_epoch_boundaries_from_bytes(encoded.to_vec()).unwrap();
            assert_eq!(decoded, original);
        }
    }

    #[test]
    fn rejects_wrong_magic() {
        let mut bytes = encoded_sample();
        // Invert every bit of the first byte of the magic prefix.
        bytes[0] ^= 0xFF;
        assert_history_read_error(bytes);
    }

    #[test]
    fn rejects_corrupted_content() {
        let mut bytes = encoded_sample();
        // Invert a byte of the encoded body, which starts `MAGIC_BYTES` bytes
        // into the blob, right after the magic prefix.
        bytes[MAGIC_BYTES + 1] ^= 0xFF;
        assert_history_read_error(bytes);
    }
}