iota_data_ingestion_core/history/
epoch_boundaries.rs1use std::{collections::BTreeMap, ops::RangeBounds, time::Duration};
7
8use bytes::Bytes;
9use iota_storage::object_store::{ObjectStoreGetExt, util::get};
10use iota_types::{committee::EpochId, messages_checkpoint::CheckpointSequenceNumber};
11use object_store::{ObjectStore, PutMode, path::Path};
12use serde::{Deserialize, Serialize};
13
14use crate::{
15 IngestionError,
16 errors::IngestionResult as Result,
17 history::{
18 EPOCH_BOUNDARIES_FILE_MAGIC, EPOCH_BOUNDARIES_FILENAME, finalize_magic_blob,
19 read_magic_blob,
20 },
21};
22
/// Number of seconds allowed for fetching the epoch boundaries file from the
/// remote store before the read is abandoned.
const GET_TIMEOUT_SECS: u64 = 5;
24
25#[derive(Debug, Clone, Default, Serialize, Deserialize, Eq, PartialEq)]
43pub struct EpochBoundaries(BTreeMap<EpochId, CheckpointSequenceNumber>);
44
45impl FromIterator<(EpochId, CheckpointSequenceNumber)> for EpochBoundaries {
46 fn from_iter<T>(iter: T) -> Self
47 where
48 T: IntoIterator<Item = (EpochId, CheckpointSequenceNumber)>,
49 {
50 Self(iter.into_iter().collect())
51 }
52}
53
54impl EpochBoundaries {
55 pub fn get(&self, epoch: EpochId) -> Option<CheckpointSequenceNumber> {
57 self.0.get(&epoch).copied()
58 }
59
60 pub fn range(
63 &self,
64 range: impl RangeBounds<EpochId>,
65 ) -> impl Iterator<Item = (EpochId, CheckpointSequenceNumber)> + '_ {
66 self.0
67 .range(range)
68 .map(|(&epoch, &boundary)| (epoch, boundary))
69 }
70
71 pub fn contains(&self, epoch: EpochId) -> bool {
73 self.0.contains_key(&epoch)
74 }
75
76 pub fn insert_next(
83 &mut self,
84 epoch: EpochId,
85 boundary: CheckpointSequenceNumber,
86 ) -> Result<()> {
87 if epoch > 0 && !self.contains(epoch - 1) {
88 return Err(IngestionError::EpochBoundary(format!(
89 "epoch {epoch} just ended but its predecessor is not recorded"
90 )));
91 }
92 self.0.insert(epoch, boundary);
93 Ok(())
94 }
95
96 pub fn file_path() -> Path {
98 Path::from(EPOCH_BOUNDARIES_FILENAME)
99 }
100}
101
102pub async fn read_epoch_boundaries<S: ObjectStoreGetExt>(
108 remote_store: S,
109) -> Result<EpochBoundaries> {
110 let bytes = tokio::time::timeout(
111 Duration::from_secs(GET_TIMEOUT_SECS),
112 get(&remote_store, &EpochBoundaries::file_path()),
113 )
114 .await
115 .map_err(|e| IngestionError::EpochBoundary(e.to_string()))??;
116 read_epoch_boundaries_from_bytes(bytes.to_vec())
117}
118
119pub fn read_epoch_boundaries_from_bytes(vec: Vec<u8>) -> Result<EpochBoundaries> {
126 read_magic_blob(vec, EPOCH_BOUNDARIES_FILE_MAGIC, EPOCH_BOUNDARIES_FILENAME)
127}
128
129pub fn finalize_epoch_boundaries(boundaries: &EpochBoundaries) -> Result<Bytes> {
132 finalize_magic_blob(boundaries, EPOCH_BOUNDARIES_FILE_MAGIC)
133}
134
135pub async fn write_epoch_boundaries<S: ObjectStore>(
144 boundaries: &EpochBoundaries,
145 remote_store: S,
146 put_mode: PutMode,
147) -> Result<()> {
148 let bytes = finalize_epoch_boundaries(boundaries)?;
149 remote_store
150 .put_opts(&EpochBoundaries::file_path(), bytes.into(), put_mode.into())
151 .await?;
152 Ok(())
153}
154
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{IngestionError, history::MAGIC_BYTES};

    /// Fixture holding three consecutive epochs.
    fn sample() -> EpochBoundaries {
        EpochBoundaries::from_iter([(0, 5), (1, 100), (2, 1000)])
    }

    #[test]
    fn insert_next_enforces_contiguity() {
        let mut boundaries = EpochBoundaries::default();

        // Epoch 1 must be refused while epoch 0 is still missing.
        let premature = boundaries.insert_next(1, 50);
        assert!(matches!(premature, Err(IngestionError::EpochBoundary(_))));

        boundaries.insert_next(0, 5).unwrap();
        boundaries.insert_next(1, 100).unwrap();

        // Epoch 2 was never recorded, so epoch 3 would leave a gap.
        let skipping = boundaries.insert_next(3, 200);
        assert!(skipping.is_err());
    }

    #[test]
    fn write_read() {
        let cases = [EpochBoundaries::default(), sample()];
        for expected in cases {
            let blob = finalize_epoch_boundaries(&expected).unwrap();
            let decoded = read_epoch_boundaries_from_bytes(blob.to_vec()).unwrap();
            assert_eq!(decoded, expected);
        }
    }

    #[test]
    fn rejects_wrong_magic() {
        let mut blob = finalize_epoch_boundaries(&sample()).unwrap().to_vec();
        // Invert the very first byte so that the magic no longer matches.
        blob[0] = !blob[0];

        let outcome = read_epoch_boundaries_from_bytes(blob);
        assert!(matches!(outcome, Err(IngestionError::HistoryRead(_))));
    }

    #[test]
    fn rejects_corrupted_content() {
        let mut blob = finalize_epoch_boundaries(&sample()).unwrap().to_vec();
        // Invert a byte located after the magic.
        // NOTE(review): assumes `MAGIC_BYTES` is the length of the magic
        // prefix — confirm in `crate::history`.
        let target = MAGIC_BYTES + 1;
        blob[target] = !blob[target];

        let outcome = read_epoch_boundaries_from_bytes(blob);
        assert!(matches!(outcome, Err(IngestionError::HistoryRead(_))));
    }
}