iota_archival/reader.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

5use std::{
6    borrow::Borrow,
7    future,
8    ops::Range,
9    sync::{
10        Arc,
11        atomic::{AtomicU64, Ordering},
12    },
13    time::Duration,
14};
15
16use anyhow::{Context, Result, anyhow};
17use bytes::{Buf, Bytes, buf::Reader};
18use futures::{StreamExt, TryStreamExt};
19use iota_config::node::ArchiveReaderConfig;
20use iota_storage::{
21    compute_sha3_checksum_for_bytes, make_iterator,
22    object_store::{ObjectStoreGetExt, http::HttpDownloaderBuilder, util::get},
23    verify_checkpoint,
24};
25use iota_types::{
26    messages_checkpoint::{
27        CertifiedCheckpointSummary, CheckpointSequenceNumber,
28        FullCheckpointContents as CheckpointContents, VerifiedCheckpoint,
29        VerifiedCheckpointContents,
30    },
31    storage::WriteStore,
32};
33use prometheus::{IntCounterVec, Registry, register_int_counter_vec_with_registry};
34use rand::seq::SliceRandom;
35use tokio::sync::{Mutex, oneshot, oneshot::Sender};
36use tracing::info;
37
38use crate::{
39    CHECKPOINT_FILE_MAGIC, FileMetadata, FileType, Manifest, SUMMARY_FILE_MAGIC, read_manifest,
40};
41
/// Prometheus counters tracking how much data has been read from the archive.
#[derive(Debug)]
pub struct ArchiveReaderMetrics {
    // Transactions read from the archive, labeled by bucket name.
    pub archive_txns_read: IntCounterVec,
    // Checkpoints read from the archive, labeled by bucket name.
    pub archive_checkpoints_read: IntCounterVec,
}
47
48impl ArchiveReaderMetrics {
49    pub fn new(registry: &Registry) -> Arc<Self> {
50        let this = Self {
51            archive_txns_read: register_int_counter_vec_with_registry!(
52                "archive_txns_read",
53                "Number of transactions read from archive",
54                &["bucket"],
55                registry
56            )
57            .unwrap(),
58            archive_checkpoints_read: register_int_counter_vec_with_registry!(
59                "archive_checkpoints_read",
60                "Number of checkpoints read from archive",
61                &["bucket"],
62                registry
63            )
64            .unwrap(),
65        };
66        Arc::new(this)
67    }
68}
69
// ArchiveReaderBalancer selects archives for reading based on whether they can
// fulfill a checkpoint request
#[derive(Default, Clone)]
pub struct ArchiveReaderBalancer {
    // All configured archive readers; one is picked at random per request.
    readers: Vec<Arc<ArchiveReader>>,
}
76
77impl ArchiveReaderBalancer {
78    pub fn new(configs: Vec<ArchiveReaderConfig>, registry: &Registry) -> Result<Self> {
79        let mut readers = vec![];
80        let metrics = ArchiveReaderMetrics::new(registry);
81        for config in configs.into_iter() {
82            readers.push(Arc::new(ArchiveReader::new(config.clone(), &metrics)?));
83        }
84        Ok(ArchiveReaderBalancer { readers })
85    }
86    pub async fn get_archive_watermark(&self) -> Result<Option<u64>> {
87        let mut checkpoints: Vec<Result<CheckpointSequenceNumber>> = vec![];
88        for reader in self
89            .readers
90            .iter()
91            .filter(|r| r.use_for_pruning_watermark())
92        {
93            let latest_checkpoint = reader.latest_available_checkpoint().await;
94            info!(
95                "Latest archived checkpoint in remote store: {:?} is: {:?}",
96                reader.remote_store_identifier(),
97                latest_checkpoint
98            );
99            checkpoints.push(latest_checkpoint)
100        }
101        let checkpoints: Result<Vec<CheckpointSequenceNumber>> = checkpoints.into_iter().collect();
102        checkpoints.map(|vec| vec.into_iter().min())
103    }
104    pub async fn pick_one_random(
105        &self,
106        checkpoint_range: Range<CheckpointSequenceNumber>,
107    ) -> Option<Arc<ArchiveReader>> {
108        let mut archives_with_complete_range = vec![];
109        for reader in self.readers.iter() {
110            let latest_checkpoint = reader.latest_available_checkpoint().await.unwrap_or(0);
111            if latest_checkpoint >= checkpoint_range.end {
112                archives_with_complete_range.push(reader.clone());
113            }
114        }
115        if !archives_with_complete_range.is_empty() {
116            return Some(
117                archives_with_complete_range
118                    .choose(&mut rand::thread_rng())
119                    .unwrap()
120                    .clone(),
121            );
122        }
123        let mut archives_with_partial_range = vec![];
124        for reader in self.readers.iter() {
125            let latest_checkpoint = reader.latest_available_checkpoint().await.unwrap_or(0);
126            if latest_checkpoint >= checkpoint_range.start {
127                archives_with_partial_range.push(reader.clone());
128            }
129        }
130        if !archives_with_partial_range.is_empty() {
131            return Some(
132                archives_with_partial_range
133                    .choose(&mut rand::thread_rng())
134                    .unwrap()
135                    .clone(),
136            );
137        }
138        None
139    }
140}
141
/// Reads checkpoint summaries and contents from a remote archive store,
/// keeping a locally cached copy of the archive manifest in sync.
#[derive(Clone)]
pub struct ArchiveReader {
    // Bucket name, used only as a metrics label ("unknown" when unset).
    bucket: String,
    // Max number of concurrent downloads from the remote store.
    concurrency: usize,
    // Dropping this sender stops the background manifest sync task.
    sender: Arc<Sender<()>>,
    // Locally cached manifest, refreshed periodically from the remote store.
    manifest: Arc<Mutex<Manifest>>,
    // Whether this archive participates in pruning-watermark computation.
    use_for_pruning_watermark: bool,
    remote_object_store: Arc<dyn ObjectStoreGetExt>,
    archive_reader_metrics: Arc<ArchiveReaderMetrics>,
}
152
153impl ArchiveReader {
    /// Creates an `ArchiveReader` for the remote store described by `config`
    /// and spawns a background task that keeps the cached manifest in sync
    /// with the remote one.
    pub fn new(config: ArchiveReaderConfig, metrics: &Arc<ArchiveReaderMetrics>) -> Result<Self> {
        let bucket = config
            .remote_store_config
            .bucket
            .clone()
            .unwrap_or("unknown".to_string());
        // `no_sign_request` stores are reached via the plain HTTP downloader;
        // otherwise build the fully configured object-store client.
        let remote_object_store = if config.remote_store_config.no_sign_request {
            config.remote_store_config.make_http()?
        } else {
            config.remote_store_config.make().map(Arc::new)?
        };
        // Dropping `sender` (kept in `self.sender`) terminates the sync task.
        let (sender, recv) = oneshot::channel();
        let manifest = Arc::new(Mutex::new(Manifest::new(0, 0)));
        // Start a background tokio task to keep local manifest in sync with remote
        Self::spawn_manifest_sync_task(remote_object_store.clone(), manifest.clone(), recv);
        Ok(ArchiveReader {
            bucket,
            manifest,
            sender: Arc::new(sender),
            remote_object_store,
            use_for_pruning_watermark: config.use_for_pruning_watermark,
            concurrency: config.download_concurrency.get(),
            archive_reader_metrics: metrics.clone(),
        })
    }
179
180    /// This function verifies that the files in archive cover the entire range
181    /// of checkpoints from sequence number 0 until the latest available
182    /// checkpoint with no missing checkpoint
183    pub async fn verify_manifest(
184        &self,
185        manifest: Manifest,
186    ) -> Result<Vec<(FileMetadata, FileMetadata)>> {
187        let files = manifest.files();
188        if files.is_empty() {
189            return Err(anyhow!("Unexpected empty archive store"));
190        }
191
192        let mut summary_files: Vec<_> = files
193            .clone()
194            .into_iter()
195            .filter(|f| f.file_type == FileType::CheckpointSummary)
196            .collect();
197        let mut contents_files: Vec<_> = files
198            .into_iter()
199            .filter(|f| f.file_type == FileType::CheckpointContent)
200            .collect();
201        assert_eq!(summary_files.len(), contents_files.len());
202
203        summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
204        contents_files.sort_by_key(|f| f.checkpoint_seq_range.start);
205
206        assert!(
207            summary_files
208                .windows(2)
209                .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
210        );
211        assert!(
212            contents_files
213                .windows(2)
214                .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
215        );
216
217        let files: Vec<(FileMetadata, FileMetadata)> = summary_files
218            .into_iter()
219            .zip(contents_files.into_iter())
220            .map(|(s, c)| {
221                assert_eq!(s.checkpoint_seq_range, c.checkpoint_seq_range);
222                (s, c)
223            })
224            .collect();
225
226        assert_eq!(files.first().unwrap().0.checkpoint_seq_range.start, 0);
227
228        Ok(files)
229    }
230
    /// This function downloads summary and content files and ensures their
    /// computed checksum matches the one in manifest
    pub async fn verify_file_consistency(
        &self,
        files: Vec<(FileMetadata, FileMetadata)>,
    ) -> Result<()> {
        let remote_object_store = self.remote_object_store.clone();
        futures::stream::iter(files.iter())
            .enumerate()
            .map(|(_, (summary_metadata, content_metadata))| {
                let remote_object_store = remote_object_store.clone();
                async move {
                    // Download both halves of the pair before checksumming.
                    let summary_data =
                        get(&remote_object_store, &summary_metadata.file_path()).await?;
                    let content_data =
                        get(&remote_object_store, &content_metadata.file_path()).await?;
                    Ok::<((Bytes, &FileMetadata), (Bytes, &FileMetadata)), anyhow::Error>((
                        (summary_data, summary_metadata),
                        (content_data, content_metadata),
                    ))
                }
            })
            .boxed()
            // Completion order is irrelevant here: each pair is verified
            // independently, so unordered buffering maximizes throughput.
            .buffer_unordered(self.concurrency)
            .try_for_each(
                |((summary_data, summary_metadata), (content_data, content_metadata))| {
                    // Compute both checksums, short-circuiting on failure.
                    let checksums = compute_sha3_checksum_for_bytes(summary_data).and_then(|s| {
                        compute_sha3_checksum_for_bytes(content_data).map(|c| (s, c))
                    });
                    // Compare each computed digest against the manifest entry.
                    let result = checksums.and_then(|(summary_checksum, content_checksum)| {
                        (summary_checksum == summary_metadata.sha3_digest)
                            .then_some(())
                            .ok_or(anyhow!(
                                "Summary checksum doesn't match for file: {:?}",
                                summary_metadata.file_path()
                            ))?;
                        (content_checksum == content_metadata.sha3_digest)
                            .then_some(())
                            .ok_or(anyhow!(
                                "Content checksum doesn't match for file: {:?}",
                                content_metadata.file_path()
                            ))?;
                        Ok::<(), anyhow::Error>(())
                    });
                    futures::future::ready(result)
                },
            )
            .await
    }
280
    /// Load checkpoints from archive into the input store `S` for the given
    /// checkpoint range. Summaries are downloaded out of order and inserted
    /// without verification
    pub async fn read_summaries_for_range_no_verify<S>(
        &self,
        store: S,
        checkpoint_range: Range<CheckpointSequenceNumber>,
        checkpoint_counter: Arc<AtomicU64>,
    ) -> Result<()>
    where
        S: WriteStore + Clone,
    {
        // start/end index delimit the slice of summary files that overlap the
        // requested checkpoint range.
        let (summary_files, start_index, end_index) = self
            .get_summary_files_for_range(checkpoint_range.clone())
            .await?;
        let remote_object_store = self.remote_object_store.clone();
        let stream = futures::stream::iter(summary_files.iter())
            .enumerate()
            .filter(|(index, _s)| future::ready(*index >= start_index && *index < end_index))
            .map(|(_, summary_metadata)| {
                let remote_object_store = remote_object_store.clone();
                async move {
                    let summary_data =
                        get(&remote_object_store, &summary_metadata.file_path()).await?;
                    Ok::<Bytes, anyhow::Error>(summary_data)
                }
            })
            .boxed();
        stream
            // Unordered: summaries are inserted without verification, so the
            // insertion order across files does not matter.
            .buffer_unordered(self.concurrency)
            .try_for_each(|summary_data| {
                let result: Result<(), anyhow::Error> =
                    make_iterator::<CertifiedCheckpointSummary, Reader<Bytes>>(
                        SUMMARY_FILE_MAGIC,
                        summary_data.reader(),
                    )
                    .and_then(|summary_iter| {
                        summary_iter
                            // Boundary files may contain checkpoints outside
                            // the requested range; drop those.
                            .filter(|s| {
                                s.sequence_number >= checkpoint_range.start
                                    && s.sequence_number < checkpoint_range.end
                            })
                            .try_for_each(|summary| {
                                Self::insert_certified_checkpoint(&store, summary)?;
                                checkpoint_counter.fetch_add(1, Ordering::Relaxed);
                                Ok::<(), anyhow::Error>(())
                            })
                    });
                futures::future::ready(result)
            })
            .await
    }
333
334    /// Load given list of checkpoints from archive into the input store `S`.
335    /// Summaries are downloaded out of order and inserted without verification
336    pub async fn read_summaries_for_list_no_verify<S>(
337        &self,
338        store: S,
339        skiplist: Vec<CheckpointSequenceNumber>,
340        checkpoint_counter: Arc<AtomicU64>,
341    ) -> Result<()>
342    where
343        S: WriteStore + Clone,
344    {
345        let summary_files = self.get_summary_files_for_list(skiplist.clone()).await?;
346        let remote_object_store = self.remote_object_store.clone();
347        let stream = futures::stream::iter(summary_files.iter())
348            .map(|summary_metadata| {
349                let remote_object_store = remote_object_store.clone();
350                async move {
351                    let summary_data =
352                        get(&remote_object_store, &summary_metadata.file_path()).await?;
353                    Ok::<Bytes, anyhow::Error>(summary_data)
354                }
355            })
356            .boxed();
357
358        stream
359            .buffer_unordered(self.concurrency)
360            .try_for_each(|summary_data| {
361                let result: Result<(), anyhow::Error> =
362                    make_iterator::<CertifiedCheckpointSummary, Reader<Bytes>>(
363                        SUMMARY_FILE_MAGIC,
364                        summary_data.reader(),
365                    )
366                    .and_then(|summary_iter| {
367                        summary_iter
368                            .filter(|s| skiplist.contains(&s.sequence_number))
369                            .try_for_each(|summary| {
370                                Self::insert_certified_checkpoint(&store, summary)?;
371                                checkpoint_counter.fetch_add(1, Ordering::Relaxed);
372                                Ok::<(), anyhow::Error>(())
373                            })
374                    });
375                futures::future::ready(result)
376            })
377            .await
378    }
379
    /// Load checkpoints+txns+effects from archive into the input store `S` for
    /// the given checkpoint range. If latest available checkpoint in
    /// archive is older than the start of the input range then this call
    /// fails with an error otherwise we load as many checkpoints as
    /// possible until the end of the provided checkpoint range.
    pub async fn read<S>(
        &self,
        store: S,
        checkpoint_range: Range<CheckpointSequenceNumber>,
        txn_counter: Arc<AtomicU64>,
        checkpoint_counter: Arc<AtomicU64>,
        verify: bool,
    ) -> Result<()>
    where
        S: WriteStore + Clone,
    {
        // Snapshot the manifest so the file list is stable for this call.
        let manifest = self.manifest.lock().await.clone();

        let latest_available_checkpoint = manifest
            .next_checkpoint_seq_num()
            .checked_sub(1)
            .context("Checkpoint seq num underflow")?;

        if checkpoint_range.start > latest_available_checkpoint {
            return Err(anyhow!(
                "Latest available checkpoint is: {}",
                latest_available_checkpoint
            ));
        }

        // verify_manifest guarantees files are contiguous, paired, sorted by
        // start sequence number, and begin at checkpoint 0.
        let files: Vec<(FileMetadata, FileMetadata)> = self.verify_manifest(manifest).await?;

        // Index of the file containing checkpoint_range.start. On Err the
        // range start falls inside the preceding file, hence `index - 1`;
        // this cannot underflow because the first file starts at 0.
        let start_index = match files.binary_search_by_key(&checkpoint_range.start, |(s, _c)| {
            s.checkpoint_seq_range.start
        }) {
            Ok(index) => index,
            Err(index) => index - 1,
        };

        // Exclusive upper bound: first file starting at or after range.end.
        let end_index = match files.binary_search_by_key(&checkpoint_range.end, |(s, _c)| {
            s.checkpoint_seq_range.start
        }) {
            Ok(index) => index,
            Err(index) => index,
        };

        let remote_object_store = self.remote_object_store.clone();
        futures::stream::iter(files.iter())
            .enumerate()
            .filter(|(index, (_s, _c))| future::ready(*index >= start_index && *index < end_index))
            .map(|(_, (summary_metadata, content_metadata))| {
                let remote_object_store = remote_object_store.clone();
                async move {
                    let summary_data =
                        get(&remote_object_store, &summary_metadata.file_path()).await?;
                    let content_data =
                        get(&remote_object_store, &content_metadata.file_path()).await?;
                    Ok::<(Bytes, Bytes), anyhow::Error>((summary_data, content_data))
                }
            })
            .boxed()
            // `buffered` (not `buffer_unordered`): checkpoint verification is
            // chained to the previous checkpoint, so files must be processed
            // in sequence order even though downloads overlap.
            .buffered(self.concurrency)
            .try_for_each(|(summary_data, content_data)| {
                let result: Result<(), anyhow::Error> = make_iterator::<
                    CertifiedCheckpointSummary,
                    Reader<Bytes>,
                >(
                    SUMMARY_FILE_MAGIC, summary_data.reader()
                )
                .and_then(|s| {
                    make_iterator::<CheckpointContents, Reader<Bytes>>(
                        CHECKPOINT_FILE_MAGIC,
                        content_data.reader(),
                    )
                    .map(|c| (s, c))
                })
                .and_then(|(summary_iter, content_iter)| {
                    // Summary and content files cover the same range in the
                    // same order, so zipping pairs them up entry by entry.
                    summary_iter
                        .zip(content_iter)
                        .filter(|(s, _c)| {
                            s.sequence_number >= checkpoint_range.start
                                && s.sequence_number < checkpoint_range.end
                        })
                        .try_for_each(|(summary, contents)| {
                            let verified_checkpoint =
                                Self::get_or_insert_verified_checkpoint(&store, summary, verify)?;
                            // Verify content
                            let digest = verified_checkpoint.content_digest;
                            contents.verify_digests(digest)?;
                            let verified_contents =
                                VerifiedCheckpointContents::new_unchecked(contents.clone());
                            // Insert content
                            store
                                .insert_checkpoint_contents(&verified_checkpoint, verified_contents)
                                .map_err(|e| anyhow!("Failed to insert content: {e}"))?;
                            // Update highest synced watermark
                            store
                                .update_highest_synced_checkpoint(&verified_checkpoint)
                                .map_err(|e| anyhow!("Failed to update watermark: {e}"))?;
                            txn_counter.fetch_add(contents.size() as u64, Ordering::Relaxed);
                            self.archive_reader_metrics
                                .archive_txns_read
                                .with_label_values(&[&self.bucket])
                                .inc_by(contents.size() as u64);
                            checkpoint_counter.fetch_add(1, Ordering::Relaxed);
                            self.archive_reader_metrics
                                .archive_checkpoints_read
                                .with_label_values(&[&self.bucket])
                                .inc_by(1);
                            Ok::<(), anyhow::Error>(())
                        })
                });
                futures::future::ready(result)
            })
            .await
    }
496
497    /// Return latest available checkpoint in archive
498    pub async fn latest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber> {
499        let manifest = self.manifest.lock().await.clone();
500        manifest
501            .next_checkpoint_seq_num()
502            .checked_sub(1)
503            .context("No checkpoint data in archive")
504    }
505
    /// Whether this archive should be considered when computing the pruning
    /// watermark (see `ArchiveReaderBalancer::get_archive_watermark`).
    pub fn use_for_pruning_watermark(&self) -> bool {
        self.use_for_pruning_watermark
    }
509
    /// Human-readable identifier of the remote store (its Display form),
    /// used for logging.
    pub fn remote_store_identifier(&self) -> String {
        self.remote_object_store.to_string()
    }
513
514    /// Syncs the Manifest from remote store.
515    pub async fn sync_manifest_once(&self) -> Result<()> {
516        Self::sync_manifest(self.remote_object_store.clone(), self.manifest.clone()).await?;
517        Ok(())
518    }
519
520    pub async fn get_manifest(&self) -> Result<Manifest> {
521        Ok(self.manifest.lock().await.clone())
522    }
523
524    /// Copies Manifest from remote store to the given Manifest.
525    async fn sync_manifest(
526        remote_store: Arc<dyn ObjectStoreGetExt>,
527        manifest: Arc<Mutex<Manifest>>,
528    ) -> Result<()> {
529        let new_manifest = read_manifest(remote_store.clone()).await?;
530        let mut locked = manifest.lock().await;
531        *locked = new_manifest;
532        Ok(())
533    }
534
535    /// Insert checkpoint summary without verifying it
536    fn insert_certified_checkpoint<S>(
537        store: &S,
538        certified_checkpoint: CertifiedCheckpointSummary,
539    ) -> Result<()>
540    where
541        S: WriteStore + Clone,
542    {
543        store
544            .insert_checkpoint(VerifiedCheckpoint::new_unchecked(certified_checkpoint).borrow())
545            .map_err(|e| anyhow!("Failed to insert checkpoint: {e}"))
546    }
547
    /// Insert checkpoint summary if it doesn't already exist after verifying it
    fn get_or_insert_verified_checkpoint<S>(
        store: &S,
        certified_checkpoint: CertifiedCheckpointSummary,
        verify: bool,
    ) -> Result<VerifiedCheckpoint>
    where
        S: WriteStore + Clone,
    {
        store
            .get_checkpoint_by_sequence_number(certified_checkpoint.sequence_number)
            .map_err(|e| anyhow!("Store op failed: {e}"))?
            // Already present in the store: return it without re-verifying.
            .map(Ok::<VerifiedCheckpoint, anyhow::Error>)
            .unwrap_or_else(|| {
                let verified_checkpoint = if verify {
                    // Verify checkpoint summary
                    // Verification is chained to the predecessor, so the
                    // previous checkpoint must already be in the store; this
                    // also means sequence number 0 cannot be verified here.
                    let prev_checkpoint_seq_num = certified_checkpoint
                        .sequence_number
                        .checked_sub(1)
                        .context("Checkpoint seq num underflow")?;
                    let prev_checkpoint = store
                        .get_checkpoint_by_sequence_number(prev_checkpoint_seq_num)
                        .map_err(|e| anyhow!("Store op failed: {e}"))?
                        .context(format!(
                            "Missing previous checkpoint {} in store",
                            prev_checkpoint_seq_num
                        ))?;

                    verify_checkpoint(&prev_checkpoint, store, certified_checkpoint)
                        .map_err(|_| anyhow!("Checkpoint verification failed"))?
                } else {
                    VerifiedCheckpoint::new_unchecked(certified_checkpoint)
                };
                // Insert checkpoint summary
                store
                    .insert_checkpoint(&verified_checkpoint)
                    .map_err(|e| anyhow!("Failed to insert checkpoint: {e}"))?;
                // Update highest verified checkpoint watermark
                store
                    .update_highest_verified_checkpoint(&verified_checkpoint)
                    .expect("store operation should not fail");
                Ok::<VerifiedCheckpoint, anyhow::Error>(verified_checkpoint)
            })
            .map_err(|e| anyhow!("Failed to get verified checkpoint: {:?}", e))
    }
593
    /// Returns all summary files from the (verified) manifest together with
    /// the start (inclusive) and end (exclusive) indices of the files that
    /// overlap `checkpoint_range`.
    async fn get_summary_files_for_range(
        &self,
        checkpoint_range: Range<CheckpointSequenceNumber>,
    ) -> Result<(Vec<FileMetadata>, usize, usize)> {
        let manifest = self.manifest.lock().await.clone();

        let latest_available_checkpoint = manifest
            .next_checkpoint_seq_num()
            .checked_sub(1)
            .context("Checkpoint seq num underflow")?;

        if checkpoint_range.start > latest_available_checkpoint {
            return Err(anyhow!(
                "Latest available checkpoint is: {}",
                latest_available_checkpoint
            ));
        }

        // verify_manifest guarantees the files are sorted, contiguous, and
        // start at checkpoint 0.
        let summary_files: Vec<FileMetadata> = self
            .verify_manifest(manifest)
            .await?
            .iter()
            .map(|(s, _)| s.clone())
            .collect();

        // On Err the range start lies inside the previous file, hence
        // `index - 1`; no underflow because the first file starts at 0,
        // so any miss returns index >= 1.
        let start_index = match summary_files
            .binary_search_by_key(&checkpoint_range.start, |s| s.checkpoint_seq_range.start)
        {
            Ok(index) => index,
            Err(index) => index - 1,
        };

        // Exclusive bound: first file starting at or after range.end.
        let end_index = match summary_files
            .binary_search_by_key(&checkpoint_range.end, |s| s.checkpoint_seq_range.start)
        {
            Ok(index) => index,
            Err(index) => index,
        };

        Ok((summary_files, start_index, end_index))
    }
635
    /// Returns, for each requested checkpoint (sorted ascending), the summary
    /// file whose range contains it. Panics if the verified manifest does not
    /// cover a requested checkpoint.
    async fn get_summary_files_for_list(
        &self,
        checkpoints: Vec<CheckpointSequenceNumber>,
    ) -> Result<Vec<FileMetadata>> {
        assert!(!checkpoints.is_empty());
        let manifest = self.manifest.lock().await.clone();
        let latest_available_checkpoint = manifest
            .next_checkpoint_seq_num()
            .checked_sub(1)
            .context("Checkpoint seq num underflow")?;

        let mut ordered_checkpoints = checkpoints;
        ordered_checkpoints.sort();
        // If even the smallest requested checkpoint is beyond the archive,
        // nothing can be served.
        if *ordered_checkpoints.first().unwrap() > latest_available_checkpoint {
            return Err(anyhow!(
                "Latest available checkpoint is: {}",
                latest_available_checkpoint
            ));
        }

        let summary_files: Vec<FileMetadata> = self
            .verify_manifest(manifest)
            .await?
            .iter()
            .map(|(s, _)| s.clone())
            .collect();

        let mut summaries_filtered = vec![];
        for checkpoint in ordered_checkpoints.iter() {
            // Custom comparator: orders each file's half-open range
            // [start, end) relative to the target checkpoint, so Equal means
            // "this file contains the checkpoint".
            let index = summary_files
                .binary_search_by(|s| {
                    if checkpoint < &s.checkpoint_seq_range.start {
                        std::cmp::Ordering::Greater
                    } else if checkpoint >= &s.checkpoint_seq_range.end {
                        std::cmp::Ordering::Less
                    } else {
                        std::cmp::Ordering::Equal
                    }
                })
                .unwrap_or_else(|_| panic!("Archive does not contain checkpoint {checkpoint}"));
            summaries_filtered.push(summary_files[index].clone());
        }

        Ok(summaries_filtered)
    }
681
682    fn spawn_manifest_sync_task<S: ObjectStoreGetExt + Clone>(
683        remote_store: S,
684        manifest: Arc<Mutex<Manifest>>,
685        mut recv: oneshot::Receiver<()>,
686    ) {
687        tokio::task::spawn(async move {
688            let mut interval = tokio::time::interval(Duration::from_secs(60));
689            loop {
690                tokio::select! {
691                    _ = interval.tick() => {
692                        let new_manifest = read_manifest(remote_store.clone()).await?;
693                        let mut locked = manifest.lock().await;
694                        *locked = new_manifest;
695                    }
696                    _ = &mut recv => break,
697                }
698            }
699            info!("Terminating the manifest sync loop");
700            Ok::<(), anyhow::Error>(())
701        });
702    }
703}