1use core::sync::atomic::AtomicU64;
6use std::{
7 collections::HashSet,
8 fs,
9 io::{Read, Write},
10 num::NonZeroUsize,
11 sync::Arc,
12};
13
14use anyhow::{Context, Result, bail};
15use getset::Getters;
16use iota_archival::reader::{ArchiveReader, ArchiveReaderMetrics};
17use iota_config::{genesis::Genesis, node::ArchiveReaderConfig};
18use iota_json_rpc_types::CheckpointId;
19use iota_sdk::IotaClientBuilder;
20use iota_types::{
21 committee::Committee,
22 messages_checkpoint::{CertifiedCheckpointSummary, EndOfEpochData, VerifiedCheckpoint},
23 storage::{ObjectStore, ReadStore, WriteStore},
24};
25use prometheus::Registry;
26use serde::{Deserialize, Serialize};
27use tracing::{info, warn};
28
29use crate::{
30 config::Config, graphql::query_last_checkpoint_of_epoch, object_store::CheckpointStore,
31};
32
/// List of checkpoint sequence numbers kept on disk as a YAML file.
///
/// The sync routines in this module fill it with the last checkpoint of every
/// completed epoch. The GraphQL sync additionally relies on the entry at index
/// `i` belonging to epoch `i` (it derives the last synced epoch from the
/// length of the list).
#[derive(Debug, Clone, Default, Deserialize, Serialize, Getters)]
// `getset` generates a public getter for each field of the struct.
#[getset(get = "pub")]
pub struct CheckpointList {
    /// End-of-epoch checkpoint sequence numbers.
    checkpoints: Vec<u64>,
}
39
40impl CheckpointList {
41 pub fn len(&self) -> usize {
42 self.checkpoints.len()
43 }
44
45 pub fn is_empty(&self) -> bool {
46 self.checkpoints.is_empty()
47 }
48}
49
50pub fn read_checkpoint_list(config: &Config) -> Result<CheckpointList> {
51 let checkpoints_path = config.checkpoints_list_file_path();
52 let reader = fs::File::open(checkpoints_path)?;
53 Ok(serde_yaml::from_reader(reader)?)
54}
55
56pub fn read_checkpoint_summary(config: &Config, seq: u64) -> Result<CertifiedCheckpointSummary> {
57 let checkpoint_path = config.checkpoint_summary_file_path(seq);
58 let mut reader = fs::File::open(checkpoint_path)?;
59 let mut buffer = Vec::new();
60 reader.read_to_end(&mut buffer)?;
61 Ok(bcs::from_bytes(&buffer).expect("Unable to parse checkpoint file"))
62}
63
64pub fn write_checkpoint_list(config: &Config, checkpoints_list: &CheckpointList) -> Result<()> {
65 let checkpoints_path = config.checkpoints_list_file_path();
66 let mut writer = fs::File::create(checkpoints_path)?;
67 let bytes = serde_yaml::to_vec(checkpoints_list)?;
68 writer
69 .write_all(&bytes)
70 .context("Unable to serialize checkpoint list")
71}
72
73pub fn write_checkpoint_summary(
74 config: &Config,
75 summary: &CertifiedCheckpointSummary,
76) -> Result<()> {
77 let path = config.checkpoint_summary_file_path(*summary.sequence_number());
78 bcs::serialize_into(
79 &mut fs::File::create(&path)
80 .context(format!("error writing summary file '{}'", path.display()))?,
81 &summary,
82 )
83 .expect("error serializing to bcs");
84 Ok(())
85}
86
87pub async fn sync_checkpoint_list_to_latest(config: &Config) -> anyhow::Result<CheckpointList> {
90 let checkpoints_from_archive = if config.archive_store_config.is_some() {
91 match sync_checkpoint_list_to_latest_from_archive(config).await {
92 Ok(list) => list,
93 Err(e) => {
94 warn!("Failed to sync checkpoint list from archive: {e}");
95 CheckpointList::default()
96 }
97 }
98 } else {
99 CheckpointList::default()
100 };
101
102 let checkpoints_from_graphql = if config.graphql_url.is_some() {
103 match sync_checkpoint_list_to_latest_from_graphql(config).await {
104 Ok(list) => list,
105 Err(e) => {
106 warn!("Failed to sync checkpoints from full node: {e}");
107 CheckpointList::default()
108 }
109 }
110 } else {
111 CheckpointList::default()
112 };
113
114 let checkpoint_list =
115 merge_checkpoint_lists(&checkpoints_from_archive, &checkpoints_from_graphql);
116
117 if checkpoint_list.is_empty() {
118 bail!("Unable to sync from configured sources");
119 }
120
121 write_checkpoint_list(config, &checkpoint_list)?;
123
124 Ok(checkpoint_list)
125}
126
127fn merge_checkpoint_lists(list1: &CheckpointList, list2: &CheckpointList) -> CheckpointList {
130 let unique_checkpoints: HashSet<u64> = list1
131 .checkpoints
132 .iter()
133 .chain(list2.checkpoints.iter())
134 .copied()
135 .collect();
136
137 let mut sorted_checkpoints: Vec<_> = unique_checkpoints.into_iter().collect();
139 sorted_checkpoints.sort();
140
141 CheckpointList {
142 checkpoints: sorted_checkpoints,
143 }
144}
145
146async fn sync_checkpoint_list_to_latest_from_graphql(
148 config: &Config,
149) -> anyhow::Result<CheckpointList> {
150 info!("Syncing checkpoint list from GraphQL.");
151
152 let mut checkpoints_list = match read_checkpoint_list(config) {
154 Ok(list) => list,
155 Err(_) => {
156 info!("No existing checkpoint file found. Creating a new checkpoint list.");
157 CheckpointList::default()
158 }
159 };
160
161 let last_epoch = if !checkpoints_list.is_empty() {
163 checkpoints_list.len() as u64 - 1
164 } else {
165 let first_epoch = 0u64;
166 let first_seq = query_last_checkpoint_of_epoch(config, first_epoch).await?;
167 checkpoints_list.checkpoints.push(first_seq);
168 info!("Synced epoch: {first_epoch}, checkpoint: {first_seq}",);
169 first_epoch
170 };
171
172 let client = IotaClientBuilder::default()
174 .build(config.rpc_url.as_str())
175 .await?;
176 let read_api = client.read_api();
177
178 let latest_seq = read_api.get_latest_checkpoint_sequence_number().await?;
180 let latest_checkpoint = read_api
181 .get_checkpoint(CheckpointId::SequenceNumber(latest_seq))
182 .await?;
183
184 for target_epoch in (last_epoch + 1)..latest_checkpoint.epoch {
186 let target_seq = query_last_checkpoint_of_epoch(config, target_epoch).await?;
187 checkpoints_list.checkpoints.push(target_seq);
188 info!("Synced epoch: {target_epoch}, checkpoint: {target_seq}");
189 }
190
191 Ok(checkpoints_list)
192}
193
194async fn sync_checkpoint_list_to_latest_from_archive(
196 config: &Config,
197) -> anyhow::Result<CheckpointList> {
198 info!("Syncing checkpoint list from archive store.");
199
200 let Some(archive_store_config) = &config.archive_store_config else {
201 bail!("Archive store config is not provided");
202 };
203
204 let config = ArchiveReaderConfig {
205 remote_store_config: archive_store_config.clone(),
206 download_concurrency: NonZeroUsize::new(5).unwrap(),
207 use_for_pruning_watermark: false,
208 };
209
210 let metrics = ArchiveReaderMetrics::new(&Registry::default());
211 let archive_reader = ArchiveReader::new(config, &metrics)?;
212 archive_reader.sync_manifest_once().await?;
213
214 let manifest = archive_reader.get_manifest().await?;
215 let checkpoints = manifest.get_all_end_of_epoch_checkpoint_seq_numbers()?;
216
217 Ok(CheckpointList { checkpoints })
218}
219
220pub async fn sync_and_verify_checkpoints(config: &Config) -> anyhow::Result<()> {
221 let checkpoints_list = sync_checkpoint_list_to_latest(config)
222 .await
223 .context("Failed to sync checkpoint list")?;
224
225 let genesis_committee = Genesis::load(config.genesis_blob_file_path())?
227 .committee()
228 .context("Failed to load genesis file")?;
229
230 let mut missing = Vec::new();
232 for seq in checkpoints_list.checkpoints.iter().copied() {
233 if !config.checkpoint_summary_file_path(seq).exists() {
234 if read_checkpoint_summary(config, seq).is_err() {
236 missing.push(seq);
237 }
238 }
239 }
240
241 if !missing.is_empty() {
242 if let Some(archive_store_config) = &config.archive_store_config {
243 info!("Downloading missing checkpoints from archive store.");
244
245 let archive_reader_config = ArchiveReaderConfig {
247 remote_store_config: archive_store_config.clone(),
248 download_concurrency: NonZeroUsize::new(5).unwrap(),
249 use_for_pruning_watermark: false,
250 };
251
252 let store = CheckpointSummaryFileStore::new(config);
253 let counter = Arc::new(AtomicU64::new(0));
254 let metrics = ArchiveReaderMetrics::new(&Registry::default());
255 let archive_reader = ArchiveReader::new(archive_reader_config, &metrics)?;
256 archive_reader.sync_manifest_once().await?;
257 archive_reader
258 .read_summaries_for_list_no_verify(store.clone(), missing, counter)
259 .await?;
260 } else if let Some(_checkpoint_store_url) = &config.checkpoint_store_config {
261 info!("Downloading missing checkpoints from checkpoint store.");
262
263 let checkpoint_store = CheckpointStore::new(config)?;
264 for seq in missing {
265 info!("Downloading checkpoint: {seq}");
266
267 let summary = checkpoint_store
268 .fetch_checkpoint_summary(seq)
269 .await
270 .context(format!(
271 "Failed to download checkpoint summary '{seq}' from checkpoint store"
272 ))?;
273 write_checkpoint_summary(config, &summary)?;
274 }
275 } else {
276 info!("Downloading missing checkpoints from node.");
277
278 let client = iota_rest_api::Client::new(&config.rpc_url);
280
281 for seq in missing {
283 info!("Downloading checkpoint: {seq}");
284
285 let summary = client
286 .get_checkpoint_summary(seq)
287 .await
288 .context(format!("Failed to download checkpoint summary '{seq}'"))?;
289
290 write_checkpoint_summary(config, &summary)?;
291 }
292 }
293 }
294
295 info!("Verifying checkpoints.");
296
297 let mut prev_committee = genesis_committee;
299 for seq in checkpoints_list.checkpoints {
300 let summary_path = config.checkpoint_summary_file_path(seq);
303
304 let summary = if summary_path.exists() {
306 read_checkpoint_summary(config, seq).context("Failed to read checkpoint summary")?
307 } else {
308 panic!("corrupted checkpoint directory");
309 };
310
311 summary.clone().try_into_verified(&prev_committee)?;
313
314 info!(
315 "Verified epoch: {}, checkpoint: {seq}, checkpoint digest: {}",
316 summary.epoch(),
317 summary.digest()
318 );
319
320 if let Some(EndOfEpochData {
322 next_epoch_committee,
323 ..
324 }) = &summary.end_of_epoch_data
325 {
326 let next_committee = next_epoch_committee.iter().cloned().collect();
327 prev_committee =
328 Committee::new(summary.epoch().checked_add(1).unwrap(), next_committee);
329 } else {
330 bail!("Expected all checkpoints to be end-of-epoch checkpoints");
331 }
332 }
333
334 Ok(())
335}
336
/// Store adapter that writes checkpoint summaries to the summary files
/// described by a borrowed [`Config`].
///
/// Only `WriteStore::insert_checkpoint` is implemented; every other method of
/// the storage traits implemented for this type is an `unimplemented!()` stub.
#[derive(Clone, Debug)]
struct CheckpointSummaryFileStore<'a> {
    /// Configuration used to resolve the path of each summary file.
    config: &'a Config,
}
341
342impl<'a> CheckpointSummaryFileStore<'a> {
343 fn new(config: &'a Config) -> Self {
344 Self { config }
345 }
346}
347
348impl WriteStore for CheckpointSummaryFileStore<'_> {
349 fn insert_checkpoint(
350 &self,
351 checkpoint: &VerifiedCheckpoint,
352 ) -> iota_types::storage::error::Result<()> {
353 let path = self
354 .config
355 .checkpoint_summary_file_path(*checkpoint.sequence_number());
356 info!("Downloading checkpoint summary to '{}'", path.display());
357 bcs::serialize_into(
358 &mut fs::File::create(&path).expect("error writing file"),
359 &checkpoint.clone().into_inner(),
360 )
361 .expect("error serializing summary checkpoint to bcs");
362 Ok(())
363 }
364
365 fn update_highest_synced_checkpoint(
366 &self,
367 _: &iota_types::messages_checkpoint::VerifiedCheckpoint,
368 ) -> iota_types::storage::error::Result<()> {
369 unimplemented!()
370 }
371
372 fn update_highest_verified_checkpoint(
373 &self,
374 _: &iota_types::messages_checkpoint::VerifiedCheckpoint,
375 ) -> iota_types::storage::error::Result<()> {
376 unimplemented!()
377 }
378
379 fn insert_checkpoint_contents(
380 &self,
381 _: &iota_types::messages_checkpoint::VerifiedCheckpoint,
382 _: iota_types::messages_checkpoint::VerifiedCheckpointContents,
383 ) -> iota_types::storage::error::Result<()> {
384 unimplemented!()
385 }
386
387 fn insert_committee(&self, _: Committee) -> iota_types::storage::error::Result<()> {
388 unimplemented!()
389 }
390}
391
/// Read side of the file-backed summary store.
///
/// Every method is a stub that panics with `unimplemented!()` when called.
// NOTE(review): presumably this impl exists only to satisfy the trait bounds
// of the archive reader, which is expected to call `insert_checkpoint` only —
// confirm against `read_summaries_for_list_no_verify`.
impl ReadStore for CheckpointSummaryFileStore<'_> {
    fn get_committee(
        &self,
        _: iota_types::committee::EpochId,
    ) -> iota_types::storage::error::Result<Option<Arc<Committee>>> {
        unimplemented!()
    }

    fn get_latest_checkpoint(&self) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
        unimplemented!()
    }

    fn get_highest_verified_checkpoint(
        &self,
    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
        unimplemented!()
    }

    fn get_highest_synced_checkpoint(
        &self,
    ) -> iota_types::storage::error::Result<VerifiedCheckpoint> {
        unimplemented!()
    }

    fn get_lowest_available_checkpoint(
        &self,
    ) -> iota_types::storage::error::Result<iota_types::messages_checkpoint::CheckpointSequenceNumber>
    {
        unimplemented!()
    }

    fn get_checkpoint_by_digest(
        &self,
        _: &iota_types::digests::CheckpointDigest,
    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
        unimplemented!()
    }

    fn get_checkpoint_by_sequence_number(
        &self,
        _: iota_types::messages_checkpoint::CheckpointSequenceNumber,
    ) -> iota_types::storage::error::Result<Option<VerifiedCheckpoint>> {
        unimplemented!()
    }

    fn get_checkpoint_contents_by_digest(
        &self,
        _: &iota_types::digests::CheckpointContentsDigest,
    ) -> iota_types::storage::error::Result<
        Option<iota_types::messages_checkpoint::CheckpointContents>,
    > {
        unimplemented!()
    }

    fn get_checkpoint_contents_by_sequence_number(
        &self,
        _: iota_types::messages_checkpoint::CheckpointSequenceNumber,
    ) -> iota_types::storage::error::Result<
        Option<iota_types::messages_checkpoint::CheckpointContents>,
    > {
        unimplemented!()
    }

    fn get_transaction(
        &self,
        _: &iota_types::digests::TransactionDigest,
    ) -> iota_types::storage::error::Result<Option<Arc<iota_types::transaction::VerifiedTransaction>>>
    {
        unimplemented!()
    }

    fn get_transaction_effects(
        &self,
        _: &iota_types::digests::TransactionDigest,
    ) -> iota_types::storage::error::Result<Option<iota_types::effects::TransactionEffects>> {
        unimplemented!()
    }

    fn get_events(
        &self,
        _: &iota_types::digests::TransactionEventsDigest,
    ) -> iota_types::storage::error::Result<Option<iota_types::effects::TransactionEvents>> {
        unimplemented!()
    }

    fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        _: iota_types::messages_checkpoint::CheckpointSequenceNumber,
    ) -> iota_types::storage::error::Result<
        Option<iota_types::messages_checkpoint::FullCheckpointContents>,
    > {
        unimplemented!()
    }

    fn get_full_checkpoint_contents(
        &self,
        _: &iota_types::digests::CheckpointContentsDigest,
    ) -> iota_types::storage::error::Result<
        Option<iota_types::messages_checkpoint::FullCheckpointContents>,
    > {
        unimplemented!()
    }
}
495
/// Object lookups are not supported by the file-backed summary store.
///
/// Both methods are stubs that panic with `unimplemented!()` when called.
impl ObjectStore for CheckpointSummaryFileStore<'_> {
    fn get_object(
        &self,
        _: &iota_types::base_types::ObjectID,
    ) -> iota_types::storage::error::Result<Option<iota_types::object::Object>> {
        unimplemented!()
    }

    fn get_object_by_key(
        &self,
        _: &iota_types::base_types::ObjectID,
        _: iota_types::base_types::VersionNumber,
    ) -> iota_types::storage::error::Result<Option<iota_types::object::Object>> {
        unimplemented!()
    }
}
512
// Round-trip tests for the on-disk checkpoint list and checkpoint summary
// helpers. No network access is needed.
#[cfg(test)]
mod tests {
    use iota_types::{
        crypto::AuthorityQuorumSignInfo,
        gas::GasCostSummary,
        message_envelope::Envelope,
        messages_checkpoint::{CheckpointContents, CheckpointSummary},
        supported_protocol_versions::ProtocolConfig,
    };
    use roaring::RoaringBitmap;
    use tempfile::TempDir;

