iota_data_ingestion_core/history/verifier.rs
1// Copyright (c) 2026 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! Verify the last checkpoint of an epoch against the committee of that epoch.
5//!
6//! The committee of an epoch is recorded only in the last checkpoint of the
7//! previous epoch, as `EndOfEpochData::next_epoch_committee`. Verifying the
8//! last checkpoint of epoch `N` therefore requires the committee taken from the
9//! last checkpoint of epoch `N - 1`, which must itself be verified the same
10//! way going all the way back to the genesis committee.
11//!
//! [`EpochBoundaryVerifier`] runs this verification from a starting committee
//! (typically the genesis committee) after resolving the epoch boundaries from
//! the remote store. Each checkpoint is fetched into memory from the remote
//! store only when the stream is polled, and only its summary is retained for
//! verification. The verified checkpoints are exposed as a [`Stream`], so
//! callers can consume each epoch's checkpoint as soon as it is verified.
17//!
18//! The most prominent use of this logic is the verification of formal
19//! snapshots. This is done by comparing the elliptic-curve multiset hash (ECMH)
20//! of the live objects included in the snapshot against the
21//! [`CheckpointCommitment`](iota_types::messages_checkpoint::CheckpointCommitment)
22//! stored in the last checkpoint of the respective epoch.
23
24use futures::{Stream, stream::TryStreamExt};
25use iota_config::genesis::Genesis;
26use iota_types::{
27 committee::{Committee, CommitteeChainVerifier, EpochId},
28 messages_checkpoint::{
29 CertifiedCheckpointSummary, CheckpointSequenceNumber, VerifiedCheckpoint,
30 },
31};
32
33use crate::{
34 IngestionError,
35 errors::IngestionResult as Result,
36 history::{epoch_boundaries::EpochBoundaries, reader::HistoricalReader},
37};
38
39/// Verifies the last checkpoint of each listed epoch against the committee of
40/// that epoch.
41///
42/// A verifier is defined by a starting committee and the sequence numbers of
43/// the checkpoints to verify; the committee advances as each checkpoint is
44/// verified.
45///
46/// # Examples
47///
48/// ```ignore
49/// use iota_config::{genesis::Genesis, node::ArchiveReaderConfig};
50/// use iota_data_ingestion_core::history::{reader::HistoricalReader, verifier::EpochBoundaryVerifier};
51///
52/// let reader = HistoricalReader::new(config)?;
53/// let genesis = Genesis::load(genesis_path)?;
54///
55/// let target_epoch = 1000;
56/// let verifier = EpochBoundaryVerifier::from_genesis(reader, &genesis, target_epoch).await?;
57/// let target = verifier.verify_target_epoch_boundary().await?;
58/// println!("verified last checkpoint of epoch {}", target.epoch());
59/// ```
60pub struct EpochBoundaryVerifier {
61 reader: HistoricalReader,
62 /// The committee-chain walk; its committee is the one expected to have
63 /// signed the next checkpoint to verify.
64 chain_verifier: CommitteeChainVerifier,
65 /// The epoch boundaries stored in the remote store.
66 epoch_boundaries: EpochBoundaries,
67 /// The final epoch to verify.
68 target_epoch: EpochId,
69}
70
71impl EpochBoundaryVerifier {
72 /// Creates a verifier from a starting committee for the target epoch.
73 ///
74 /// # Errors
75 ///
76 /// Fails if the target epoch precedes the starting committee's epoch, or if
77 /// the epoch boundaries cannot be read from the remote store.
78 pub async fn new(
79 reader: HistoricalReader,
80 starting_committee: Committee,
81 target_epoch: EpochId,
82 ) -> Result<Self> {
83 if target_epoch < starting_committee.epoch {
84 return Err(IngestionError::Verification(format!(
85 "target epoch {target_epoch} precedes the starting committee's epoch {}",
86 starting_committee.epoch
87 )));
88 }
89 let epoch_boundaries = reader.epoch_boundaries().await?;
90 Ok(Self {
91 reader,
92 chain_verifier: CommitteeChainVerifier::new(starting_committee),
93 epoch_boundaries,
94 target_epoch,
95 })
96 }
97
98 /// Creates a verifier for the target epoch, whose starting committee is the
99 /// genesis committee.
100 ///
101 /// # Errors
102 ///
103 /// Fails if the epoch boundaries cannot be read from the remote store.
104 pub async fn from_genesis(
105 reader: HistoricalReader,
106 genesis: &Genesis,
107 target_epoch: EpochId,
108 ) -> Result<Self> {
109 let committee = genesis.committee().map_err(|e| {
110 IngestionError::Verification(format!("failed to load genesis committee: {e}"))
111 })?;
112 Self::new(reader, committee, target_epoch).await
113 }
114
115 /// Verifies the last checkpoint of the given epoch.
116 ///
117 /// This consumes the verifier, draining the stream returned by
118 /// [`Self::stream_verified_checkpoints`].
119 ///
120 /// # Errors
121 ///
122 /// Fails if [`Self::stream_verified_checkpoints`] fails.
123 pub async fn verify_target_epoch_boundary(self) -> Result<VerifiedCheckpoint> {
124 let last = self
125 .stream_verified_checkpoints()
126 .await?
127 .try_fold(None, |_, verified| async move { Ok(Some(verified)) })
128 .await?
129 .expect("stream guarantees to yield at least one checkpoint if successful");
130
131 Ok(last)
132 }
133
134 /// Streams the verified last checkpoints from the starting committee's
135 /// epoch up to the target epoch of the verifier.
136 ///
137 /// This consumes the verifier. Each checkpoint is fetched into memory and
138 /// verified only when the stream is polled. Upon successful verification
139 /// the committee is advanced to verify the last checkpoint of the
140 /// next epoch.
141 ///
142 /// # Errors
143 ///
144 /// Generating the stream only fails if the MANIFEST in the remote store
145 /// cannot be synced.
146 ///
147 /// The stream returns an error in the following occasions:
148 ///
149 /// * If the last checkpoint of the next epoch is not recorded in the epoch
150 /// boundaries.
151 /// * If the summary cannot be fetched from the remote store
152 /// * If signature verification fails
153 /// * If the checkpoint is not the last checkpoint of the epoch
154 pub async fn stream_verified_checkpoints(
155 self,
156 ) -> Result<impl Stream<Item = Result<VerifiedCheckpoint>> + Send> {
157 // Refresh the manifest once before fetching; the per-checkpoint fetches
158 // read from the cached manifest.
159 self.reader.sync_manifest_once().await?;
160
161 Ok(futures::stream::try_unfold(
162 self,
163 |mut verifier| async move {
164 let Some(verified) = verifier.verify_next().await? else {
165 return Ok(None);
166 };
167 Ok(Some((verified, verifier)))
168 },
169 ))
170 }
171
172 /// Verifies the checkpoint of the next epoch in queue.
173 ///
174 /// In order to do so, the checkpoint summary is fetched from the remote
175 /// store and it is verified against the active committee of the
176 /// corresponding epoch.
177 ///
178 /// The method returns [`None`] if the target epoch has been already
179 /// verified.
180 ///
181 /// Otherwise it returns the verified checkpoint, advancing the committee
182 /// chain to the next epoch.
183 ///
184 /// # Errors
185 ///
186 /// Fails in the following occasions:
187 ///
188 /// * If the last checkpoint of the next epoch is not recorded in the epoch
189 /// boundaries.
190 /// * If the summary cannot be fetched from the remote store
191 /// * If signature verification fails
192 /// * If the checkpoint is not the last checkpoint of the epoch
193 async fn verify_next(&mut self) -> Result<Option<VerifiedCheckpoint>> {
194 let epoch_to_verify = self.chain_verifier.epoch();
195 if epoch_to_verify > self.target_epoch {
196 return Ok(None);
197 }
198 let Some(sequence_number) = self.epoch_boundaries.get(epoch_to_verify) else {
199 return Err(IngestionError::EpochBoundary(format!(
200 "did not find epoch boundary for epoch {epoch_to_verify}"
201 )));
202 };
203
204 let summary = self.fetch_summary(sequence_number).await?;
205
206 let verified = self
207 .chain_verifier
208 .verify_epoch_close(summary)
209 .map_err(|e| {
210 IngestionError::Verification(format!(
211 "failed to verify checkpoint {sequence_number} as the close of epoch \
212 {epoch_to_verify}: {e}"
213 ))
214 })?;
215
216 Ok(Some(verified))
217 }
218
219 /// Fetches a single checkpoint summary into memory.
220 ///
221 /// The method downloads the file with the respective batch of checkpoints,
222 /// and then gets the summary from the full-checkpoint data that match the
223 /// requested `sequence_number`.
224 ///
225 /// # Errors
226 ///
227 /// Fails if the checkpoint cannot be read from the remote store, or if
228 /// it is not found.
229 async fn fetch_summary(
230 &self,
231 sequence_number: CheckpointSequenceNumber,
232 ) -> Result<CertifiedCheckpointSummary> {
233 self.reader
234 .iter_for_range(sequence_number..sequence_number + 1)
235 .await?
236 .next()
237 .map(|data| data.checkpoint_summary)
238 .ok_or_else(|| {
239 IngestionError::HistoryRead(format!(
240 "checkpoint {sequence_number} not found in remote store"
241 ))
242 })
243 }
244}