iota_light_client/
checkpoint.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2025 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use core::sync::atomic::AtomicU64;
6use std::{
7    collections::HashSet,
8    fs,
9    io::{Read, Write},
10    num::NonZeroUsize,
11    sync::Arc,
12};
13
14use anyhow::{Context, Result, bail};
15use getset::Getters;
16use iota_archival::reader::{ArchiveReader, ArchiveReaderMetrics};
17use iota_config::{genesis::Genesis, node::ArchiveReaderConfig};
18use iota_json_rpc_types::CheckpointId;
19use iota_sdk::IotaClientBuilder;
20use iota_types::{
21    committee::Committee,
22    messages_checkpoint::{CertifiedCheckpointSummary, EndOfEpochData, VerifiedCheckpoint},
23    storage::{ObjectStore, ReadStore, WriteStore},
24};
25use prometheus::Registry;
26use serde::{Deserialize, Serialize};
27use tracing::{info, warn};
28
29use crate::{
30    config::Config, graphql::query_last_checkpoint_of_epoch, object_store::CheckpointStore,
31};
32
/// The list of checkpoints at the end of each epoch.
///
/// The list is (de)serializable through `serde`, and the `getset` derive
/// with `get = "pub"` generates a public getter for the field.
#[derive(Debug, Clone, Default, Deserialize, Serialize, Getters)]
#[getset(get = "pub")]
pub struct CheckpointList {
    // Checkpoint sequence numbers held by the list.
    checkpoints: Vec<u64>,
}
39
40impl CheckpointList {
41    pub fn len(&self) -> usize {
42        self.checkpoints.len()
43    }
44
45    pub fn is_empty(&self) -> bool {
46        self.checkpoints.is_empty()
47    }
48}
49
50pub fn read_checkpoint_list(config: &Config) -> Result<CheckpointList> {
51    let checkpoints_path = config.checkpoints_list_file_path();
52    let reader = fs::File::open(checkpoints_path)?;
53    Ok(serde_yaml::from_reader(reader)?)
54}
55
56pub fn read_checkpoint_summary(config: &Config, seq: u64) -> Result<CertifiedCheckpointSummary> {
57    let checkpoint_path = config.checkpoint_summary_file_path(seq);
58    let mut reader = fs::File::open(checkpoint_path)?;
59    let mut buffer = Vec::new();
60    reader.read_to_end(&mut buffer)?;
61    Ok(bcs::from_bytes(&buffer).expect("Unable to parse checkpoint file"))
62}
63
64pub fn write_checkpoint_list(config: &Config, checkpoints_list: &CheckpointList) -> Result<()> {
65    let checkpoints_path = config.checkpoints_list_file_path();
66    let mut writer = fs::File::create(checkpoints_path)?;
67    let bytes = serde_yaml::to_vec(checkpoints_list)?;
68    writer
69        .write_all(&bytes)
70        .context("Unable to serialize checkpoint list")
71}
72
73pub fn write_checkpoint_summary(
74    config: &Config,
75    summary: &CertifiedCheckpointSummary,
76) -> Result<()> {
77    let path = config.checkpoint_summary_file_path(*summary.sequence_number());
78    bcs::serialize_into(
79        &mut fs::File::create(&path)
80            .context(format!("error writing summary file '{}'", path.display()))?,
81        &summary,
82    )
83    .expect("error serializing to bcs");
84    Ok(())
85}
86
87/// Downloads the list of end of epoch checkpoints from the archive store or the
88/// GraphQL endpoint
89pub async fn sync_checkpoint_list_to_latest(config: &Config) -> anyhow::Result<CheckpointList> {
90    let checkpoints_from_archive = if config.archive_store_config.is_some() {
91        match sync_checkpoint_list_to_latest_from_archive(config).await {
92            Ok(list) => list,
93            Err(e) => {
94                warn!("Failed to sync checkpoint list from archive: {e}");
95                CheckpointList::default()
96            }
97        }
98    } else {
99        CheckpointList::default()
100    };
101
102    let checkpoints_from_graphql = if config.graphql_url.is_some() {
103        match sync_checkpoint_list_to_latest_from_graphql(config).await {
104            Ok(list) => list,
105            Err(e) => {
106                warn!("Failed to sync checkpoints from full node: {e}");
107                CheckpointList::default()
108            }
109        }
110    } else {
111        CheckpointList::default()
112    };
113
114    let checkpoint_list =
115        merge_checkpoint_lists(&checkpoints_from_archive, &checkpoints_from_graphql);
116
117    if checkpoint_list.is_empty() {
118        bail!("Unable to sync from configured sources");
119    }
120
121    // Write the fetched checkpoint list to disk
122    write_checkpoint_list(config, &checkpoint_list)?;
123
124    Ok(checkpoint_list)
125}
126
127/// Merges two checkpoint lists, removing duplicates and ensuring the result is
128/// sorted
129fn merge_checkpoint_lists(list1: &CheckpointList, list2: &CheckpointList) -> CheckpointList {
130    let unique_checkpoints: HashSet<u64> = list1
131        .checkpoints
132        .iter()
133        .chain(list2.checkpoints.iter())
134        .copied()
135        .collect();
136
137    // Convert to sorted vector
138    let mut sorted_checkpoints: Vec<_> = unique_checkpoints.into_iter().collect();
139    sorted_checkpoints.sort();
140
141    CheckpointList {
142        checkpoints: sorted_checkpoints,
143    }
144}
145
146/// Syncs the list of end-of-epoch checkpoints from GraphQL.
147async fn sync_checkpoint_list_to_latest_from_graphql(
148    config: &Config,
149) -> anyhow::Result<CheckpointList> {
150    info!("Syncing checkpoint list from GraphQL.");
151
152    // Get the local checkpoint list, or create an empty one if it doesn't exist
153    let mut checkpoints_list = match read_checkpoint_list(config) {
154        Ok(list) => list,
155        Err(_) => {
156            info!("No existing checkpoint file found. Creating a new checkpoint list.");
157            CheckpointList::default()
158        }
159    };
160
161    // Get the last synced epoch, or fetch the first
162    let last_epoch = if !checkpoints_list.is_empty() {
163        checkpoints_list.len() as u64 - 1
164    } else {
165        let first_epoch = 0u64;
166        let first_seq = query_last_checkpoint_of_epoch(config, first_epoch).await?;
167        checkpoints_list.checkpoints.push(first_seq);
168        info!("Synced epoch: {first_epoch}, checkpoint: {first_seq}",);
169        first_epoch
170    };
171
172    // Download the last synced checkpoint from the node
173    let client = IotaClientBuilder::default()
174        .build(config.rpc_url.as_str())
175        .await?;
176    let read_api = client.read_api();
177
178    // Download the latest available checkpoint from the node
179    let latest_seq = read_api.get_latest_checkpoint_sequence_number().await?;
180    let latest_checkpoint = read_api
181        .get_checkpoint(CheckpointId::SequenceNumber(latest_seq))
182        .await?;
183
184    // Sequentially record all the missing end of epoch checkpoints numbers
185    for target_epoch in (last_epoch + 1)..latest_checkpoint.epoch {
186        let target_seq = query_last_checkpoint_of_epoch(config, target_epoch).await?;
187        checkpoints_list.checkpoints.push(target_seq);
188        info!("Synced epoch: {target_epoch}, checkpoint: {target_seq}");
189    }
190
191    Ok(checkpoints_list)
192}
193
194/// Syncs the list of end-of-epoch checkpoints from an archive store.
195async fn sync_checkpoint_list_to_latest_from_archive(
196    config: &Config,
197) -> anyhow::Result<CheckpointList> {
198    info!("Syncing checkpoint list from archive store.");
199
200    let Some(archive_store_config) = &config.archive_store_config else {
201        bail!("Archive store config is not provided");
202    };
203
204    let config = ArchiveReaderConfig {
205        remote_store_config: archive_store_config.clone(),
206        download_concurrency: NonZeroUsize::new(5).unwrap(),
207        use_for_pruning_watermark: false,
208    };
209
210    let metrics = ArchiveReaderMetrics::new(&Registry::default());
211    let archive_reader = ArchiveReader::new(config, &metrics)?;
212    archive_reader.sync_manifest_once().await?;
213
214    let manifest = archive_reader.get_manifest().await?;
215    let checkpoints = manifest.get_all_end_of_epoch_checkpoint_seq_numbers()?;
216
217    Ok(CheckpointList { checkpoints })
218}
219
220pub async fn sync_and_verify_checkpoints(config: &Config) -> anyhow::Result<()> {
221    let checkpoints_list = sync_checkpoint_list_to_latest(config)
222        .await
223        .context("Failed to sync checkpoint list")?;
224
225    // Load the genesis committee
226    let genesis_committee = Genesis::load(config.genesis_blob_file_path())?
227        .committee()
228        .context("Failed to load genesis file")?;
229
230    // Create a list of summaries that need to be downloaded
231    let mut missing = Vec::new();
232    for seq in checkpoints_list.checkpoints.iter().copied() {
233        if !config.checkpoint_summary_file_path(seq).exists() {
234            // ensure the file is valid and can be parsed
235            if read_checkpoint_summary(config, seq).is_err() {
236                missing.push(seq);
237            }
238        }
239    }
240
241    if !missing.is_empty() {
242        if let Some(archive_store_config) = &config.archive_store_config {
243            info!("Downloading missing checkpoints from archive store.");
244
245            // Download summaries from archive store
246            let archive_reader_config = ArchiveReaderConfig {
247                remote_store_config: archive_store_config.clone(),
248                download_concurrency: NonZeroUsize::new(5).unwrap(),
249                use_for_pruning_watermark: false,
250            };
251
252            let store = CheckpointSummaryFileStore::new(config);
253            let counter = Arc::new(AtomicU64::new(0));
254            let metrics = ArchiveReaderMetrics::new(&Registry::default());
255            let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?;
256            archive_reader.sync_manifest_once().await?;
257            archive_reader
258                .read_summaries_for_list_no_verify(store.clone(), missing, counter)
259                .await?;
260        } else if let Some(_checkpoint_store_url) = &config.checkpoint_store_config {
261            info!("Downloading missing checkpoints from checkpoint store.");
262
263            let checkpoint_store = CheckpointStore::new(config)?;
264            for seq in missing {
265                info!("Downloading checkpoint: {seq}");
266
267                let summary = checkpoint_store
268                    .fetch_checkpoint_summary(seq)
269                    .await
270                    .context(format!(
271                        "Failed to download checkpoint summary '{seq}' from checkpoint store"
272                    ))?;
273                write_checkpoint_summary(config, &summary)?;
274            }
275        } else {
276            info!("Downloading missing checkpoints from node.");
277
278            // Download summaries from the full node
279            let client = iota_rest_api::Client::new(&config.rpc_url);
280
281            // Download all missing checkpoints
282            for seq in missing {
283                info!("Downloading checkpoint: {seq}");
284
285                let summary = client
286                    .get_checkpoint_summary(seq)
287                    .await
288                    .context(format!("Failed to download checkpoint summary '{seq}'"))?;
289
290                write_checkpoint_summary(config, &summary)?;
291            }
292        }
293    }
294
295    info!("Verifying checkpoints.");
296
297    // Check the signatures of all checkpoints
298    let mut prev_committee = genesis_committee;
299    for seq in checkpoints_list.checkpoints {
300        // Check if there is a corresponding checkpoint summary file in the checkpoints
301        // directory
302        let summary_path = config.checkpoint_summary_file_path(seq);
303
304        // If file exists read the file otherwise download it from the server
305        let summary = if summary_path.exists() {
306            read_checkpoint_summary(config, seq).context("Failed to read checkpoint summary")?
307        } else {
308            panic!("corrupted checkpoint directory");
309        };
310
311        // Verify the checkpoint
312        summary.clone().try_into_verified(&prev_committee)?;
313
314        info!(
315            "Verified epoch: {}, checkpoint: {seq}, checkpoint digest: {}",
316            summary.epoch(),
317            summary.digest()
318        );
319
320        // Extract the next committee information
321        if let Some(EndOfEpochData {
322            next_epoch_committee,
323            ..
324        }) = &summary.end_of_epoch_data
325        {
326            let next_committee = next_epoch_committee.iter().cloned().collect();
327            prev_committee =
328                Committee::new(summary.epoch().checked_add(1).unwrap(), next_committee);
329        } else {
330            bail!("Expected all checkpoints to be end-of-epoch checkpoints");
331        }
332    }
333
334    Ok(())
335}
336
// Store handle that resolves checkpoint summary file paths through a borrowed
// `Config`. It owns no state of its own, so cloning it only copies the
// reference.
#[derive(Clone, Debug)]
struct CheckpointSummaryFileStore<'a> {
    // Configuration used to look up the path of each checkpoint summary file.
    config: &'a Config,
}
341
342impl<'a> CheckpointSummaryFileStore<'a> {
343    fn new(config: &'a Config) -> Self {
344        Self { config }
345    }
346}
347
348impl WriteStore for CheckpointSummaryFileStore<'_> {
349    fn insert_checkpoint(
350        &self,
351        checkpoint: &VerifiedCheckpoint,
352    ) -> iota_types::storage::error::Result<()> {
353        let path = self
354            .config
355            .checkpoint_summary_file_path(*checkpoint.sequence_number());
356        info!("Downloading checkpoint summary to '{}'", path.display());
357        bcs::serialize_into(
358            &mut fs::File::create(&path).expect("error writing file"),
359            &checkpoint.clone().into_inner(),
360        )
361        .expect("error serializing summary checkpoint to bcs");
362        Ok(())
363    }
364
365    fn update_highest_synced_checkpoint(
366        &self,
367        _: &iota_types::messages_checkpoint::VerifiedCheckpoint,
368    ) -> iota_types::storage::error::Result<()> {
369        unimplemented!()
370    }
371
372    fn update_highest_verified_checkpoint(
373        &self,
374        _: &iota_types::messages_checkpoint::VerifiedCheckpoint,
375    ) -> iota_types::storage::error::Result<()> {
376        unimplemented!()
377    }
378
379    fn insert_checkpoint_contents(
380        &self,
381        _: &iota_types::messages_checkpoint::VerifiedCheckpoint,
382        _: iota_types::messages_checkpoint::VerifiedCheckpointContents,
383    ) -> iota_types::storage::error::Result<()> {
384        unimplemented!()
385    }
386
387    fn insert_committee(&self, _: Committee) -> iota_types::storage::error::Result<()> {
388        unimplemented!()
389    }
390}
391
// `ReadStore` is implemented with stubs only: every method below is
// `unimplemented!()` and panics if it is ever called.
// NOTE(review): presumably the impl exists only to satisfy a trait bound
// required alongside `WriteStore` — confirm against the trait definitions.
impl ReadStore for CheckpointSummaryFileStore<'_> {
    // Stub: panics if called.
    fn get_committee(
        &self,
        _: iota_types::committee::EpochId,
    ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_highest_verified_checkpoint(
        &self,
    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_highest_synced_checkpoint(
        &self,
    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_lowest_available_checkpoint(
        &self,
    ) -> iota_types::storage::error::Result<iota_types::messages_checkpoint::CheckpointSequenceNumber>
    {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_checkpoint_by_digest(
        &self,
        _: &iota_types::digests::CheckpointDigest,
    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_checkpoint_by_sequence_number(
        &self,
        _: iota_types::messages_checkpoint::CheckpointSequenceNumber,
    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_checkpoint_contents_by_digest(
        &self,
        _: &iota_types::digests::CheckpointContentsDigest,
    ) -> iota_types::storage::error::Result<
        Option<iota_types::messages_checkpoint::CheckpointContents>,
    > {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_checkpoint_contents_by_sequence_number(
        &self,
        _: iota_types::messages_checkpoint::CheckpointSequenceNumber,
    ) -> iota_types::storage::error::Result<
        Option<iota_types::messages_checkpoint::CheckpointContents>,
    > {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_transaction(
        &self,
        _: &iota_types::digests::TransactionDigest,
    ) -> iota_types::storage::error::Result<Option<Arc<iota_types::transaction::VerifiedTransaction>>>
    {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_transaction_effects(
        &self,
        _: &iota_types::digests::TransactionDigest,
    ) -> iota_types::storage::error::Result<Option<iota_types::effects::TransactionEffects>> {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_events(
        &self,
        _: &iota_types::digests::TransactionEventsDigest,
    ) -> iota_types::storage::error::Result<Option<iota_types::effects::TransactionEvents>> {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        _: iota_types::messages_checkpoint::CheckpointSequenceNumber,
    ) -> iota_types::storage::error::Result<
        Option<iota_types::messages_checkpoint::FullCheckpointContents>,
    > {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_full_checkpoint_contents(
        &self,
        _: &iota_types::digests::CheckpointContentsDigest,
    ) -> iota_types::storage::error::Result<
        Option<iota_types::messages_checkpoint::FullCheckpointContents>,
    > {
        unimplemented!()
    }
}
495
// `ObjectStore` is implemented with stubs only: both methods are
// `unimplemented!()` and panic if they are ever called.
impl ObjectStore for CheckpointSummaryFileStore<'_> {
    // Stub: panics if called.
    fn get_object(
        &self,
        _: &iota_types::base_types::ObjectID,
    ) -> iota_types::storage::error::Result<Option<iota_types::object::Object>> {
        unimplemented!()
    }

    // Stub: panics if called.
    fn get_object_by_key(
        &self,
        _: &iota_types::base_types::ObjectID,
        _: iota_types::base_types::VersionNumber,
    ) -> iota_types::storage::error::Result<Option<iota_types::object::Object>> {
        unimplemented!()
    }
}
512
// Round-trip tests for the on-disk checkpoint list and checkpoint summary
// helpers.
#[cfg(test)]
mod tests {
    use iota_types::{
        crypto::AuthorityQuorumSignInfo,
        gas::GasCostSummary,
        message_envelope::Envelope,
        messages_checkpoint::{CheckpointContents, CheckpointSummary},
        supported_protocol_versions::ProtocolConfig,
    };
    use roaring::RoaringBitmap;
    use tempfile::TempDir;

    use super::*;

    // Builds a `Config` whose `checkpoints_dir` points into a fresh temporary
    // directory. The `TempDir` is returned alongside so the caller keeps it
    // alive for the duration of the test.
    fn create_test_config() -> (Config, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = Config {
            rpc_url: "http://localhost:9000".parse().unwrap(),
            graphql_url: None,
            checkpoints_dir: temp_dir.path().to_path_buf(),
            sync_before_check: false,
            genesis_blob_download_url: None,
            checkpoint_store_config: None,
            archive_store_config: None,
        };
        (config, temp_dir)
    }

    // A checkpoint list written to disk reads back with the same entries.
    #[test]
    fn test_checkpoint_list_read_write() {
        let (config, _temp_dir) = create_test_config();
        let test_list = CheckpointList {
            checkpoints: vec![1, 2, 3],
        };

        write_checkpoint_list(&config, &test_list).unwrap();
        let read_list = read_checkpoint_list(&config).unwrap();

        assert_eq!(test_list.checkpoints, read_list.checkpoints);
    }

    // A checkpoint summary written to disk reads back with the same sequence
    // number.
    #[test]
    fn test_checkpoint_read_write() {
        let (config, _temp_dir) = create_test_config();
        let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
        // Minimal summary built from zero/empty/default values.
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            0,
            0,
            0,
            &contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            Vec::new(),
        );
        // Signature info with a default signature and no signers; the test
        // only serializes the envelope and never verifies it.
        let info = AuthorityQuorumSignInfo::<true> {
            epoch: 0,
            signature: Default::default(),
            signers_map: RoaringBitmap::new(),
        };
        let test_summary = Envelope::new_from_data_and_sig(summary, info);

        write_checkpoint_summary(&config, &test_summary).unwrap();
        // The summary is read back using sequence number 0.
        let read_summary = read_checkpoint_summary(&config, 0).unwrap();

        assert_eq!(
            test_summary.sequence_number(),
            read_summary.sequence_number()
        );
    }
}