1#![allow(dead_code)]
6
7pub mod reader;
8pub mod writer;
9
10#[cfg(test)]
11mod tests;
12
13use std::{
14 fs,
15 io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
16 num::NonZeroUsize,
17 ops::Range,
18 sync::{
19 Arc,
20 atomic::{AtomicU64, Ordering},
21 },
22 time::{Duration, Instant},
23};
24
25use anyhow::{Result, anyhow, bail};
26use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
27use bytes::Bytes;
28use fastcrypto::hash::{HashFunction, Sha3_256};
29use indicatif::{ProgressBar, ProgressStyle};
30use iota_config::{
31 genesis::Genesis, node::ArchiveReaderConfig, object_storage_config::ObjectStoreConfig,
32};
33use iota_storage::{
34 SHA3_BYTES,
35 blob::{Blob, BlobEncoding},
36 compute_sha3_checksum, compute_sha3_checksum_for_bytes,
37 object_store::{
38 ObjectStoreGetExt, ObjectStorePutExt,
39 util::{get, put},
40 },
41};
42use iota_types::{
43 base_types::ExecutionData,
44 messages_checkpoint::{FullCheckpointContents, VerifiedCheckpointContents},
45 storage::{SingleCheckpointSharedInMemoryStore, WriteStore},
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use object_store::path::Path;
49use prometheus::Registry;
50use serde::{Deserialize, Serialize};
51use tracing::{error, info};
52
53use crate::reader::{ArchiveReader, ArchiveReaderMetrics};
54
// `CHECKPOINT_FILE_MAGIC` is not referenced in this part of the file; presumably it is the
// magic header of checkpoint content (`.chk`) files handled by the `reader`/`writer`
// modules — TODO confirm.
#[expect(rustdoc::invalid_html_tags)]
pub const CHECKPOINT_FILE_MAGIC: u32 = 0x0000DEAD;
/// Magic number, presumably for checkpoint summary (`.sum`) files; it is not referenced in
/// this part of the file (NOTE(review): confirm against the `reader`/`writer` modules).
pub const SUMMARY_FILE_MAGIC: u32 = 0x0000CAFE;
/// Big-endian `u32` header of the MANIFEST file: written by `finalize_manifest` and checked by
/// `read_manifest_from_bytes`.
const MANIFEST_FILE_MAGIC: u32 = 0x00C0FFEE;
/// Size in bytes of the `u32` magic-number header; `read_manifest_from_bytes` seeks past it
/// before decoding the manifest blob.
const MAGIC_BYTES: usize = 4;
/// File extension of checkpoint content files (see `FileMetadata::file_path`).
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// File extension of checkpoint summary files (see `FileMetadata::file_path`).
const SUMMARY_FILE_SUFFIX: &str = "sum";
/// Prefix of the per-epoch directory holding an epoch's files, e.g. `epoch_7`.
const EPOCH_DIR_PREFIX: &str = "epoch_";
/// Object-store path of the manifest, at the root of the archive store.
const MANIFEST_FILENAME: &str = "MANIFEST";
118
/// Kind of archive file described by a [`FileMetadata`] entry.
///
/// Variant order and the `#[repr(u8)]` discriminants are part of the serialized form (serde
/// derive, `IntoPrimitive`/`TryFromPrimitive`), so variants must not be reordered.
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum FileType {
    /// Checkpoint contents; stored with the `CHECKPOINT_FILE_SUFFIX` extension.
    CheckpointContent = 0,
    /// Checkpoint summaries; stored with the `SUMMARY_FILE_SUFFIX` extension.
    CheckpointSummary,
}
127
/// Manifest entry describing one archive file (checkpoint contents or summaries).
///
/// Entries are stored inside the BCS-encoded manifest, so field order is part of the on-disk
/// format and must not change.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    /// Content or summary file; selects the file extension in `file_path`.
    pub file_type: FileType,
    /// Epoch the file belongs to; selects the `epoch_<n>` directory in `file_path`.
    pub epoch_num: u64,
    /// Half-open range of checkpoint sequence numbers held by the file. `start` is also the
    /// file-name stem.
    pub checkpoint_seq_range: Range<u64>,
    /// SHA3 checksum of the file bytes, as produced by `compute_sha3_checksum` /
    /// `compute_sha3_checksum_for_bytes` in the `create_file_metadata*` helpers.
    pub sha3_digest: [u8; 32],
}
135
136impl FileMetadata {
137 pub fn file_path(&self) -> Path {
138 let dir_path = Path::from(format!("{}{}", EPOCH_DIR_PREFIX, self.epoch_num));
139 match self.file_type {
140 FileType::CheckpointContent => dir_path.child(&*format!(
141 "{}.{CHECKPOINT_FILE_SUFFIX}",
142 self.checkpoint_seq_range.start
143 )),
144 FileType::CheckpointSummary => dir_path.child(&*format!(
145 "{}.{SUMMARY_FILE_SUFFIX}",
146 self.checkpoint_seq_range.start
147 )),
148 }
149 }
150}
151
/// Version 1 of the archive manifest, the index of every file in the archive store.
///
/// Serialized with BCS by `finalize_manifest`, so field order is part of the on-disk format.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ManifestV1 {
    /// Archive format version; `Manifest::new` sets it to 1.
    pub archive_version: u8,
    /// Sequence number recorded by the most recent `Manifest::update`.
    pub next_checkpoint_seq_num: u64,
    /// File entries, appended as (content, summary) pairs by `Manifest::update`.
    pub file_metadata: Vec<FileMetadata>,
    /// Epoch recorded by the most recent `Manifest::update`.
    pub epoch: u64,
}
159
/// Versioned archive manifest.
///
/// Persisted as a BCS blob (see `finalize_manifest` / `read_manifest_from_bytes`) and exported
/// as JSON by `read_manifest_as_json`. Variant order is part of the encoding.
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum Manifest {
    V1(ManifestV1),
}
164
165impl Manifest {
166 pub fn new(epoch: u64, next_checkpoint_seq_num: u64) -> Self {
167 Manifest::V1(ManifestV1 {
168 archive_version: 1,
169 next_checkpoint_seq_num,
170 file_metadata: vec![],
171 epoch,
172 })
173 }
174 pub fn files(&self) -> Vec<FileMetadata> {
175 match self {
176 Manifest::V1(manifest) => manifest.file_metadata.clone(),
177 }
178 }
179 pub fn epoch_num(&self) -> u64 {
180 match self {
181 Manifest::V1(manifest) => manifest.epoch,
182 }
183 }
184 pub fn next_checkpoint_seq_num(&self) -> u64 {
185 match self {
186 Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
187 }
188 }
189 pub fn next_checkpoint_after_epoch(&self, epoch_num: u64) -> u64 {
190 match self {
191 Manifest::V1(manifest) => {
192 let mut summary_files: Vec<_> = manifest
193 .file_metadata
194 .clone()
195 .into_iter()
196 .filter(|f| f.file_type == FileType::CheckpointSummary)
197 .collect();
198 summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
199 assert!(
200 summary_files
201 .windows(2)
202 .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
203 );
204 assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
205 summary_files
206 .iter()
207 .find(|f| f.epoch_num > epoch_num)
208 .map(|f| f.checkpoint_seq_range.start)
209 .unwrap_or(u64::MAX)
210 }
211 }
212 }
213
214 pub fn get_all_end_of_epoch_checkpoint_seq_numbers(&self) -> Result<Vec<u64>> {
215 match self {
216 Manifest::V1(manifest) => {
217 let mut summary_files: Vec<_> = manifest
218 .file_metadata
219 .clone()
220 .into_iter()
221 .filter(|f| f.file_type == FileType::CheckpointSummary)
222 .collect();
223 summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
224 assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
225 let res = summary_files.windows(2).filter_map(|w| {
227 assert_eq!(
228 w[1].checkpoint_seq_range.start,
229 w[0].checkpoint_seq_range.end
230 );
231 if w[1].epoch_num == w[0].epoch_num + 1 {
232 Some(w[0].checkpoint_seq_range.end - 1)
233 } else {
234 None
235 }
236 });
237 Ok(res.collect())
238 }
239 }
240 }
241
242 pub fn update(
243 &mut self,
244 epoch_num: u64,
245 checkpoint_sequence_number: u64,
246 checkpoint_file_metadata: FileMetadata,
247 summary_file_metadata: FileMetadata,
248 ) {
249 match self {
250 Manifest::V1(manifest) => {
251 manifest.epoch = epoch_num;
252 manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
253 if let Some(last_file_metadata) = manifest.file_metadata.last() {
254 if last_file_metadata.checkpoint_seq_range
255 == checkpoint_file_metadata.checkpoint_seq_range
256 {
257 return;
258 }
259 }
260 manifest
261 .file_metadata
262 .extend(vec![checkpoint_file_metadata, summary_file_metadata]);
263 }
264 }
265 }
266}
267
/// A checkpoint content/summary file pair together with a snapshot of the manifest taken
/// right after that pair was recorded (see `CheckpointUpdates::new`).
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct CheckpointUpdates {
    /// Entry for the checkpoint content file.
    checkpoint_file_metadata: FileMetadata,
    /// Entry for the checkpoint summary file.
    summary_file_metadata: FileMetadata,
    /// Clone of the manifest after `Manifest::update` was applied.
    manifest: Manifest,
}
274
275impl CheckpointUpdates {
276 pub fn new(
277 epoch_num: u64,
278 checkpoint_sequence_number: u64,
279 checkpoint_file_metadata: FileMetadata,
280 summary_file_metadata: FileMetadata,
281 manifest: &mut Manifest,
282 ) -> Self {
283 manifest.update(
284 epoch_num,
285 checkpoint_sequence_number,
286 checkpoint_file_metadata.clone(),
287 summary_file_metadata.clone(),
288 );
289 CheckpointUpdates {
290 checkpoint_file_metadata,
291 summary_file_metadata,
292 manifest: manifest.clone(),
293 }
294 }
295 pub fn content_file_path(&self) -> Path {
296 self.checkpoint_file_metadata.file_path()
297 }
298 pub fn summary_file_path(&self) -> Path {
299 self.summary_file_metadata.file_path()
300 }
301 pub fn manifest_file_path(&self) -> Path {
302 Path::from(MANIFEST_FILENAME)
303 }
304}
305
306pub fn create_file_metadata(
307 file_path: &std::path::Path,
308 file_type: FileType,
309 epoch_num: u64,
310 checkpoint_seq_range: Range<u64>,
311) -> Result<FileMetadata> {
312 let sha3_digest = compute_sha3_checksum(file_path)?;
313 let file_metadata = FileMetadata {
314 file_type,
315 epoch_num,
316 checkpoint_seq_range,
317 sha3_digest,
318 };
319 Ok(file_metadata)
320}
321
322pub fn create_file_metadata_from_bytes(
323 bytes: Bytes,
324 file_type: FileType,
325 epoch_num: u64,
326 checkpoint_seq_range: Range<u64>,
327) -> Result<FileMetadata> {
328 let sha3_digest = compute_sha3_checksum_for_bytes(bytes)?;
329 let file_metadata = FileMetadata {
330 file_type,
331 epoch_num,
332 checkpoint_seq_range,
333 sha3_digest,
334 };
335 Ok(file_metadata)
336}
337
338pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
340 let manifest_file_path = Path::from(MANIFEST_FILENAME);
341 let vec = get(&remote_store, &manifest_file_path).await?.to_vec();
342 read_manifest_from_bytes(vec)
343}
344
345pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
348 let manifest_file_size = vec.len();
349 let mut manifest_reader = Cursor::new(vec);
350
351 manifest_reader.rewind()?;
354 let magic = manifest_reader.read_u32::<BigEndian>()?;
355 if magic != MANIFEST_FILE_MAGIC {
356 bail!("Unexpected magic byte in manifest: {}", magic);
357 }
358
359 manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
362 let mut sha3_digest = [0u8; SHA3_BYTES];
363 manifest_reader.read_exact(&mut sha3_digest)?;
364
365 manifest_reader.rewind()?;
368 let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
369 manifest_reader.read_exact(&mut content_buf)?;
370 let mut hasher = Sha3_256::default();
371 hasher.update(&content_buf);
372 let computed_digest = hasher.finalize().digest;
373 if computed_digest != sha3_digest {
374 bail!(
375 "Manifest corrupted, computed checksum: {:?}, stored checksum: {:?}",
376 computed_digest,
377 sha3_digest
378 );
379 }
380 manifest_reader.rewind()?;
381 manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
382 Blob::read(&mut manifest_reader)?.decode()
383}
384
385pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
387 let mut buf = BufWriter::new(vec![]);
388 buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
389 let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
390 blob.write(&mut buf)?;
391 buf.flush()?;
392 let mut hasher = Sha3_256::default();
393 hasher.update(buf.get_ref());
394 let computed_digest = hasher.finalize().digest;
395 buf.write_all(&computed_digest)?;
396 Ok(Bytes::from(buf.into_inner()?))
397}
398
399pub async fn write_manifest<S: ObjectStorePutExt>(
401 manifest: Manifest,
402 remote_store: S,
403) -> Result<()> {
404 let path = Path::from(MANIFEST_FILENAME);
405 let bytes = finalize_manifest(manifest)?;
406 put(&remote_store, &path, bytes).await?;
407 Ok(())
408}
409
410pub async fn read_manifest_as_json(remote_store_config: ObjectStoreConfig) -> Result<String> {
411 let metrics = ArchiveReaderMetrics::new(&Registry::default());
412 let config = ArchiveReaderConfig {
413 remote_store_config,
414 download_concurrency: NonZeroUsize::new(1).unwrap(),
415 use_for_pruning_watermark: false,
416 };
417 let archive_reader = ArchiveReader::new(config, &metrics)?;
418 archive_reader.sync_manifest_once().await?;
419 let manifest = archive_reader.get_manifest().await?;
420 let json = serde_json::to_string(&manifest).expect("Failed to serialize object");
421 Ok(json)
422}
423
424pub async fn write_manifest_from_json(
425 remote_store_config: ObjectStoreConfig,
426 json_manifest_path: std::path::PathBuf,
427) -> Result<()> {
428 let manifest: Manifest = serde_json::from_str(&fs::read_to_string(json_manifest_path)?)?;
429 let store = remote_store_config.make()?;
430 write_manifest(manifest, store).await?;
431 Ok(())
432}
433
434pub async fn verify_archive_with_genesis_config(
437 genesis: &std::path::Path,
438 remote_store_config: ObjectStoreConfig,
439 concurrency: usize,
440 interactive: bool,
441 num_retries: u32,
442) -> Result<()> {
443 let genesis = Genesis::load(genesis).unwrap();
444 let genesis_committee = genesis.committee()?;
445 let mut store = SingleCheckpointSharedInMemoryStore::default();
446 let contents = genesis.checkpoint_contents();
447 let fullcheckpoint_contents = FullCheckpointContents::from_contents_and_execution_data(
448 contents.clone(),
449 std::iter::once(ExecutionData::new(
450 genesis.transaction().clone(),
451 genesis.effects().clone(),
452 )),
453 );
454 store.insert_genesis_state(
455 genesis.checkpoint(),
456 VerifiedCheckpointContents::new_unchecked(fullcheckpoint_contents),
457 genesis_committee,
458 );
459
460 let num_retries = std::cmp::max(num_retries, 1);
461 for _ in 0..num_retries {
462 match verify_archive_with_local_store(
463 store.clone(),
464 remote_store_config.clone(),
465 concurrency,
466 interactive,
467 )
468 .await
469 {
470 Ok(_) => return Ok(()),
471 Err(e) => {
472 error!("Error while verifying archive: {}", e);
473 tokio::time::sleep(Duration::from_secs(10)).await;
474 }
475 }
476 }
477
478 bail!("Failed to verify archive after {} retries", num_retries)
479}
480
481pub async fn verify_archive_with_checksums(
482 remote_store_config: ObjectStoreConfig,
483 concurrency: usize,
484) -> Result<()> {
485 let metrics = ArchiveReaderMetrics::new(&Registry::default());
486 let config = ArchiveReaderConfig {
487 remote_store_config,
488 download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
489 use_for_pruning_watermark: false,
490 };
491 let archive_reader = ArchiveReader::new(config, &metrics)?;
493 archive_reader.sync_manifest_once().await?;
494 let manifest = archive_reader.get_manifest().await?;
495 info!(
496 "Next checkpoint in archive store: {}",
497 manifest.next_checkpoint_seq_num()
498 );
499
500 let file_metadata = archive_reader.verify_manifest(manifest).await?;
501 let num_files = file_metadata.len() * 2;
503 archive_reader
504 .verify_file_consistency(file_metadata)
505 .await?;
506 info!("All {} files are valid", num_files);
507 Ok(())
508}
509
510pub async fn verify_archive_with_local_store<S>(
513 store: S,
514 remote_store_config: ObjectStoreConfig,
515 concurrency: usize,
516 interactive: bool,
517) -> Result<()>
518where
519 S: WriteStore + Clone + Send + 'static,
520{
521 let metrics = ArchiveReaderMetrics::new(&Registry::default());
522 let config = ArchiveReaderConfig {
523 remote_store_config,
524 download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
525 use_for_pruning_watermark: false,
526 };
527 let archive_reader = ArchiveReader::new(config, &metrics)?;
528 archive_reader.sync_manifest_once().await?;
530 let latest_checkpoint_in_archive = archive_reader.latest_available_checkpoint().await?;
531 info!(
532 "Latest available checkpoint in archive store: {}",
533 latest_checkpoint_in_archive
534 );
535 let latest_checkpoint = store
536 .try_get_highest_synced_checkpoint()
537 .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
538 .sequence_number;
539 info!("Highest synced checkpoint in db: {latest_checkpoint}");
540 let txn_counter = Arc::new(AtomicU64::new(0));
541 let checkpoint_counter = Arc::new(AtomicU64::new(0));
542 let progress_bar = if interactive {
543 let progress_bar = ProgressBar::new(latest_checkpoint_in_archive).with_style(
544 ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len}({msg})")
545 .unwrap(),
546 );
547 let cloned_progress_bar = progress_bar.clone();
548 let cloned_counter = txn_counter.clone();
549 let cloned_checkpoint_counter = checkpoint_counter.clone();
550 let instant = Instant::now();
551 tokio::spawn(async move {
552 loop {
553 let total_checkpoints_loaded = cloned_checkpoint_counter.load(Ordering::Relaxed);
554 let total_checkpoints_per_sec =
555 total_checkpoints_loaded as f64 / instant.elapsed().as_secs_f64();
556 let total_txns_per_sec =
557 cloned_counter.load(Ordering::Relaxed) as f64 / instant.elapsed().as_secs_f64();
558 cloned_progress_bar.set_position(latest_checkpoint + total_checkpoints_loaded);
559 cloned_progress_bar.set_message(format!(
560 "checkpoints/s: {total_checkpoints_per_sec}, txns/s: {total_txns_per_sec}"
561 ));
562 tokio::time::sleep(Duration::from_secs(1)).await;
563 }
564 });
565 Some(progress_bar)
566 } else {
567 let cloned_store = store.clone();
568 tokio::spawn(async move {
569 loop {
570 let latest_checkpoint = cloned_store
571 .try_get_highest_synced_checkpoint()
572 .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
573 .sequence_number;
574 let percent = (latest_checkpoint * 100) / latest_checkpoint_in_archive;
575 info!("done = {percent}%");
576 tokio::time::sleep(Duration::from_secs(60)).await;
577 if percent >= 100 {
578 break;
579 }
580 }
581 Ok::<(), anyhow::Error>(())
582 });
583 None
584 };
585 archive_reader
588 .read(
589 store.clone(),
590 (latest_checkpoint + 1)..u64::MAX,
591 txn_counter,
592 checkpoint_counter,
593 true,
594 )
595 .await?;
596 progress_bar.iter().for_each(|p| p.finish_and_clear());
597 let end = store
598 .try_get_highest_synced_checkpoint()
599 .map_err(|_| anyhow!("Failed to read watermark"))?
600 .sequence_number;
601 info!("Highest verified checkpoint: {}", end);
602 Ok(())
603}