use std::{
    collections::BTreeMap,
    fs,
    fs::File,
    io::{BufReader, Read, Seek, SeekFrom},
    num::NonZeroUsize,
    path::PathBuf,
    sync::{
        Arc,
        atomic::{AtomicU64, AtomicUsize, Ordering},
    },
};

use anyhow::{Context, Result, anyhow};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, Bytes};
use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
use futures::{
    StreamExt, TryStreamExt,
    future::{AbortRegistration, Abortable},
};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use integer_encoding::VarIntReader;
use iota_config::object_storage_config::ObjectStoreConfig;
use iota_core::authority::{
    AuthorityStore,
    authority_store_tables::{AuthorityPerpetualTables, LiveObject},
};
use iota_storage::{
    blob::{Blob, BlobEncoding},
    object_store::{
        ObjectStoreGetExt, ObjectStorePutExt,
        http::HttpDownloaderBuilder,
        util::{copy_file, copy_files, path_to_filesystem},
    },
};
use iota_types::{
    accumulator::Accumulator,
    base_types::{ObjectDigest, ObjectID, ObjectRef, SequenceNumber},
};
use object_store::path::Path;
use tokio::{
    sync::Mutex,
    task::JoinHandle,
    time::{Duration, Instant},
};
use tracing::{error, info};

use crate::{
    FileMetadata, FileType, MAGIC_BYTES, MANIFEST_FILE_MAGIC, Manifest, OBJECT_FILE_MAGIC,
    OBJECT_ID_BYTES, OBJECT_REF_BYTES, REFERENCE_FILE_MAGIC, SEQUENCE_NUM_BYTES, SHA3_BYTES,
};

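/// Checksums gathered while restoring a snapshot: the sha3 digest of every
/// `.ref` part, keyed by bucket and then partition number, together with the
/// running state `Accumulator`.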
pub type SnapshotChecksums = (DigestByBucketAndPartition, Accumulator);
pub type DigestByBucketAndPartition = BTreeMap<u32, BTreeMap<u32, [u8; 32]>>;
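
/// Reads a version 1 state snapshot from a remote object store and restores it
/// into a node's perpetual store.
///
/// Illustrative usage (a sketch; the store configs, perpetual tables, and
/// abort registration are assumed to be set up by the caller):
///
/// ```ignore
/// let mut reader = StateSnapshotReaderV1::new(
///     epoch,
///     &remote_store_config,
///     &local_store_config,
///     indirect_objects_threshold,
///     NonZeroUsize::new(8).unwrap(),
///     MultiProgress::new(),
/// )
/// .await?;
/// reader.read(&perpetual_db, abort_registration, None).await?;
/// ```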
pub struct StateSnapshotReaderV1 {
    epoch: u64,
    local_staging_dir_root: PathBuf,
    remote_object_store: Arc<dyn ObjectStoreGetExt>,
    local_object_store: Arc<dyn ObjectStorePutExt>,
    ref_files: BTreeMap<u32, BTreeMap<u32, FileMetadata>>,
    object_files: BTreeMap<u32, BTreeMap<u32, FileMetadata>>,
    indirect_objects_threshold: usize,
    multi_progress_bar: MultiProgress,
    concurrency: usize,
}

impl StateSnapshotReaderV1 {
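    /// Creates a reader for the snapshot of `epoch`.
    ///
    /// Downloads the epoch's MANIFEST into the local staging directory,
    /// validates it (snapshot version, address length, epoch), indexes the
    /// object and reference file metadata by bucket and part number, and then
    /// downloads all `.ref` files. The `.obj` files themselves are only
    /// fetched later, during `read`.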
    pub async fn new(
        epoch: u64,
        remote_store_config: &ObjectStoreConfig,
        local_store_config: &ObjectStoreConfig,
        indirect_objects_threshold: usize,
        download_concurrency: NonZeroUsize,
        multi_progress_bar: MultiProgress,
    ) -> Result<Self> {
        let epoch_dir = format!("epoch_{}", epoch);
        let remote_object_store = if remote_store_config.no_sign_request {
            remote_store_config.make_http()?
        } else {
            remote_store_config.make().map(Arc::new)?
        };
        let local_object_store: Arc<dyn ObjectStorePutExt> =
            local_store_config.make().map(Arc::new)?;
        let local_staging_dir_root = local_store_config
            .directory
            .as_ref()
            .context("No directory specified")?
            .clone();
        let local_epoch_dir_path = local_staging_dir_root.join(&epoch_dir);
        if local_epoch_dir_path.exists() {
            fs::remove_dir_all(&local_epoch_dir_path)?;
        }
        fs::create_dir_all(&local_epoch_dir_path)?;
        let manifest_file_path = Path::from(epoch_dir.clone()).child("MANIFEST");
        copy_file(
            &manifest_file_path,
            &manifest_file_path,
            &remote_object_store,
            &local_object_store,
        )
        .await?;
        let manifest = Self::read_manifest(path_to_filesystem(
            local_staging_dir_root.clone(),
            &manifest_file_path,
        )?)?;
        let snapshot_version = manifest.snapshot_version();
        if snapshot_version != 1u8 {
            return Err(anyhow!("Unexpected snapshot version: {}", snapshot_version));
        }
        if manifest.address_length() as usize > ObjectID::LENGTH {
            return Err(anyhow!(
                "Max possible address length is: {}",
                ObjectID::LENGTH
            ));
        }
        if manifest.epoch() != epoch {
            return Err(anyhow!("Download manifest is not for epoch: {}", epoch,));
        }
        let mut object_files = BTreeMap::new();
        let mut ref_files = BTreeMap::new();
        for file_metadata in manifest.file_metadata() {
            match file_metadata.file_type {
                FileType::Object => {
                    let entry = object_files
                        .entry(file_metadata.bucket_num)
                        .or_insert_with(BTreeMap::new);
                    entry.insert(file_metadata.part_num, file_metadata.clone());
                }
                FileType::Reference => {
                    let entry = ref_files
                        .entry(file_metadata.bucket_num)
                        .or_insert_with(BTreeMap::new);
                    entry.insert(file_metadata.part_num, file_metadata.clone());
                }
            }
        }
        let epoch_dir_path = Path::from(epoch_dir);
        let files: Vec<Path> = ref_files
            .values()
            .flat_map(|entry| {
                let files: Vec<_> = entry
                    .values()
                    .map(|file_metadata| file_metadata.file_path(&epoch_dir_path))
                    .collect();
                files
            })
            .collect();

        let progress_bar = multi_progress_bar.add(
            ProgressBar::new(files.len() as u64).with_style(
                ProgressStyle::with_template(
                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} .ref files done ({msg})",
                )
                .unwrap(),
            ),
        );
        copy_files(
            &files,
            &files,
            &remote_object_store,
            &local_object_store,
            download_concurrency,
            Some(progress_bar.clone()),
        )
        .await?;
        progress_bar.finish_with_message("ref files download complete");
        Ok(StateSnapshotReaderV1 {
            epoch,
            local_staging_dir_root,
            remote_object_store,
            local_object_store,
            ref_files,
            object_files,
            indirect_objects_threshold,
            multi_progress_bar,
            concurrency: download_concurrency.get(),
        })
    }

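    /// Restores the snapshot into `perpetual_db`.
    ///
    /// First computes a sha3 digest for every (bucket, part) from the
    /// downloaded `.ref` files; these per-part digests are handed to
    /// `sync_live_objects`, which downloads the `.obj` files and bulk-inserts
    /// the live objects. If `sender` is provided, partial `Accumulator`s over
    /// the object digests are also built and forwarded so the caller can
    /// verify the overall state hash. The object sync can be aborted through
    /// `abort_registration`.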
    pub async fn read(
        &mut self,
        perpetual_db: &AuthorityPerpetualTables,
        abort_registration: AbortRegistration,
        sender: Option<tokio::sync::mpsc::Sender<(Accumulator, u64)>>,
    ) -> Result<()> {
        let sha3_digests: Arc<Mutex<DigestByBucketAndPartition>> =
            Arc::new(Mutex::new(BTreeMap::new()));

        let num_part_files = self
            .ref_files
            .values()
            .map(|part_files| part_files.len())
            .sum::<usize>();

        info!("Computing checksums");
        let checksum_progress_bar = self.multi_progress_bar.add(
            ProgressBar::new(num_part_files as u64).with_style(
                ProgressStyle::with_template(
                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} ref files checksummed ({msg})",
                )
                .unwrap(),
            ),
        );

        for (bucket, part_files) in self.ref_files.clone().iter() {
            for (part, _part_file) in part_files.iter() {
                let mut sha3_digests = sha3_digests.lock().await;
                let ref_iter = self.ref_iter(*bucket, *part)?;
                let mut hasher = Sha3_256::default();
                let mut empty = true;
                self.object_files
                    .get(bucket)
                    .context(format!("No bucket exists for: {bucket}"))?
                    .get(part)
                    .context(format!("No part exists for bucket: {bucket}, part: {part}"))?;
                for object_ref in ref_iter {
                    hasher.update(object_ref.2.inner());
                    empty = false;
                }
                if !empty {
                    sha3_digests
                        .entry(*bucket)
                        .or_insert(BTreeMap::new())
                        .entry(*part)
                        .or_insert(hasher.finalize().digest);
                }
                checksum_progress_bar.inc(1);
                checksum_progress_bar.set_message(format!("Bucket: {}, Part: {}", bucket, part));
            }
        }
        checksum_progress_bar.finish_with_message("Checksumming complete");

        let accum_handle =
            sender.map(|sender| self.spawn_accumulation_tasks(sender, num_part_files));

        self.sync_live_objects(perpetual_db, abort_registration, sha3_digests)
            .await?;

        if let Some(handle) = accum_handle {
            handle.await?;
        }
        Ok(())
    }

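    /// Spawns background tasks that build a partial `Accumulator` per `.ref`
    /// part and forward it through `sender` along with the number of objects
    /// it covers; a helper task refreshes the accumulation progress bar once
    /// per second. The returned handle resolves once every part has been
    /// accumulated.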
    fn spawn_accumulation_tasks(
        &self,
        sender: tokio::sync::mpsc::Sender<(Accumulator, u64)>,
        num_part_files: usize,
    ) -> JoinHandle<()> {
        let concurrency = self.concurrency;
        let accum_counter = Arc::new(AtomicU64::new(0));
        let cloned_accum_counter = accum_counter.clone();
        let accum_progress_bar = self.multi_progress_bar.add(
            ProgressBar::new(num_part_files as u64).with_style(
                ProgressStyle::with_template(
                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} ref files accumulated from snapshot ({msg})",
                )
                .unwrap(),
            ),
        );
        let cloned_accum_progress_bar = accum_progress_bar.clone();
        tokio::spawn(async move {
            let a_instant = Instant::now();
            loop {
                if cloned_accum_progress_bar.is_finished() {
                    break;
                }
                let num_partitions = cloned_accum_counter.load(Ordering::Relaxed);
                let total_partitions_per_sec =
                    num_partitions as f64 / a_instant.elapsed().as_secs_f64();
                cloned_accum_progress_bar.set_position(num_partitions);
                cloned_accum_progress_bar.set_message(format!(
                    "file partitions per sec: {}",
                    total_partitions_per_sec
                ));
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        });

        let ref_files = self.ref_files.clone();
        let epoch_dir = self.epoch_dir();
        let local_staging_dir_root = self.local_staging_dir_root.clone();
        tokio::task::spawn(async move {
            let local_staging_dir_root_clone = local_staging_dir_root.clone();
            let epoch_dir_clone = epoch_dir.clone();
            for (bucket, part_files) in ref_files.clone().iter() {
                futures::stream::iter(part_files.iter())
                    .map(|(part, _part_files)| {
                        let obj_digests = {
                            let file_metadata = ref_files
                                .get(bucket)
                                .unwrap_or_else(|| {
                                    panic!("No ref files found for bucket: {bucket}")
                                })
                                .get(part)
                                .unwrap_or_else(|| {
                                    panic!(
                                        "No ref files found for bucket: {bucket}, part: {part}"
                                    )
                                });
                            ObjectRefIter::new(
                                file_metadata,
                                local_staging_dir_root_clone.clone(),
                                epoch_dir_clone.clone(),
                            )
                            .expect("Failed to create object ref iter")
                        }
                        .map(|obj_ref| obj_ref.2)
                        .collect::<Vec<ObjectDigest>>();

                        let sender_clone = sender.clone();
                        tokio::spawn(async move {
                            let mut partial_acc = Accumulator::default();
                            let num_objects = obj_digests.len();
                            partial_acc.insert_all(obj_digests);
                            sender_clone
                                .send((partial_acc, num_objects as u64))
                                .await
                                .expect("Unable to send accumulator from snapshot reader");
                        })
                    })
                    .boxed()
                    .buffer_unordered(concurrency)
                    .for_each(|result| {
                        result.expect("Failed to generate partial accumulator");
                        accum_counter.fetch_add(1, Ordering::Relaxed);
                        futures::future::ready(())
                    })
                    .await;
            }
            accum_progress_bar.finish_with_message("Accumulation complete");
        })
    }

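    /// Downloads every `.obj` file and bulk-inserts the live objects it
    /// contains into `perpetual_db`, together with the sha3 digest computed
    /// for that part from its `.ref` file.
    ///
    /// Up to `concurrency` parts are fetched at a time; failed downloads are
    /// retried with exponential backoff, and the task panics once the backoff
    /// exceeds 60 seconds. The whole operation can be cancelled through
    /// `abort_registration`.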
    async fn sync_live_objects(
        &self,
        perpetual_db: &AuthorityPerpetualTables,
        abort_registration: AbortRegistration,
        sha3_digests: Arc<Mutex<DigestByBucketAndPartition>>,
    ) -> Result<(), anyhow::Error> {
        let epoch_dir = self.epoch_dir();
        let concurrency = self.concurrency;
        let threshold = self.indirect_objects_threshold;
        let remote_object_store = self.remote_object_store.clone();
        let input_files: Vec<_> = self
            .object_files
            .iter()
            .flat_map(|(bucket, parts)| {
                parts
                    .clone()
                    .into_iter()
                    .map(|entry| (bucket, entry))
                    .collect::<Vec<_>>()
            })
            .collect();
        let obj_progress_bar = self.multi_progress_bar.add(
            ProgressBar::new(input_files.len() as u64).with_style(
                ProgressStyle::with_template(
                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} .obj files done ({msg})",
                )
                .unwrap(),
            ),
        );
        let obj_progress_bar_clone = obj_progress_bar.clone();
        let instant = Instant::now();
        let downloaded_bytes = AtomicUsize::new(0);

        let ret = Abortable::new(
            async move {
                futures::stream::iter(input_files.iter())
                    .map(|(bucket, (part_num, file_metadata))| {
                        let epoch_dir = epoch_dir.clone();
                        let file_path = file_metadata.file_path(&epoch_dir);
                        let remote_object_store = remote_object_store.clone();
                        let sha3_digests_cloned = sha3_digests.clone();
                        async move {
                            let max_timeout = Duration::from_secs(60);
                            let mut timeout = Duration::from_secs(2);
                            timeout += timeout / 2;
                            timeout = std::cmp::min(max_timeout, timeout);
                            let mut attempts = 0usize;
                            let bytes = loop {
                                match remote_object_store.get_bytes(&file_path).await {
                                    Ok(bytes) => {
                                        break bytes;
                                    }
                                    Err(err) => {
                                        error!(
                                            "Obj {} .get failed (attempt {}): {}",
                                            file_metadata.file_path(&epoch_dir),
                                            attempts,
                                            err,
                                        );
                                        if timeout > max_timeout {
                                            panic!(
                                                "Failed to get obj file {} after {} attempts",
                                                file_metadata.file_path(&epoch_dir),
                                                attempts,
                                            );
                                        } else {
                                            attempts += 1;
                                            tokio::time::sleep(timeout).await;
                                            timeout += timeout / 2;
                                            continue;
                                        }
                                    }
                                }
                            };

                            let sha3_digest = sha3_digests_cloned.lock().await;
                            let bucket_map = sha3_digest
                                .get(bucket)
                                .expect("Bucket not in digest map")
                                .clone();
                            let sha3_digest = *bucket_map
                                .get(part_num)
                                .expect("sha3 digest not in bucket map");
                            Ok::<(Bytes, FileMetadata, [u8; 32]), anyhow::Error>((
                                bytes,
                                (*file_metadata).clone(),
                                sha3_digest,
                            ))
                        }
                    })
                    .boxed()
                    .buffer_unordered(concurrency)
                    .try_for_each(|(bytes, file_metadata, sha3_digest)| {
                        let bytes_len = bytes.len();
                        let result: Result<(), anyhow::Error> =
                            LiveObjectIter::new(&file_metadata, bytes).map(|obj_iter| {
                                AuthorityStore::bulk_insert_live_objects(
                                    perpetual_db,
                                    obj_iter,
                                    threshold,
                                    &sha3_digest,
                                )
                                .expect("Failed to insert live objects");
                            });
                        downloaded_bytes.fetch_add(bytes_len, Ordering::Relaxed);
                        obj_progress_bar_clone.inc(1);
                        obj_progress_bar_clone.set_message(format!(
                            "Download speed: {} MiB/s",
                            downloaded_bytes.load(Ordering::Relaxed) as f64
                                / (1024 * 1024) as f64
                                / instant.elapsed().as_secs_f64(),
                        ));
                        futures::future::ready(result)
                    })
                    .await
            },
            abort_registration,
        )
        .await?;
        obj_progress_bar.finish_with_message("Objects download complete");
        ret
    }

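    /// Returns an iterator over the object references listed in the `.ref`
    /// file for the given bucket and part, read from the local staging
    /// directory.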
    pub fn ref_iter(&self, bucket_num: u32, part_num: u32) -> Result<ObjectRefIter> {
        let file_metadata = self
            .ref_files
            .get(&bucket_num)
            .context(format!("No ref files found for bucket: {bucket_num}"))?
            .get(&part_num)
            .context(format!(
                "No ref files found for bucket: {bucket_num}, part: {part_num}"
            ))?;
        ObjectRefIter::new(
            file_metadata,
            self.local_staging_dir_root.clone(),
            self.epoch_dir(),
        )
    }

    fn buckets(&self) -> Result<Vec<u32>> {
        Ok(self.ref_files.keys().copied().collect())
    }

    fn epoch_dir(&self) -> Path {
        Path::from(format!("epoch_{}", self.epoch))
    }

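    /// Reads and verifies the MANIFEST file at `path`.
    ///
    /// The file layout is a big-endian u32 magic, the BCS-encoded `Manifest`,
    /// and a trailing sha3-256 digest over everything that precedes it; the
    /// digest is recomputed and checked before the manifest is decoded.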
    fn read_manifest(path: PathBuf) -> anyhow::Result<Manifest> {
        let manifest_file = File::open(path)?;
        let manifest_file_size = manifest_file.metadata()?.len() as usize;
        let mut manifest_reader = BufReader::new(manifest_file);
        manifest_reader.rewind()?;
        let magic = manifest_reader.read_u32::<BigEndian>()?;
        if magic != MANIFEST_FILE_MAGIC {
            return Err(anyhow!("Unexpected magic byte: {}", magic));
        }
        manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
        let mut sha3_digest = [0u8; SHA3_BYTES];
        manifest_reader.read_exact(&mut sha3_digest)?;
        manifest_reader.rewind()?;
        let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
        manifest_reader.read_exact(&mut content_buf)?;
        let mut hasher = Sha3_256::default();
        hasher.update(&content_buf);
        let computed_digest = hasher.finalize().digest;
        if computed_digest != sha3_digest {
            return Err(anyhow!(
                "Checksum: {:?} doesn't match: {:?}",
                computed_digest,
                sha3_digest
            ));
        }
        manifest_reader.rewind()?;
        manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
        let manifest = bcs::from_bytes(&content_buf[MAGIC_BYTES..])?;
        Ok(manifest)
    }
}

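/// Iterator over the `ObjectRef`s stored in a single `.ref` file part. The
/// file starts with a `REFERENCE_FILE_MAGIC` marker followed by fixed-width
/// records; iteration stops at the first read error (normally end of file).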
pub struct ObjectRefIter {
    reader: Box<dyn Read>,
}

impl ObjectRefIter {
    pub fn new(file_metadata: &FileMetadata, root_path: PathBuf, dir_path: Path) -> Result<Self> {
        let file_path = file_metadata.local_file_path(&root_path, &dir_path)?;
        let mut reader = file_metadata.file_compression.decompress(&file_path)?;
        let magic = reader.read_u32::<BigEndian>()?;
        if magic != REFERENCE_FILE_MAGIC {
            Err(anyhow!(
                "Unexpected magic string in REFERENCE file: {:?}",
                magic
            ))
        } else {
            Ok(ObjectRefIter { reader })
        }
    }

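    /// Reads one fixed-width record: `OBJECT_ID_BYTES` of object id, a
    /// big-endian u64 sequence number, and the object digest.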
    fn next_ref(&mut self) -> Result<ObjectRef> {
        let mut buf = [0u8; OBJECT_REF_BYTES];
        self.reader.read_exact(&mut buf)?;
        let object_id = &buf[0..OBJECT_ID_BYTES];
        let sequence_number = &buf[OBJECT_ID_BYTES..OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES]
            .reader()
            .read_u64::<BigEndian>()?;
        let sha3_digest = &buf[OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES..OBJECT_REF_BYTES];
        let object_ref: ObjectRef = (
            ObjectID::from_bytes(object_id)?,
            SequenceNumber::from_u64(*sequence_number),
            ObjectDigest::try_from(sha3_digest)?,
        );
        Ok(object_ref)
    }
}

impl Iterator for ObjectRefIter {
    type Item = ObjectRef;
    fn next(&mut self) -> Option<Self::Item> {
        self.next_ref().ok()
    }
}

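/// Iterator over the `LiveObject`s stored in a single `.obj` file. The file
/// starts with an `OBJECT_FILE_MAGIC` marker followed by length-prefixed,
/// blob-encoded objects; iteration stops at the first read error.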
pub struct LiveObjectIter {
    reader: Box<dyn Read>,
}

impl LiveObjectIter {
    pub fn new(file_metadata: &FileMetadata, bytes: Bytes) -> Result<Self> {
        let mut reader = file_metadata.file_compression.bytes_decompress(bytes)?;
        let magic = reader.read_u32::<BigEndian>()?;
        if magic != OBJECT_FILE_MAGIC {
            Err(anyhow!(
                "Unexpected magic string in object file: {:?}",
                magic
            ))
        } else {
            Ok(LiveObjectIter { reader })
        }
    }

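    /// Reads one record: a varint length, a one-byte `BlobEncoding` tag, and
    /// `len` bytes of blob data, which are then decoded into a `LiveObject`.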
    fn next_object(&mut self) -> Result<LiveObject> {
        let len = self.reader.read_varint::<u64>()? as usize;
        if len == 0 {
            return Err(anyhow!("Invalid object length of 0 in file"));
        }
        let encoding = self.reader.read_u8()?;
        let mut data = vec![0u8; len];
        self.reader.read_exact(&mut data)?;
        let blob = Blob {
            data,
            encoding: BlobEncoding::try_from(encoding)?,
        };
        blob.decode()
    }
}

impl Iterator for LiveObjectIter {
    type Item = LiveObject;
    fn next(&mut self) -> Option<Self::Item> {
        self.next_object().ok()
    }
}