iota_snapshot/
reader.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::BTreeMap,
7    fs,
8    fs::File,
9    io::{BufReader, Read, Seek, SeekFrom},
10    num::NonZeroUsize,
11    path::PathBuf,
12    sync::{
13        Arc,
14        atomic::{AtomicU64, AtomicUsize, Ordering},
15    },
16};
17
18use anyhow::{Context, Result, anyhow};
19use byteorder::{BigEndian, ReadBytesExt};
20use bytes::{Buf, Bytes};
21use fastcrypto::hash::{HashFunction, MultisetHash, Sha3_256};
22use futures::{
23    StreamExt, TryStreamExt,
24    future::{AbortRegistration, Abortable},
25};
26use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
27use integer_encoding::VarIntReader;
28use iota_config::object_storage_config::ObjectStoreConfig;
29use iota_core::authority::{
30    AuthorityStore,
31    authority_store_tables::{AuthorityPerpetualTables, LiveObject},
32};
33use iota_storage::{
34    blob::{Blob, BlobEncoding},
35    object_store::{
36        ObjectStoreGetExt, ObjectStorePutExt,
37        http::HttpDownloaderBuilder,
38        util::{copy_file, copy_files, path_to_filesystem},
39    },
40};
41use iota_types::{
42    accumulator::Accumulator,
43    base_types::{ObjectDigest, ObjectID, ObjectRef, SequenceNumber},
44};
45use object_store::path::Path;
46use tokio::{
47    sync::Mutex,
48    task::JoinHandle,
49    time::{Duration, Instant},
50};
51use tracing::{error, info};
52
53use crate::{
54    FileMetadata, FileType, MAGIC_BYTES, MANIFEST_FILE_MAGIC, Manifest, OBJECT_FILE_MAGIC,
55    OBJECT_ID_BYTES, OBJECT_REF_BYTES, REFERENCE_FILE_MAGIC, SEQUENCE_NUM_BYTES, SHA3_BYTES,
56};
57
58pub type SnapshotChecksums = (DigestByBucketAndPartition, Accumulator);
59pub type DigestByBucketAndPartition = BTreeMap<u32, BTreeMap<u32, [u8; 32]>>;
60pub struct StateSnapshotReaderV1 {
61    epoch: u64,
62    local_staging_dir_root: PathBuf,
63    remote_object_store: Arc<dyn ObjectStoreGetExt>,
64    local_object_store: Arc<dyn ObjectStorePutExt>,
65    ref_files: BTreeMap<u32, BTreeMap<u32, FileMetadata>>,
66    object_files: BTreeMap<u32, BTreeMap<u32, FileMetadata>>,
67    indirect_objects_threshold: usize,
68    multi_progress_bar: MultiProgress,
69    concurrency: usize,
70}
71
72impl StateSnapshotReaderV1 {
73    /// Downloads the MANIFEST, FileMetadata of objects and references from the
74    /// remote store, then creates a StateSnapshotReaderV1 instance.
75    pub async fn new(
76        epoch: u64,
77        remote_store_config: &ObjectStoreConfig,
78        local_store_config: &ObjectStoreConfig,
79        indirect_objects_threshold: usize,
80        download_concurrency: NonZeroUsize,
81        multi_progress_bar: MultiProgress,
82    ) -> Result<Self> {
83        let epoch_dir = format!("epoch_{}", epoch);
84        let remote_object_store = if remote_store_config.no_sign_request {
85            remote_store_config.make_http()?
86        } else {
87            remote_store_config.make().map(Arc::new)?
88        };
89        let local_object_store: Arc<dyn ObjectStorePutExt> =
90            local_store_config.make().map(Arc::new)?;
91        let local_staging_dir_root = local_store_config
92            .directory
93            .as_ref()
94            .context("No directory specified")?
95            .clone();
96        let local_epoch_dir_path = local_staging_dir_root.join(&epoch_dir);
97        if local_epoch_dir_path.exists() {
98            fs::remove_dir_all(&local_epoch_dir_path)?;
99        }
100        fs::create_dir_all(&local_epoch_dir_path)?;
101        // Downloads MANIFEST from remote store
102        let manifest_file_path = Path::from(epoch_dir.clone()).child("MANIFEST");
103        copy_file(
104            &manifest_file_path,
105            &manifest_file_path,
106            &remote_object_store,
107            &local_object_store,
108        )
109        .await?;
110        let manifest = Self::read_manifest(path_to_filesystem(
111            local_staging_dir_root.clone(),
112            &manifest_file_path,
113        )?)?;
114        // Verifies MANIFEST
115        let snapshot_version = manifest.snapshot_version();
116        if snapshot_version != 1u8 {
117            return Err(anyhow!("Unexpected snapshot version: {}", snapshot_version));
118        }
119        if manifest.address_length() as usize > ObjectID::LENGTH {
120            return Err(anyhow!(
121                "Max possible address length is: {}",
122                ObjectID::LENGTH
123            ));
124        }
125        if manifest.epoch() != epoch {
126            return Err(anyhow!("Download manifest is not for epoch: {}", epoch,));
127        }
128        // Stores the objects and references FileMetadata in MANIFEST to the local
129        // directory
130        let mut object_files = BTreeMap::new();
131        let mut ref_files = BTreeMap::new();
132        for file_metadata in manifest.file_metadata() {
133            match file_metadata.file_type {
134                FileType::Object => {
135                    // Gets the object FileMetadata bucket with the bucket number, or inserts a new
136                    // one if it doesn't exist.
137                    let entry = object_files
138                        .entry(file_metadata.bucket_num)
139                        .or_insert_with(BTreeMap::new);
140                    // Inserts the object FileMetadata with the partition number to the bucket.
141                    entry.insert(file_metadata.part_num, file_metadata.clone());
142                }
143                FileType::Reference => {
144                    // Gets the reference FileMetadata bucket with the bucket number, or inserts a
145                    // new one if it doesn't exist.
146                    let entry = ref_files
147                        .entry(file_metadata.bucket_num)
148                        .or_insert_with(BTreeMap::new);
149                    // Inserts the reference FileMetadata with the partition number to the bucket.
150                    entry.insert(file_metadata.part_num, file_metadata.clone());
151                }
152            }
153        }
154        let epoch_dir_path = Path::from(epoch_dir);
155        // Collects the path of all reference files
156        let files: Vec<Path> = ref_files
157            .values()
158            .flat_map(|entry| {
159                let files: Vec<_> = entry
160                    .values()
161                    .map(|file_metadata| file_metadata.file_path(&epoch_dir_path))
162                    .collect();
163                files
164            })
165            .collect();
166
167        let progress_bar = multi_progress_bar.add(
168            ProgressBar::new(files.len() as u64).with_style(
169                ProgressStyle::with_template(
170                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} .ref files done ({msg})",
171                )
172                .unwrap(),
173            ),
174        );
175        // Downloads all reference files from remote store to local store in parallel
176        // and updates the progress bar accordingly
177        copy_files(
178            &files,
179            &files,
180            &remote_object_store,
181            &local_object_store,
182            download_concurrency,
183            Some(progress_bar.clone()),
184        )
185        .await?;
186        progress_bar.finish_with_message("ref files download complete");
187        Ok(StateSnapshotReaderV1 {
188            epoch,
189            local_staging_dir_root,
190            remote_object_store,
191            local_object_store,
192            ref_files,
193            object_files,
194            indirect_objects_threshold,
195            multi_progress_bar,
196            concurrency: download_concurrency.get(),
197        })
198    }
199
200    pub async fn read(
201        &mut self,
202        perpetual_db: &AuthorityPerpetualTables,
203        abort_registration: AbortRegistration,
204        sender: Option<tokio::sync::mpsc::Sender<(Accumulator, u64)>>,
205    ) -> Result<()> {
206        // This computes and stores the sha3 digest of object references in REFERENCE
207        // file for each bucket partition. When downloading objects, we will
208        // compare sha3 digest of object references per *.obj file against this.
209        // This allows us to pre-fetch object references during restoration,
210        // start building the state accumulator, and fail early if the state root hash
211        // doesn't match. However, we still need to ensure that objects match references
212        // exactly.
213        let sha3_digests: Arc<Mutex<DigestByBucketAndPartition>> =
214            Arc::new(Mutex::new(BTreeMap::new()));
215
216        // Counts the total number of partitions
217        let num_part_files = self
218            .ref_files
219            .values()
220            .map(|part_files| part_files.len())
221            .sum::<usize>();
222
223        info!("Computing checksums");
224        // Creates a progress bar for checksumming
225        let checksum_progress_bar = self.multi_progress_bar.add(
226            ProgressBar::new(num_part_files as u64).with_style(
227                ProgressStyle::with_template(
228                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} ref files checksummed ({msg})",
229                )
230                .unwrap(),
231            ),
232        );
233
234        // Iterates over all FileMetadata in the ref files by partition and build up the
235        // sha3 digests mapping: (bucket, (partition, sha3_digest))
236        for (bucket, part_files) in self.ref_files.clone().iter() {
237            for (part, _part_file) in part_files.iter() {
238                let mut sha3_digests = sha3_digests.lock().await;
239                let ref_iter = self.ref_iter(*bucket, *part)?;
240                let mut hasher = Sha3_256::default();
241                let mut empty = true;
242                // TODO: This can be removed, the same operation is done in ref_iter() in line
243                // 238
244                self.object_files
245                    .get(bucket)
246                    .context(format!("No bucket exists for: {bucket}"))?
247                    .get(part)
248                    .context(format!("No part exists for bucket: {bucket}, part: {part}"))?;
249                // Inserts the sha3 digest of each object into the hasher
250                for object_ref in ref_iter {
251                    hasher.update(object_ref.2.inner());
252                    empty = false;
253                }
254                // Computes the sha3 digest of the partition and insert sit into the
255                // sha3_digests map
256                if !empty {
257                    sha3_digests
258                        .entry(*bucket)
259                        .or_insert(BTreeMap::new())
260                        .entry(*part)
261                        .or_insert(hasher.finalize().digest);
262                }
263                checksum_progress_bar.inc(1);
264                checksum_progress_bar.set_message(format!("Bucket: {}, Part: {}", bucket, part));
265            }
266        }
267        checksum_progress_bar.finish_with_message("Checksumming complete");
268
269        let accum_handle =
270            sender.map(|sender| self.spawn_accumulation_tasks(sender, num_part_files));
271
272        // Downloads all object files from remote in parallel and inserts the objects
273        // into the AuthorityPerpetualTables
274        self.sync_live_objects(perpetual_db, abort_registration, sha3_digests)
275            .await?;
276
277        if let Some(handle) = accum_handle {
278            handle.await?;
279        }
280        Ok(())
281    }
282
283    /// Spawns accumulation tasks to accumulate the sha3 digests of all objects
284    /// then sends the accumulator to the sender.
285    fn spawn_accumulation_tasks(
286        &self,
287        sender: tokio::sync::mpsc::Sender<(Accumulator, u64)>,
288        num_part_files: usize,
289    ) -> JoinHandle<()> {
290        // Spawns accumulation progress bar
291        let concurrency = self.concurrency;
292        let accum_counter = Arc::new(AtomicU64::new(0));
293        let cloned_accum_counter = accum_counter.clone();
294        let accum_progress_bar = self.multi_progress_bar.add(
295             ProgressBar::new(num_part_files as u64).with_style(
296                 ProgressStyle::with_template(
297                     "[{elapsed_precise}] {wide_bar} {pos} out of {len} ref files accumulated from snapshot ({msg})",
298                 )
299                 .unwrap(),
300             ),
301         );
302        let cloned_accum_progress_bar = accum_progress_bar.clone();
303        // Spawns accumulation progress bar update task
304        tokio::spawn(async move {
305            let a_instant = Instant::now();
306            loop {
307                if cloned_accum_progress_bar.is_finished() {
308                    break;
309                }
310                let num_partitions = cloned_accum_counter.load(Ordering::Relaxed);
311                let total_partitions_per_sec =
312                    num_partitions as f64 / a_instant.elapsed().as_secs_f64();
313                cloned_accum_progress_bar.set_position(num_partitions);
314                cloned_accum_progress_bar.set_message(format!(
315                    "file partitions per sec: {}",
316                    total_partitions_per_sec
317                ));
318                tokio::time::sleep(Duration::from_secs(1)).await;
319            }
320        });
321
322        // spawns accumualation task
323        let ref_files = self.ref_files.clone();
324        let epoch_dir = self.epoch_dir();
325        let local_staging_dir_root = self.local_staging_dir_root.clone();
326        tokio::task::spawn(async move {
327            let local_staging_dir_root_clone = local_staging_dir_root.clone();
328            let epoch_dir_clone = epoch_dir.clone();
329            for (bucket, part_files) in ref_files.clone().iter() {
330                futures::stream::iter(part_files.iter())
331                    .map(|(part, _part_files)| {
332                        // TODO depending on concurrency limit here, we may be
333                        // materializing too many refs into memory at once.
334
335                        // Takes the sha3 digests of every object in the partition
336                        // This is only done because ObjectRefIter is not Send
337                        let obj_digests = {
338                            // TODO: Make sure that we can remove this getter, just take _part_files
339                            // here
340                            let file_metadata = ref_files
341                                .get(bucket)
342                                .expect("No ref files found for bucket: {bucket_num}")
343                                .get(part)
344                                .expect(
345                                    "No ref files found for bucket: {bucket_num}, part: {part_num}",
346                                );
347                            ObjectRefIter::new(
348                                file_metadata,
349                                local_staging_dir_root_clone.clone(),
350                                epoch_dir_clone.clone(),
351                            )
352                            .expect("Failed to create object ref iter")
353                        }
354                        .map(|obj_ref| obj_ref.2)
355                        .collect::<Vec<ObjectDigest>>();
356
357                        // Spawns a task to accumulate the sha3 digests and send the accumulator
358                        // to the sender.
359                        let sender_clone = sender.clone();
360                        tokio::spawn(async move {
361                            let mut partial_acc = Accumulator::default();
362                            let num_objects = obj_digests.len();
363                            partial_acc.insert_all(obj_digests);
364                            sender_clone
365                                .send((partial_acc, num_objects as u64))
366                                .await
367                                .expect("Unable to send accumulator from snapshot reader");
368                        })
369                    })
370                    .boxed()
371                    .buffer_unordered(concurrency)
372                    .for_each(|result| {
373                        // Update the progress bar
374                        result.expect("Failed to generate partial accumulator");
375                        accum_counter.fetch_add(1, Ordering::Relaxed);
376                        futures::future::ready(())
377                    })
378                    .await;
379            }
380            accum_progress_bar.finish_with_message("Accumulation complete");
381        })
382    }
383
384    /// Downloads all object files from remote in parallel and inserts the
385    /// objects into the AuthorityPerpetualTables.
386    async fn sync_live_objects(
387        &self,
388        perpetual_db: &AuthorityPerpetualTables,
389        abort_registration: AbortRegistration,
390        sha3_digests: Arc<Mutex<DigestByBucketAndPartition>>,
391    ) -> Result<(), anyhow::Error> {
392        let epoch_dir = self.epoch_dir();
393        let concurrency = self.concurrency;
394        let threshold = self.indirect_objects_threshold;
395        let remote_object_store = self.remote_object_store.clone();
396        // collects a vector of all object FileMetadata in the form of:
397        // (bucket, (partition, File_metadata))
398        let input_files: Vec<_> = self
399            .object_files
400            .iter()
401            .flat_map(|(bucket, parts)| {
402                parts
403                    .clone()
404                    .into_iter()
405                    .map(|entry| (bucket, entry))
406                    .collect::<Vec<_>>()
407            })
408            .collect();
409        // Creates a progress bar for object files
410        let obj_progress_bar = self.multi_progress_bar.add(
411            ProgressBar::new(input_files.len() as u64).with_style(
412                ProgressStyle::with_template(
413                    "[{elapsed_precise}] {wide_bar} {pos} out of {len} .obj files done ({msg})",
414                )
415                .unwrap(),
416            ),
417        );
418        let obj_progress_bar_clone = obj_progress_bar.clone();
419        let instant = Instant::now();
420        let downloaded_bytes = AtomicUsize::new(0);
421
422        let ret = Abortable::new(
423            async move {
424                // Downloads all object files from remote store to local store in parallel
425                // and inserts the objects into the AuthorityPerpetualTables
426                futures::stream::iter(input_files.iter())
427                    .map(|(bucket, (part_num, file_metadata))| {
428                        let epoch_dir = epoch_dir.clone();
429                        let file_path = file_metadata.file_path(&epoch_dir);
430                        let remote_object_store = remote_object_store.clone();
431                        let sha3_digests_cloned = sha3_digests.clone();
432                        async move {
433                            // Downloads object file with retries
434                            let max_timeout = Duration::from_secs(60);
435                            let mut timeout = Duration::from_secs(2);
436                            timeout += timeout / 2;
437                            timeout = std::cmp::min(max_timeout, timeout);
438                            let mut attempts = 0usize;
439                            let bytes = loop {
440                                match remote_object_store.get_bytes(&file_path).await {
441                                    Ok(bytes) => {
442                                        break bytes;
443                                    }
444                                    Err(err) => {
445                                        error!(
446                                            "Obj {} .get failed (attempt {}): {}",
447                                            file_metadata.file_path(&epoch_dir),
448                                            attempts,
449                                            err,
450                                        );
451                                        if timeout > max_timeout {
452                                            panic!(
453                                                "Failed to get obj file {} after {} attempts",
454                                                file_metadata.file_path(&epoch_dir),
455                                                attempts,
456                                            );
457                                        } else {
458                                            attempts += 1;
459                                            tokio::time::sleep(timeout).await;
460                                            timeout += timeout / 2;
461                                            continue;
462                                        }
463                                    }
464                                }
465                            };
466
467                            // Gets the sha3 digest of the partition
468                            let sha3_digest = sha3_digests_cloned.lock().await;
469                            let bucket_map = sha3_digest
470                                .get(bucket)
471                                .expect("Bucket not in digest map")
472                                .clone();
473                            let sha3_digest = *bucket_map
474                                .get(part_num)
475                                .expect("sha3 digest not in bucket map");
476                            Ok::<(Bytes, FileMetadata, [u8; 32]), anyhow::Error>((
477                                bytes,
478                                (*file_metadata).clone(),
479                                sha3_digest,
480                            ))
481                        }
482                    })
483                    .boxed()
484                    .buffer_unordered(concurrency)
485                    .try_for_each(|(bytes, file_metadata, sha3_digest)| {
486                        let bytes_len = bytes.len();
487                        // Inserts live objects into the AuthorityStore
488                        let result: Result<(), anyhow::Error> =
489                            LiveObjectIter::new(&file_metadata, bytes).map(|obj_iter| {
490                                AuthorityStore::bulk_insert_live_objects(
491                                    perpetual_db,
492                                    obj_iter,
493                                    threshold,
494                                    &sha3_digest,
495                                )
496                                .expect("Failed to insert live objects");
497                            });
498                        downloaded_bytes.fetch_add(bytes_len, Ordering::Relaxed);
499                        // Updates the progress bar
500                        obj_progress_bar_clone.inc(1);
501                        obj_progress_bar_clone.set_message(format!(
502                            "Download speed: {} MiB/s",
503                            downloaded_bytes.load(Ordering::Relaxed) as f64
504                                / (1024 * 1024) as f64
505                                / instant.elapsed().as_secs_f64(),
506                        ));
507                        futures::future::ready(result)
508                    })
509                    .await
510            },
511            abort_registration,
512        )
513        .await?;
514        obj_progress_bar.finish_with_message("Objects download complete");
515        ret
516    }
517
518    /// Returns an iterator over all references in a .ref file.
519    pub fn ref_iter(&self, bucket_num: u32, part_num: u32) -> Result<ObjectRefIter> {
520        // Gets the reference file metadata for the {bucket_num}_{part_num}
521        let file_metadata = self
522            .ref_files
523            .get(&bucket_num)
524            .context(format!("No ref files found for bucket: {bucket_num}"))?
525            .get(&part_num)
526            .context(format!(
527                "No ref files found for bucket: {bucket_num}, part: {part_num}"
528            ))?;
529        ObjectRefIter::new(
530            file_metadata,
531            self.local_staging_dir_root.clone(),
532            self.epoch_dir(),
533        )
534    }
535
536    /// Returns a list of all buckets.
537    fn buckets(&self) -> Result<Vec<u32>> {
538        Ok(self.ref_files.keys().copied().collect())
539    }
540
541    fn epoch_dir(&self) -> Path {
542        Path::from(format!("epoch_{}", self.epoch))
543    }
544
545    /// Reads the MANIFEST file, verifies it with the checksum, and returns the
546    /// Manifest.
547    fn read_manifest(path: PathBuf) -> anyhow::Result<Manifest> {
548        let manifest_file = File::open(path)?;
549        let manifest_file_size = manifest_file.metadata()?.len() as usize;
550        let mut manifest_reader = BufReader::new(manifest_file);
551        // Make sure the file is MANIFEST with correct magic bytes
552        manifest_reader.rewind()?;
553        let magic = manifest_reader.read_u32::<BigEndian>()?;
554        if magic != MANIFEST_FILE_MAGIC {
555            return Err(anyhow!("Unexpected magic byte: {}", magic));
556        }
557        // Gets the sha3 digest from the end of the file
558        manifest_reader.seek(SeekFrom::End(-(SHA3_BYTES as i64)))?;
559        let mut sha3_digest = [0u8; SHA3_BYTES];
560        manifest_reader.read_exact(&mut sha3_digest)?;
561        // Rewinds to the beginning of the file and read the contents
562        manifest_reader.rewind()?;
563        let mut content_buf = vec![0u8; manifest_file_size - SHA3_BYTES];
564        manifest_reader.read_exact(&mut content_buf)?;
565        // Computes the sha3 digest of the content and check if it matches the one at
566        // the end
567        let mut hasher = Sha3_256::default();
568        hasher.update(&content_buf);
569        let computed_digest = hasher.finalize().digest;
570        if computed_digest != sha3_digest {
571            return Err(anyhow!(
572                "Checksum: {:?} don't match: {:?}",
573                computed_digest,
574                sha3_digest
575            ));
576        }
577        manifest_reader.rewind()?;
578        manifest_reader.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
579        let manifest = bcs::from_bytes(&content_buf[MAGIC_BYTES..])?;
580        Ok(manifest)
581    }
582}
583
584/// An iterator over all object refs in a .ref file.
585pub struct ObjectRefIter {
586    reader: Box<dyn Read>,
587}
588
589impl ObjectRefIter {
590    pub fn new(file_metadata: &FileMetadata, root_path: PathBuf, dir_path: Path) -> Result<Self> {
591        let file_path = file_metadata.local_file_path(&root_path, &dir_path)?;
592        let mut reader = file_metadata.file_compression.decompress(&file_path)?;
593        let magic = reader.read_u32::<BigEndian>()?;
594        if magic != REFERENCE_FILE_MAGIC {
595            Err(anyhow!(
596                "Unexpected magic string in REFERENCE file: {:?}",
597                magic
598            ))
599        } else {
600            Ok(ObjectRefIter { reader })
601        }
602    }
603
604    fn next_ref(&mut self) -> Result<ObjectRef> {
605        let mut buf = [0u8; OBJECT_REF_BYTES];
606        self.reader.read_exact(&mut buf)?;
607        let object_id = &buf[0..OBJECT_ID_BYTES];
608        let sequence_number = &buf[OBJECT_ID_BYTES..OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES]
609            .reader()
610            .read_u64::<BigEndian>()?;
611        let sha3_digest = &buf[OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES..OBJECT_REF_BYTES];
612        let object_ref: ObjectRef = (
613            ObjectID::from_bytes(object_id)?,
614            SequenceNumber::from_u64(*sequence_number),
615            ObjectDigest::try_from(sha3_digest)?,
616        );
617        Ok(object_ref)
618    }
619}
620
621impl Iterator for ObjectRefIter {
622    type Item = ObjectRef;
623    fn next(&mut self) -> Option<Self::Item> {
624        self.next_ref().ok()
625    }
626}
627
628/// An iterator over all objects in a *.obj file.
629pub struct LiveObjectIter {
630    reader: Box<dyn Read>,
631}
632
633impl LiveObjectIter {
634    pub fn new(file_metadata: &FileMetadata, bytes: Bytes) -> Result<Self> {
635        let mut reader = file_metadata.file_compression.bytes_decompress(bytes)?;
636        let magic = reader.read_u32::<BigEndian>()?;
637        if magic != OBJECT_FILE_MAGIC {
638            Err(anyhow!(
639                "Unexpected magic string in object file: {:?}",
640                magic
641            ))
642        } else {
643            Ok(LiveObjectIter { reader })
644        }
645    }
646
647    fn next_object(&mut self) -> Result<LiveObject> {
648        let len = self.reader.read_varint::<u64>()? as usize;
649        if len == 0 {
650            return Err(anyhow!("Invalid object length of 0 in file"));
651        }
652        let encoding = self.reader.read_u8()?;
653        let mut data = vec![0u8; len];
654        self.reader.read_exact(&mut data)?;
655        let blob = Blob {
656            data,
657            encoding: BlobEncoding::try_from(encoding)?,
658        };
659        blob.decode()
660    }
661}
662
663impl Iterator for LiveObjectIter {
664    type Item = LiveObject;
665    fn next(&mut self) -> Option<Self::Item> {
666        self.next_object().ok()
667    }
668}