1#![allow(dead_code)]
6
7pub mod reader;
8pub mod writer;
9
10#[cfg(test)]
11mod tests;
12
13use std::{
14 fs,
15 io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write},
16 num::NonZeroUsize,
17 ops::Range,
18 sync::{
19 Arc,
20 atomic::{AtomicU64, Ordering},
21 },
22 time::{Duration, Instant},
23};
24
25use anyhow::{Result, anyhow};
26use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
27use bytes::Bytes;
28use fastcrypto::hash::{HashFunction, Sha3_256};
29use indicatif::{ProgressBar, ProgressStyle};
30use iota_config::{
31 genesis::Genesis, node::ArchiveReaderConfig, object_storage_config::ObjectStoreConfig,
32};
33use iota_storage::{
34 SHA3_BYTES,
35 blob::{Blob, BlobEncoding},
36 compute_sha3_checksum, compute_sha3_checksum_for_bytes,
37 object_store::{
38 ObjectStoreGetExt, ObjectStorePutExt,
39 util::{get, put},
40 },
41};
42use iota_types::{
43 base_types::ExecutionData,
44 messages_checkpoint::{FullCheckpointContents, VerifiedCheckpointContents},
45 storage::{SingleCheckpointSharedInMemoryStore, WriteStore},
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use object_store::path::Path;
49use prometheus::Registry;
50use serde::{Deserialize, Serialize};
51use tracing::{error, info};
52
53use crate::reader::{ArchiveReader, ArchiveReaderMetrics};
54
#[expect(rustdoc::invalid_html_tags)]
/// Magic number for checkpoint content files.
// NOTE(review): not referenced in this chunk; presumably written/checked by the `writer` and
// `reader` modules at the start of `chk` files — confirm.
pub const CHECKPOINT_FILE_MAGIC: u32 = 0x0000DEAD;
/// Magic number for checkpoint summary files.
// NOTE(review): not referenced in this chunk; presumably used for `sum` files — confirm.
pub const SUMMARY_FILE_MAGIC: u32 = 0x0000CAFE;
/// Magic number written big-endian as the first 4 bytes of the manifest object.
const MANIFEST_FILE_MAGIC: u32 = 0x00C0FFEE;
/// Size in bytes of the `u32` magic prefix that precedes the manifest blob.
const MAGIC_BYTES: usize = 4;
/// File-name suffix for checkpoint content files (see `FileMetadata::file_path`).
const CHECKPOINT_FILE_SUFFIX: &str = "chk";
/// File-name suffix for checkpoint summary files (see `FileMetadata::file_path`).
const SUMMARY_FILE_SUFFIX: &str = "sum";
/// Prefix of the per-epoch directory; the epoch number is appended to it.
const EPOCH_DIR_PREFIX: &str = "epoch_";
/// Object-store path (at the store root) of the archive manifest.
const MANIFEST_FILENAME: &str = "MANIFEST";
118
/// Kind of file tracked in the archive manifest.
///
/// Represented as a `u8` (`#[repr(u8)]`) and convertible to/from `u8` through the
/// `TryFromPrimitive` / `IntoPrimitive` derives; discriminant values are part of the format.
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum FileType {
    /// Checkpoint contents file, stored with the `chk` suffix.
    CheckpointContent = 0,
    /// Checkpoint summary file, stored with the `sum` suffix.
    CheckpointSummary,
}
127
/// Describes one archived file: its type, epoch, covered checkpoints and content digest.
// Field order is part of the BCS-encoded manifest (see `finalize_manifest`); do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    /// Whether this entry describes a checkpoint contents or a checkpoint summary file.
    pub file_type: FileType,
    /// Epoch the file belongs to; selects the `epoch_<n>`-style directory in `file_path`.
    pub epoch_num: u64,
    /// Half-open range of checkpoint sequence numbers in the file; `start` names the file.
    pub checkpoint_seq_range: Range<u64>,
    /// SHA3 digest of the file bytes, as produced by `compute_sha3_checksum*`.
    pub sha3_digest: [u8; 32],
}
135
136impl FileMetadata {
137 pub fn file_path(&self) -> Path {
138 let dir_path = Path::from(format!("{}{}", EPOCH_DIR_PREFIX, self.epoch_num));
139 match self.file_type {
140 FileType::CheckpointContent => dir_path.child(&*format!(
141 "{}.{CHECKPOINT_FILE_SUFFIX}",
142 self.checkpoint_seq_range.start
143 )),
144 FileType::CheckpointSummary => dir_path.child(&*format!(
145 "{}.{SUMMARY_FILE_SUFFIX}",
146 self.checkpoint_seq_range.start
147 )),
148 }
149 }
150}
151
/// Version 1 of the archive manifest.
// Field order is part of the BCS-encoded manifest (see `finalize_manifest`); do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct ManifestV1 {
    /// Archive format version; `Manifest::new` sets it to 1.
    pub archive_version: u8,
    /// Checkpoint sequence number recorded by the last `Manifest::update`; by its name, the
    /// next checkpoint expected to be archived.
    pub next_checkpoint_seq_num: u64,
    /// Metadata of every archived file (content and summary entries, appended in pairs).
    pub file_metadata: Vec<FileMetadata>,
    /// Epoch recorded by the last `Manifest::update` (or given to `Manifest::new`).
    pub epoch: u64,
}
159
/// Versioned archive manifest, stored as the `MANIFEST` object (BCS blob framed by a magic
/// prefix and a trailing SHA3 digest; see `finalize_manifest`).
#[derive(Debug, Serialize, Deserialize, Clone, Eq, PartialEq)]
pub enum Manifest {
    /// Version 1 layout.
    V1(ManifestV1),
}
164
165impl Manifest {
166 pub fn new(epoch: u64, next_checkpoint_seq_num: u64) -> Self {
167 Manifest::V1(ManifestV1 {
168 archive_version: 1,
169 next_checkpoint_seq_num,
170 file_metadata: vec![],
171 epoch,
172 })
173 }
174 pub fn files(&self) -> Vec<FileMetadata> {
175 match self {
176 Manifest::V1(manifest) => manifest.file_metadata.clone(),
177 }
178 }
179 pub fn epoch_num(&self) -> u64 {
180 match self {
181 Manifest::V1(manifest) => manifest.epoch,
182 }
183 }
184 pub fn next_checkpoint_seq_num(&self) -> u64 {
185 match self {
186 Manifest::V1(manifest) => manifest.next_checkpoint_seq_num,
187 }
188 }
189 pub fn next_checkpoint_after_epoch(&self, epoch_num: u64) -> u64 {
190 match self {
191 Manifest::V1(manifest) => {
192 let mut summary_files: Vec<_> = manifest
193 .file_metadata
194 .clone()
195 .into_iter()
196 .filter(|f| f.file_type == FileType::CheckpointSummary)
197 .collect();
198 summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
199 assert!(
200 summary_files
201 .windows(2)
202 .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
203 );
204 assert_eq!(summary_files.first().unwrap().checkpoint_seq_range.start, 0);
205 summary_files
206 .iter()
207 .find(|f| f.epoch_num > epoch_num)
208 .map(|f| f.checkpoint_seq_range.start)
209 .unwrap_or(u64::MAX)
210 }
211 }
212 }
213 pub fn update(
214 &mut self,
215 epoch_num: u64,
216 checkpoint_sequence_number: u64,
217 checkpoint_file_metadata: FileMetadata,
218 summary_file_metadata: FileMetadata,
219 ) {
220 match self {
221 Manifest::V1(manifest) => {
222 manifest
223 .file_metadata
224 .extend(vec![checkpoint_file_metadata, summary_file_metadata]);
225 manifest.epoch = epoch_num;
226 manifest.next_checkpoint_seq_num = checkpoint_sequence_number;
227 }
228 }
229 }
230}
231
/// A newly written checkpoint/summary file pair plus the manifest state after recording it.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct CheckpointUpdates {
    /// Metadata of the checkpoint contents file.
    checkpoint_file_metadata: FileMetadata,
    /// Metadata of the checkpoint summary file.
    summary_file_metadata: FileMetadata,
    /// Clone of the manifest taken after the pair was added by `CheckpointUpdates::new`.
    manifest: Manifest,
}
238
239impl CheckpointUpdates {
240 pub fn new(
241 epoch_num: u64,
242 checkpoint_sequence_number: u64,
243 checkpoint_file_metadata: FileMetadata,
244 summary_file_metadata: FileMetadata,
245 manifest: &mut Manifest,
246 ) -> Self {
247 manifest.update(
248 epoch_num,
249 checkpoint_sequence_number,
250 checkpoint_file_metadata.clone(),
251 summary_file_metadata.clone(),
252 );
253 CheckpointUpdates {
254 checkpoint_file_metadata,
255 summary_file_metadata,
256 manifest: manifest.clone(),
257 }
258 }
259 pub fn content_file_path(&self) -> Path {
260 self.checkpoint_file_metadata.file_path()
261 }
262 pub fn summary_file_path(&self) -> Path {
263 self.summary_file_metadata.file_path()
264 }
265 pub fn manifest_file_path(&self) -> Path {
266 Path::from(MANIFEST_FILENAME)
267 }
268}
269
270pub fn create_file_metadata(
271 file_path: &std::path::Path,
272 file_type: FileType,
273 epoch_num: u64,
274 checkpoint_seq_range: Range<u64>,
275) -> Result<FileMetadata> {
276 let sha3_digest = compute_sha3_checksum(file_path)?;
277 let file_metadata = FileMetadata {
278 file_type,
279 epoch_num,
280 checkpoint_seq_range,
281 sha3_digest,
282 };
283 Ok(file_metadata)
284}
285
286pub fn create_file_metadata_from_bytes(
287 bytes: Bytes,
288 file_type: FileType,
289 epoch_num: u64,
290 checkpoint_seq_range: Range<u64>,
291) -> Result<FileMetadata> {
292 let sha3_digest = compute_sha3_checksum_for_bytes(bytes)?;
293 let file_metadata = FileMetadata {
294 file_type,
295 epoch_num,
296 checkpoint_seq_range,
297 sha3_digest,
298 };
299 Ok(file_metadata)
300}
301
302pub async fn read_manifest<S: ObjectStoreGetExt>(remote_store: S) -> Result<Manifest> {
304 let manifest_file_path = Path::from(MANIFEST_FILENAME);
305 let vec = get(&remote_store, &manifest_file_path).await?.to_vec();
306 read_manifest_from_bytes(vec)
307}
308
309pub fn read_manifest_from_bytes(vec: Vec<u8>) -> Result<Manifest> {
312 let manifest_file_size = vec.len();
313 let mut manifest_reader = Cursor::new(vec);
314
315 manifest_reader.rewind()?;
318 let magic = manifest_reader.read_u32::<BigEndian>()?;
319 if magic != MANIFEST_FILE_MAGIC {
320 return Err(anyhow!("Unexpected magic byte in manifest: {}", magic));
321 }
322
323 manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
326 let mut sha3_digest = [0u8; SHA3_BYTES];
327 manifest_reader.read_exact(&mut sha3_digest)?;
328
329 manifest_reader.rewind()?;
332 let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
333 manifest_reader.read_exact(&mut content_buf)?;
334 let mut hasher = Sha3_256::default();
335 hasher.update(&content_buf);
336 let computed_digest = hasher.finalize().digest;
337 if computed_digest != sha3_digest {
338 return Err(anyhow!(
339 "Manifest corrupted, computed checksum: {:?}, stored checksum: {:?}",
340 computed_digest,
341 sha3_digest
342 ));
343 }
344 manifest_reader.rewind()?;
345 manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
346 Blob::read(&mut manifest_reader)?.decode()
347}
348
349pub fn finalize_manifest(manifest: Manifest) -> Result<Bytes> {
351 let mut buf = BufWriter::new(vec![]);
352 buf.write_u32::<BigEndian>(MANIFEST_FILE_MAGIC)?;
353 let blob = Blob::encode(&manifest, BlobEncoding::Bcs)?;
354 blob.write(&mut buf)?;
355 buf.flush()?;
356 let mut hasher = Sha3_256::default();
357 hasher.update(buf.get_ref());
358 let computed_digest = hasher.finalize().digest;
359 buf.write_all(&computed_digest)?;
360 Ok(Bytes::from(buf.into_inner()?))
361}
362
363pub async fn write_manifest<S: ObjectStorePutExt>(
365 manifest: Manifest,
366 remote_store: S,
367) -> Result<()> {
368 let path = Path::from(MANIFEST_FILENAME);
369 let bytes = finalize_manifest(manifest)?;
370 put(&remote_store, &path, bytes).await?;
371 Ok(())
372}
373
374pub async fn read_manifest_as_json(remote_store_config: ObjectStoreConfig) -> Result<String> {
375 let metrics = ArchiveReaderMetrics::new(&Registry::default());
376 let config = ArchiveReaderConfig {
377 remote_store_config,
378 download_concurrency: NonZeroUsize::new(1).unwrap(),
379 use_for_pruning_watermark: false,
380 };
381 let archive_reader = ArchiveReader::new(config, &metrics)?;
382 archive_reader.sync_manifest_once().await?;
383 let manifest = archive_reader.get_manifest().await?;
384 let json = serde_json::to_string(&manifest).expect("Failed to serialize object");
385 Ok(json)
386}
387
388pub async fn write_manifest_from_json(
389 remote_store_config: ObjectStoreConfig,
390 json_manifest_path: std::path::PathBuf,
391) -> Result<()> {
392 let manifest: Manifest = serde_json::from_str(&fs::read_to_string(json_manifest_path)?)?;
393 let store = remote_store_config.make()?;
394 write_manifest(manifest, store).await?;
395 Ok(())
396}
397
398pub async fn verify_archive_with_genesis_config(
401 genesis: &std::path::Path,
402 remote_store_config: ObjectStoreConfig,
403 concurrency: usize,
404 interactive: bool,
405 num_retries: u32,
406) -> Result<()> {
407 let genesis = Genesis::load(genesis).unwrap();
408 let genesis_committee = genesis.committee()?;
409 let mut store = SingleCheckpointSharedInMemoryStore::default();
410 let contents = genesis.checkpoint_contents();
411 let fullcheckpoint_contents = FullCheckpointContents::from_contents_and_execution_data(
412 contents.clone(),
413 std::iter::once(ExecutionData::new(
414 genesis.transaction().clone(),
415 genesis.effects().clone(),
416 )),
417 );
418 store.insert_genesis_state(
419 genesis.checkpoint(),
420 VerifiedCheckpointContents::new_unchecked(fullcheckpoint_contents),
421 genesis_committee,
422 );
423
424 let num_retries = std::cmp::max(num_retries, 1);
425 for _ in 0..num_retries {
426 match verify_archive_with_local_store(
427 store.clone(),
428 remote_store_config.clone(),
429 concurrency,
430 interactive,
431 )
432 .await
433 {
434 Ok(_) => return Ok(()),
435 Err(e) => {
436 error!("Error while verifying archive: {}", e);
437 tokio::time::sleep(Duration::from_secs(10)).await;
438 }
439 }
440 }
441
442 Err::<(), anyhow::Error>(anyhow!(
443 "Failed to verify archive after {} retries",
444 num_retries
445 ))
446}
447
448pub async fn verify_archive_with_checksums(
449 remote_store_config: ObjectStoreConfig,
450 concurrency: usize,
451) -> Result<()> {
452 let metrics = ArchiveReaderMetrics::new(&Registry::default());
453 let config = ArchiveReaderConfig {
454 remote_store_config,
455 download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
456 use_for_pruning_watermark: false,
457 };
458 let archive_reader = ArchiveReader::new(config, &metrics)?;
460 archive_reader.sync_manifest_once().await?;
461 let manifest = archive_reader.get_manifest().await?;
462 info!(
463 "Next checkpoint in archive store: {}",
464 manifest.next_checkpoint_seq_num()
465 );
466
467 let file_metadata = archive_reader.verify_manifest(manifest).await?;
468 let num_files = file_metadata.len() * 2;
470 archive_reader
471 .verify_file_consistency(file_metadata)
472 .await?;
473 info!("All {} files are valid", num_files);
474 Ok(())
475}
476
477pub async fn verify_archive_with_local_store<S>(
480 store: S,
481 remote_store_config: ObjectStoreConfig,
482 concurrency: usize,
483 interactive: bool,
484) -> Result<()>
485where
486 S: WriteStore + Clone + Send + 'static,
487{
488 let metrics = ArchiveReaderMetrics::new(&Registry::default());
489 let config = ArchiveReaderConfig {
490 remote_store_config,
491 download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
492 use_for_pruning_watermark: false,
493 };
494 let archive_reader = ArchiveReader::new(config, &metrics)?;
495 archive_reader.sync_manifest_once().await?;
497 let latest_checkpoint_in_archive = archive_reader.latest_available_checkpoint().await?;
498 info!(
499 "Latest available checkpoint in archive store: {}",
500 latest_checkpoint_in_archive
501 );
502 let latest_checkpoint = store
503 .get_highest_synced_checkpoint()
504 .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
505 .sequence_number;
506 info!("Highest synced checkpoint in db: {latest_checkpoint}");
507 let txn_counter = Arc::new(AtomicU64::new(0));
508 let checkpoint_counter = Arc::new(AtomicU64::new(0));
509 let progress_bar = if interactive {
510 let progress_bar = ProgressBar::new(latest_checkpoint_in_archive).with_style(
511 ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len}({msg})")
512 .unwrap(),
513 );
514 let cloned_progress_bar = progress_bar.clone();
515 let cloned_counter = txn_counter.clone();
516 let cloned_checkpoint_counter = checkpoint_counter.clone();
517 let instant = Instant::now();
518 tokio::spawn(async move {
519 loop {
520 let total_checkpoints_loaded = cloned_checkpoint_counter.load(Ordering::Relaxed);
521 let total_checkpoints_per_sec =
522 total_checkpoints_loaded as f64 / instant.elapsed().as_secs_f64();
523 let total_txns_per_sec =
524 cloned_counter.load(Ordering::Relaxed) as f64 / instant.elapsed().as_secs_f64();
525 cloned_progress_bar.set_position(latest_checkpoint + total_checkpoints_loaded);
526 cloned_progress_bar.set_message(format!(
527 "checkpoints/s: {}, txns/s: {}",
528 total_checkpoints_per_sec, total_txns_per_sec
529 ));
530 tokio::time::sleep(Duration::from_secs(1)).await;
531 }
532 });
533 Some(progress_bar)
534 } else {
535 let cloned_store = store.clone();
536 tokio::spawn(async move {
537 loop {
538 let latest_checkpoint = cloned_store
539 .get_highest_synced_checkpoint()
540 .map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
541 .sequence_number;
542 let percent = (latest_checkpoint * 100) / latest_checkpoint_in_archive;
543 info!("done = {percent}%");
544 tokio::time::sleep(Duration::from_secs(60)).await;
545 if percent >= 100 {
546 break;
547 }
548 }
549 Ok::<(), anyhow::Error>(())
550 });
551 None
552 };
553 archive_reader
556 .read(
557 store.clone(),
558 (latest_checkpoint + 1)..u64::MAX,
559 txn_counter,
560 checkpoint_counter,
561 true,
562 )
563 .await?;
564 progress_bar.iter().for_each(|p| p.finish_and_clear());
565 let end = store
566 .get_highest_synced_checkpoint()
567 .map_err(|_| anyhow!("Failed to read watermark"))?
568 .sequence_number;
569 info!("Highest verified checkpoint: {}", end);
570 Ok(())
571}