    use super::*;

    /// Builds a `Config` whose checkpoint directory is a fresh temporary
    /// directory and which has no optional sources configured.
    ///
    /// The `TempDir` is returned alongside the config so the caller keeps it
    /// alive for the duration of the test.
    fn create_test_config() -> (Config, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = Config {
            rpc_url: "http://localhost:9000".parse().unwrap(),
            graphql_url: None,
            checkpoints_dir: temp_dir.path().to_path_buf(),
            sync_before_check: false,
            genesis_blob_download_url: None,
            checkpoint_store_config: None,
            archive_store_config: None,
        };
        (config, temp_dir)
    }

    /// A checkpoint list written to disk is read back with the same entries.
    #[test]
    fn test_checkpoint_list_read_write() {
        let (config, _temp_dir) = create_test_config();
        let test_list = CheckpointList {
            checkpoints: vec![1, 2, 3],
        };

        write_checkpoint_list(&config, &test_list).unwrap();
        let read_list = read_checkpoint_list(&config).unwrap();

        assert_eq!(test_list.checkpoints, read_list.checkpoints);
    }

    /// A checkpoint summary written to disk is read back with the same
    /// sequence number.
    #[test]
    fn test_checkpoint_read_write() {
        let (config, _temp_dir) = create_test_config();
        let contents = CheckpointContents::new_with_digests_only_for_tests(vec![]);
        let summary = CheckpointSummary::new(
            &ProtocolConfig::get_for_max_version_UNSAFE(),
            0,
            0,
            0,
            &contents,
            None,
            GasCostSummary::default(),
            None,
            0,
            Vec::new(),
        );
        // Default signature with an empty signer bitmap: only serialization is
        // exercised here, the test never verifies the signature.
        let info = AuthorityQuorumSignInfo::<true> {
            epoch: 0,
            signature: Default::default(),
            signers_map: RoaringBitmap::new(),
        };
        let test_summary = Envelope::new_from_data_and_sig(summary, info);

        write_checkpoint_summary(&config, &test_summary).unwrap();
        // The summary is looked up under sequence number 0.
        let read_summary = read_checkpoint_summary(&config, 0).unwrap();

        assert_eq!(
            test_summary.sequence_number(),
            read_summary.sequence_number()
        );
    }
}