Skip to main content

iota_data_ingestion_core/history/
verifier.rs

1// Copyright (c) 2026 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! Verify the last checkpoint of an epoch against the committee of that epoch.
5//!
6//! The committee of an epoch is recorded only in the last checkpoint of the
7//! previous epoch, as `EndOfEpochData::next_epoch_committee`. Verifying the
8//! last checkpoint of epoch `N` therefore requires the committee taken from the
9//! last checkpoint of epoch `N - 1`, which must itself be verified the same
10//! way going all the way back to the genesis committee.
11//!
12//! [`EpochBoundaryVerifier`] runs this verification given a starting committee
13//! (the genesis committee) after resolving the epoch boundaries from the remote
14//! store. Each checkpoint is fetched into memory from the remote store and
15//! dropped once verified. The verified checkpoints are exposed as a [`Stream`],
16//! so callers can consume each epoch's checkpoint as soon as it is verified.
17//!
18//! The most prominent use of this logic is the verification of formal
19//! snapshots. This is done by comparing the elliptic-curve multiset hash (ECMH)
20//! of the live objects included in the snapshot against the
21//! [`CheckpointCommitment`](iota_types::messages_checkpoint::CheckpointCommitment)
22//! stored in the last checkpoint of the respective epoch.
23
24use futures::{Stream, stream::TryStreamExt};
25use iota_config::genesis::Genesis;
26use iota_types::{
27    committee::{Committee, CommitteeChainVerifier, EpochId},
28    messages_checkpoint::{
29        CertifiedCheckpointSummary, CheckpointSequenceNumber, VerifiedCheckpoint,
30    },
31};
32
33use crate::{
34    IngestionError,
35    errors::IngestionResult as Result,
36    history::{epoch_boundaries::EpochBoundaries, reader::HistoricalReader},
37};
38
39/// Verifies the last checkpoint of each listed epoch against the committee of
40/// that epoch.
41///
42/// A verifier is defined by a starting committee and the sequence numbers of
43/// the checkpoints to verify; the committee advances as each checkpoint is
44/// verified.
45///
46/// # Examples
47///
48/// ```ignore
49/// use iota_config::{genesis::Genesis, node::ArchiveReaderConfig};
50/// use iota_data_ingestion_core::history::{reader::HistoricalReader, verifier::EpochBoundaryVerifier};
51///
52/// let reader = HistoricalReader::new(config)?;
53/// let genesis = Genesis::load(genesis_path)?;
54///
55/// let target_epoch = 1000;
56/// let verifier = EpochBoundaryVerifier::from_genesis(reader, &genesis, target_epoch).await?;
57/// let target = verifier.verify_target_epoch_boundary().await?;
58/// println!("verified last checkpoint of epoch {}", target.epoch());
59/// ```
60pub struct EpochBoundaryVerifier {
61    reader: HistoricalReader,
62    /// The committee-chain walk; its committee is the one expected to have
63    /// signed the next checkpoint to verify.
64    chain_verifier: CommitteeChainVerifier,
65    /// The epoch boundaries stored in the remote store.
66    epoch_boundaries: EpochBoundaries,
67    /// The final epoch to verify.
68    target_epoch: EpochId,
69}
70
71impl EpochBoundaryVerifier {
72    /// Creates a verifier from a starting committee for the target epoch.
73    ///
74    /// # Errors
75    ///
76    /// Fails if the target epoch precedes the starting committee's epoch, or if
77    /// the epoch boundaries cannot be read from the remote store.
78    pub async fn new(
79        reader: HistoricalReader,
80        starting_committee: Committee,
81        target_epoch: EpochId,
82    ) -> Result<Self> {
83        if target_epoch < starting_committee.epoch {
84            return Err(IngestionError::Verification(format!(
85                "target epoch {target_epoch} precedes the starting committee's epoch {}",
86                starting_committee.epoch
87            )));
88        }
89        let epoch_boundaries = reader.epoch_boundaries().await?;
90        Ok(Self {
91            reader,
92            chain_verifier: CommitteeChainVerifier::new(starting_committee),
93            epoch_boundaries,
94            target_epoch,
95        })
96    }
97
98    /// Creates a verifier for the target epoch, whose starting committee is the
99    /// genesis committee.
100    ///
101    /// # Errors
102    ///
103    /// Fails if the epoch boundaries cannot be read from the remote store.
104    pub async fn from_genesis(
105        reader: HistoricalReader,
106        genesis: &Genesis,
107        target_epoch: EpochId,
108    ) -> Result<Self> {
109        let committee = genesis.committee().map_err(|e| {
110            IngestionError::Verification(format!("failed to load genesis committee: {e}"))
111        })?;
112        Self::new(reader, committee, target_epoch).await
113    }
114
115    /// Verifies the last checkpoint of the given epoch.
116    ///
117    /// This consumes the verifier, draining the stream returned by
118    /// [`Self::stream_verified_checkpoints`].
119    ///
120    /// # Errors
121    ///
122    /// Fails if [`Self::stream_verified_checkpoints`] fails.
123    pub async fn verify_target_epoch_boundary(self) -> Result<VerifiedCheckpoint> {
124        let last = self
125            .stream_verified_checkpoints()
126            .await?
127            .try_fold(None, |_, verified| async move { Ok(Some(verified)) })
128            .await?
129            .expect("stream guarantees to yield at least one checkpoint if successful");
130
131        Ok(last)
132    }
133
134    /// Streams the verified last checkpoints from the starting committee's
135    /// epoch up to the target epoch of the verifier.
136    ///
137    /// This consumes the verifier. Each checkpoint is fetched into memory and
138    /// verified only when the stream is polled. Upon successful verification
139    /// the committee is advanced to verify the last checkpoint of the
140    /// next epoch.
141    ///
142    /// # Errors
143    ///
144    /// Generating the stream only fails if the MANIFEST in the remote store
145    /// cannot be synced.
146    ///
147    /// The stream returns an error in the following occasions:
148    ///
149    /// * If the last checkpoint of the next epoch is not recorded in the epoch
150    ///   boundaries.
151    /// * If the summary cannot be fetched from the remote store
152    /// * If signature verification fails
153    /// * If the checkpoint is not the last checkpoint of the epoch
154    pub async fn stream_verified_checkpoints(
155        self,
156    ) -> Result<impl Stream<Item = Result<VerifiedCheckpoint>> + Send> {
157        // Refresh the manifest once before fetching; the per-checkpoint fetches
158        // read from the cached manifest.
159        self.reader.sync_manifest_once().await?;
160
161        Ok(futures::stream::try_unfold(
162            self,
163            |mut verifier| async move {
164                let Some(verified) = verifier.verify_next().await? else {
165                    return Ok(None);
166                };
167                Ok(Some((verified, verifier)))
168            },
169        ))
170    }
171
172    /// Verifies the checkpoint of the next epoch in queue.
173    ///
174    /// In order to do so, the checkpoint summary is fetched from the remote
175    /// store and it is verified against the active committee of the
176    /// corresponding epoch.
177    ///
178    /// The method returns [`None`] if the target epoch has been already
179    /// verified.
180    ///
181    /// Otherwise it returns the verified checkpoint, advancing the committee
182    /// chain to the next epoch.
183    ///
184    /// # Errors
185    ///
186    /// Fails in the following occasions:
187    ///
188    /// * If the last checkpoint of the next epoch is not recorded in the epoch
189    ///   boundaries.
190    /// * If the summary cannot be fetched from the remote store
191    /// * If signature verification fails
192    /// * If the checkpoint is not the last checkpoint of the epoch
193    async fn verify_next(&mut self) -> Result<Option<VerifiedCheckpoint>> {
194        let epoch_to_verify = self.chain_verifier.epoch();
195        if epoch_to_verify > self.target_epoch {
196            return Ok(None);
197        }
198        let Some(sequence_number) = self.epoch_boundaries.get(epoch_to_verify) else {
199            return Err(IngestionError::EpochBoundary(format!(
200                "did not find epoch boundary for epoch {epoch_to_verify}"
201            )));
202        };
203
204        let summary = self.fetch_summary(sequence_number).await?;
205
206        let verified = self
207            .chain_verifier
208            .verify_epoch_close(summary)
209            .map_err(|e| {
210                IngestionError::Verification(format!(
211                    "failed to verify checkpoint {sequence_number} as the close of epoch \
212                 {epoch_to_verify}: {e}"
213                ))
214            })?;
215
216        Ok(Some(verified))
217    }
218
219    /// Fetches a single checkpoint summary into memory.
220    ///
221    /// The method downloads the file with the respective batch of checkpoints,
222    /// and then gets the summary from the full-checkpoint data that match the
223    /// requested `sequence_number`.
224    ///
225    /// # Errors
226    ///
227    /// Fails if the checkpoint cannot be read from the remote store, or if
228    /// it is not found.
229    async fn fetch_summary(
230        &self,
231        sequence_number: CheckpointSequenceNumber,
232    ) -> Result<CertifiedCheckpointSummary> {
233        self.reader
234            .iter_for_range(sequence_number..sequence_number + 1)
235            .await?
236            .next()
237            .map(|data| data.checkpoint_summary)
238            .ok_or_else(|| {
239                IngestionError::HistoryRead(format!(
240                    "checkpoint {sequence_number} not found in remote store"
241                ))
242            })
243    }
244}