1use std::{
6 borrow::Borrow,
7 future,
8 ops::Range,
9 sync::{
10 Arc,
11 atomic::{AtomicU64, Ordering},
12 },
13 time::Duration,
14};
15
16use anyhow::{Context, Result, anyhow};
17use bytes::{Buf, Bytes, buf::Reader};
18use futures::{StreamExt, TryStreamExt};
19use iota_config::node::ArchiveReaderConfig;
20use iota_storage::{
21 compute_sha3_checksum_for_bytes, make_iterator,
22 object_store::{ObjectStoreGetExt, http::HttpDownloaderBuilder, util::get},
23 verify_checkpoint,
24};
25use iota_types::{
26 messages_checkpoint::{
27 CertifiedCheckpointSummary, CheckpointSequenceNumber,
28 FullCheckpointContents as CheckpointContents, VerifiedCheckpoint,
29 VerifiedCheckpointContents,
30 },
31 storage::WriteStore,
32};
33use prometheus::{IntCounterVec, Registry, register_int_counter_vec_with_registry};
34use rand::seq::SliceRandom;
35use tokio::sync::{Mutex, oneshot, oneshot::Sender};
36use tracing::info;
37
38use crate::{
39 CHECKPOINT_FILE_MAGIC, FileMetadata, FileType, Manifest, SUMMARY_FILE_MAGIC, read_manifest,
40};
41
/// Prometheus counters tracking how much data has been read from an archive,
/// labeled by the remote store bucket name.
#[derive(Debug)]
pub struct ArchiveReaderMetrics {
    // Total number of transactions read from the archive, per bucket.
    pub archive_txns_read: IntCounterVec,
    // Total number of checkpoints read from the archive, per bucket.
    pub archive_checkpoints_read: IntCounterVec,
}
47
48impl ArchiveReaderMetrics {
49 pub fn new(registry: &Registry) -> Arc<Self> {
50 let this = Self {
51 archive_txns_read: register_int_counter_vec_with_registry!(
52 "archive_txns_read",
53 "Number of transactions read from archive",
54 &["bucket"],
55 registry
56 )
57 .unwrap(),
58 archive_checkpoints_read: register_int_counter_vec_with_registry!(
59 "archive_checkpoints_read",
60 "Number of checkpoints read from archive",
61 &["bucket"],
62 registry
63 )
64 .unwrap(),
65 };
66 Arc::new(this)
67 }
68}
69
/// Fans requests out over a set of [`ArchiveReader`]s, picking a random
/// archive that can serve a requested checkpoint range.
#[derive(Default, Clone)]
pub struct ArchiveReaderBalancer {
    // One reader per configured remote archive store.
    readers: Vec<Arc<ArchiveReader>>,
}
76
77impl ArchiveReaderBalancer {
78 pub fn new(configs: Vec<ArchiveReaderConfig>, registry: &Registry) -> Result<Self> {
79 let mut readers = vec![];
80 let metrics = ArchiveReaderMetrics::new(registry);
81 for config in configs.into_iter() {
82 readers.push(Arc::new(ArchiveReader::new(config.clone(), &metrics)?));
83 }
84 Ok(ArchiveReaderBalancer { readers })
85 }
86 pub async fn get_archive_watermark(&self) -> Result<Option<u64>> {
87 let mut checkpoints: Vec<Result<CheckpointSequenceNumber>> = vec![];
88 for reader in self
89 .readers
90 .iter()
91 .filter(|r| r.use_for_pruning_watermark())
92 {
93 let latest_checkpoint = reader.latest_available_checkpoint().await;
94 info!(
95 "Latest archived checkpoint in remote store: {:?} is: {:?}",
96 reader.remote_store_identifier(),
97 latest_checkpoint
98 );
99 checkpoints.push(latest_checkpoint)
100 }
101 let checkpoints: Result<Vec<CheckpointSequenceNumber>> = checkpoints.into_iter().collect();
102 checkpoints.map(|vec| vec.into_iter().min())
103 }
104 pub async fn pick_one_random(
105 &self,
106 checkpoint_range: Range<CheckpointSequenceNumber>,
107 ) -> Option<Arc<ArchiveReader>> {
108 let mut archives_with_complete_range = vec![];
109 for reader in self.readers.iter() {
110 let latest_checkpoint = reader.latest_available_checkpoint().await.unwrap_or(0);
111 if latest_checkpoint >= checkpoint_range.end {
112 archives_with_complete_range.push(reader.clone());
113 }
114 }
115 if !archives_with_complete_range.is_empty() {
116 return Some(
117 archives_with_complete_range
118 .choose(&mut rand::thread_rng())
119 .unwrap()
120 .clone(),
121 );
122 }
123 let mut archives_with_partial_range = vec![];
124 for reader in self.readers.iter() {
125 let latest_checkpoint = reader.latest_available_checkpoint().await.unwrap_or(0);
126 if latest_checkpoint >= checkpoint_range.start {
127 archives_with_partial_range.push(reader.clone());
128 }
129 }
130 if !archives_with_partial_range.is_empty() {
131 return Some(
132 archives_with_partial_range
133 .choose(&mut rand::thread_rng())
134 .unwrap()
135 .clone(),
136 );
137 }
138 None
139 }
140}
141
/// Reads checkpoint summaries and contents from one remote archive store,
/// caching the archive [`Manifest`] locally and refreshing it via a
/// background task.
#[derive(Clone)]
pub struct ArchiveReader {
    // Remote store bucket name (or "unknown"); used as a metrics label.
    bucket: String,
    // Maximum number of concurrent file downloads.
    concurrency: usize,
    // Keeps the background manifest-sync task alive; when the last clone of
    // this reader is dropped, the receiver side fires and the task exits.
    sender: Arc<Sender<()>>,
    // Latest synced manifest, shared with the background sync task.
    manifest: Arc<Mutex<Manifest>>,
    // Whether this archive may contribute to the pruning watermark.
    use_for_pruning_watermark: bool,
    // Handle to the remote object store holding the archive files.
    remote_object_store: Arc<dyn ObjectStoreGetExt>,
    // Counters for transactions/checkpoints read from this archive.
    archive_reader_metrics: Arc<ArchiveReaderMetrics>,
}
152
153impl ArchiveReader {
    /// Creates a reader for the archive described by `config`.
    ///
    /// Uses an unsigned HTTP downloader when `no_sign_request` is set,
    /// otherwise the fully configured object store client. Also spawns the
    /// background task that periodically re-reads the remote manifest; the
    /// task stops when the stored oneshot sender is dropped.
    pub fn new(config: ArchiveReaderConfig, metrics: &Arc<ArchiveReaderMetrics>) -> Result<Self> {
        let bucket = config
            .remote_store_config
            .bucket
            .clone()
            .unwrap_or("unknown".to_string());
        let remote_object_store = if config.remote_store_config.no_sign_request {
            config.remote_store_config.make_http()?
        } else {
            config.remote_store_config.make().map(Arc::new)?
        };
        let (sender, recv) = oneshot::channel();
        // Start with an empty placeholder manifest; the sync task below
        // replaces it with the real remote manifest.
        let manifest = Arc::new(Mutex::new(Manifest::new(0, 0)));
        Self::spawn_manifest_sync_task(remote_object_store.clone(), manifest.clone(), recv);
        Ok(ArchiveReader {
            bucket,
            manifest,
            sender: Arc::new(sender),
            remote_object_store,
            use_for_pruning_watermark: config.use_for_pruning_watermark,
            concurrency: config.download_concurrency.get(),
            archive_reader_metrics: metrics.clone(),
        })
    }
179
180 pub async fn verify_manifest(
184 &self,
185 manifest: Manifest,
186 ) -> Result<Vec<(FileMetadata, FileMetadata)>> {
187 let files = manifest.files();
188 if files.is_empty() {
189 return Err(anyhow!("Unexpected empty archive store"));
190 }
191
192 let mut summary_files: Vec<_> = files
193 .clone()
194 .into_iter()
195 .filter(|f| f.file_type == FileType::CheckpointSummary)
196 .collect();
197 let mut contents_files: Vec<_> = files
198 .into_iter()
199 .filter(|f| f.file_type == FileType::CheckpointContent)
200 .collect();
201 assert_eq!(summary_files.len(), contents_files.len());
202
203 summary_files.sort_by_key(|f| f.checkpoint_seq_range.start);
204 contents_files.sort_by_key(|f| f.checkpoint_seq_range.start);
205
206 assert!(
207 summary_files
208 .windows(2)
209 .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
210 );
211 assert!(
212 contents_files
213 .windows(2)
214 .all(|w| w[1].checkpoint_seq_range.start == w[0].checkpoint_seq_range.end)
215 );
216
217 let files: Vec<(FileMetadata, FileMetadata)> = summary_files
218 .into_iter()
219 .zip(contents_files.into_iter())
220 .map(|(s, c)| {
221 assert_eq!(s.checkpoint_seq_range, c.checkpoint_seq_range);
222 (s, c)
223 })
224 .collect();
225
226 assert_eq!(files.first().unwrap().0.checkpoint_seq_range.start, 0);
227
228 Ok(files)
229 }
230
    /// Downloads every summary/content file pair in `files` (up to
    /// `self.concurrency` downloads in flight) and checks that the sha3
    /// checksum of the downloaded bytes matches the digest recorded in the
    /// corresponding `FileMetadata`.
    ///
    /// # Errors
    /// Fails on any download error or on the first checksum mismatch.
    pub async fn verify_file_consistency(
        &self,
        files: Vec<(FileMetadata, FileMetadata)>,
    ) -> Result<()> {
        let remote_object_store = self.remote_object_store.clone();
        futures::stream::iter(files.iter())
            .enumerate()
            .map(|(_, (summary_metadata, content_metadata))| {
                let remote_object_store = remote_object_store.clone();
                async move {
                    // Fetch both halves of the pair from the remote store,
                    // keeping the metadata alongside for the checksum step.
                    let summary_data =
                        get(&remote_object_store, &summary_metadata.file_path()).await?;
                    let content_data =
                        get(&remote_object_store, &content_metadata.file_path()).await?;
                    Ok::<((Bytes, &FileMetadata), (Bytes, &FileMetadata)), anyhow::Error>((
                        (summary_data, summary_metadata),
                        (content_data, content_metadata),
                    ))
                }
            })
            .boxed()
            .buffer_unordered(self.concurrency)
            .try_for_each(
                |((summary_data, summary_metadata), (content_data, content_metadata))| {
                    // Compute both checksums, then compare each against the
                    // digest recorded in the manifest metadata.
                    let checksums = compute_sha3_checksum_for_bytes(summary_data).and_then(|s| {
                        compute_sha3_checksum_for_bytes(content_data).map(|c| (s, c))
                    });
                    let result = checksums.and_then(|(summary_checksum, content_checksum)| {
                        (summary_checksum == summary_metadata.sha3_digest)
                            .then_some(())
                            .ok_or(anyhow!(
                                "Summary checksum doesn't match for file: {:?}",
                                summary_metadata.file_path()
                            ))?;
                        (content_checksum == content_metadata.sha3_digest)
                            .then_some(())
                            .ok_or(anyhow!(
                                "Content checksum doesn't match for file: {:?}",
                                content_metadata.file_path()
                            ))?;
                        Ok::<(), anyhow::Error>(())
                    });
                    futures::future::ready(result)
                },
            )
            .await
    }
280
    /// Downloads the summary files overlapping `checkpoint_range` and inserts
    /// every summary whose sequence number falls inside the range into
    /// `store` WITHOUT verifying the checkpoint chain.
    /// `checkpoint_counter` is incremented once per inserted summary.
    ///
    /// Downloads complete in arbitrary order (`buffer_unordered`); no
    /// watermark is advanced here, so ordering is not required.
    pub async fn read_summaries_for_range_no_verify<S>(
        &self,
        store: S,
        checkpoint_range: Range<CheckpointSequenceNumber>,
        checkpoint_counter: Arc<AtomicU64>,
    ) -> Result<()>
    where
        S: WriteStore + Clone,
    {
        let (summary_files, start_index, end_index) = self
            .get_summary_files_for_range(checkpoint_range.clone())
            .await?;
        let remote_object_store = self.remote_object_store.clone();
        let stream = futures::stream::iter(summary_files.iter())
            .enumerate()
            // Keep only the files whose ranges overlap the requested range.
            .filter(|(index, _s)| future::ready(*index >= start_index && *index < end_index))
            .map(|(_, summary_metadata)| {
                let remote_object_store = remote_object_store.clone();
                async move {
                    let summary_data =
                        get(&remote_object_store, &summary_metadata.file_path()).await?;
                    Ok::<Bytes, anyhow::Error>(summary_data)
                }
            })
            .boxed();
        stream
            .buffer_unordered(self.concurrency)
            .try_for_each(|summary_data| {
                // Decode the file into a summary iterator and insert the
                // summaries that fall inside the requested range.
                let result: Result<(), anyhow::Error> =
                    make_iterator::<CertifiedCheckpointSummary, Reader<Bytes>>(
                        SUMMARY_FILE_MAGIC,
                        summary_data.reader(),
                    )
                    .and_then(|summary_iter| {
                        summary_iter
                            .filter(|s| {
                                s.sequence_number >= checkpoint_range.start
                                    && s.sequence_number < checkpoint_range.end
                            })
                            .try_for_each(|summary| {
                                Self::insert_certified_checkpoint(&store, summary)?;
                                checkpoint_counter.fetch_add(1, Ordering::Relaxed);
                                Ok::<(), anyhow::Error>(())
                            })
                    });
                futures::future::ready(result)
            })
            .await
    }
333
334 pub async fn read_summaries_for_list_no_verify<S>(
337 &self,
338 store: S,
339 skiplist: Vec<CheckpointSequenceNumber>,
340 checkpoint_counter: Arc<AtomicU64>,
341 ) -> Result<()>
342 where
343 S: WriteStore + Clone,
344 {
345 let summary_files = self.get_summary_files_for_list(skiplist.clone()).await?;
346 let remote_object_store = self.remote_object_store.clone();
347 let stream = futures::stream::iter(summary_files.iter())
348 .map(|summary_metadata| {
349 let remote_object_store = remote_object_store.clone();
350 async move {
351 let summary_data =
352 get(&remote_object_store, &summary_metadata.file_path()).await?;
353 Ok::<Bytes, anyhow::Error>(summary_data)
354 }
355 })
356 .boxed();
357
358 stream
359 .buffer_unordered(self.concurrency)
360 .try_for_each(|summary_data| {
361 let result: Result<(), anyhow::Error> =
362 make_iterator::<CertifiedCheckpointSummary, Reader<Bytes>>(
363 SUMMARY_FILE_MAGIC,
364 summary_data.reader(),
365 )
366 .and_then(|summary_iter| {
367 summary_iter
368 .filter(|s| skiplist.contains(&s.sequence_number))
369 .try_for_each(|summary| {
370 Self::insert_certified_checkpoint(&store, summary)?;
371 checkpoint_counter.fetch_add(1, Ordering::Relaxed);
372 Ok::<(), anyhow::Error>(())
373 })
374 });
375 futures::future::ready(result)
376 })
377 .await
378 }
379
    /// Downloads the summary and content files covering `checkpoint_range`,
    /// inserts each checkpoint (verified against its predecessor when
    /// `verify` is true) and its contents into `store`, and advances the
    /// highest-synced watermark. `txn_counter` and `checkpoint_counter` are
    /// incremented as progress counters alongside the prometheus metrics.
    pub async fn read<S>(
        &self,
        store: S,
        checkpoint_range: Range<CheckpointSequenceNumber>,
        txn_counter: Arc<AtomicU64>,
        checkpoint_counter: Arc<AtomicU64>,
        verify: bool,
    ) -> Result<()>
    where
        S: WriteStore + Clone,
    {
        let manifest = self.manifest.lock().await.clone();

        let latest_available_checkpoint = manifest
            .next_checkpoint_seq_num()
            .checked_sub(1)
            .context("Checkpoint seq num underflow")?;

        if checkpoint_range.start > latest_available_checkpoint {
            return Err(anyhow!(
                "Latest available checkpoint is: {}",
                latest_available_checkpoint
            ));
        }

        let files: Vec<(FileMetadata, FileMetadata)> = self.verify_manifest(manifest).await?;

        // Index of the file pair whose range contains `checkpoint_range.start`.
        // On Err the insertion point is one past the covering file, hence the
        // `- 1` (safe: verify_manifest ensures the first file starts at 0).
        let start_index = match files.binary_search_by_key(&checkpoint_range.start, |(s, _c)| {
            s.checkpoint_seq_range.start
        }) {
            Ok(index) => index,
            Err(index) => index - 1,
        };

        // First file pair starting at or after `checkpoint_range.end`
        // (exclusive upper bound of the window below).
        let end_index = match files.binary_search_by_key(&checkpoint_range.end, |(s, _c)| {
            s.checkpoint_seq_range.start
        }) {
            Ok(index) => index,
            Err(index) => index,
        };

        let remote_object_store = self.remote_object_store.clone();
        futures::stream::iter(files.iter())
            .enumerate()
            .filter(|(index, (_s, _c))| future::ready(*index >= start_index && *index < end_index))
            .map(|(_, (summary_metadata, content_metadata))| {
                let remote_object_store = remote_object_store.clone();
                async move {
                    // Download the paired summary and content files.
                    let summary_data =
                        get(&remote_object_store, &summary_metadata.file_path()).await?;
                    let content_data =
                        get(&remote_object_store, &content_metadata.file_path()).await?;
                    Ok::<(Bytes, Bytes), anyhow::Error>((summary_data, content_data))
                }
            })
            .boxed()
            // `buffered` (ordered), not `buffer_unordered`: checkpoints must
            // be processed in sequence so predecessor verification and the
            // synced watermark only ever move forward.
            .buffered(self.concurrency)
            .try_for_each(|(summary_data, content_data)| {
                // Decode both files and walk the summary/content pairs.
                let result: Result<(), anyhow::Error> = make_iterator::<
                    CertifiedCheckpointSummary,
                    Reader<Bytes>,
                >(
                    SUMMARY_FILE_MAGIC, summary_data.reader()
                )
                .and_then(|s| {
                    make_iterator::<CheckpointContents, Reader<Bytes>>(
                        CHECKPOINT_FILE_MAGIC,
                        content_data.reader(),
                    )
                    .map(|c| (s, c))
                })
                .and_then(|(summary_iter, content_iter)| {
                    summary_iter
                        .zip(content_iter)
                        .filter(|(s, _c)| {
                            s.sequence_number >= checkpoint_range.start
                                && s.sequence_number < checkpoint_range.end
                        })
                        .try_for_each(|(summary, contents)| {
                            let verified_checkpoint =
                                Self::get_or_insert_verified_checkpoint(&store, summary, verify)?;
                            // Contents must match the digest committed to by
                            // the (verified) summary.
                            let digest = verified_checkpoint.content_digest;
                            contents.verify_digests(digest)?;
                            let verified_contents =
                                VerifiedCheckpointContents::new_unchecked(contents.clone());
                            store
                                .insert_checkpoint_contents(&verified_checkpoint, verified_contents)
                                .map_err(|e| anyhow!("Failed to insert content: {e}"))?;
                            store
                                .update_highest_synced_checkpoint(&verified_checkpoint)
                                .map_err(|e| anyhow!("Failed to update watermark: {e}"))?;
                            txn_counter.fetch_add(contents.size() as u64, Ordering::Relaxed);
                            self.archive_reader_metrics
                                .archive_txns_read
                                .with_label_values(&[&self.bucket])
                                .inc_by(contents.size() as u64);
                            checkpoint_counter.fetch_add(1, Ordering::Relaxed);
                            self.archive_reader_metrics
                                .archive_checkpoints_read
                                .with_label_values(&[&self.bucket])
                                .inc_by(1);
                            Ok::<(), anyhow::Error>(())
                        })
                });
                futures::future::ready(result)
            })
            .await
    }
496
497 pub async fn latest_available_checkpoint(&self) -> Result<CheckpointSequenceNumber> {
499 let manifest = self.manifest.lock().await.clone();
500 manifest
501 .next_checkpoint_seq_num()
502 .checked_sub(1)
503 .context("No checkpoint data in archive")
504 }
505
    /// Whether this archive may contribute to the pruning watermark
    /// computation (taken from its config at construction time).
    pub fn use_for_pruning_watermark(&self) -> bool {
        self.use_for_pruning_watermark
    }
509
    /// Human-readable identifier of the underlying remote object store
    /// (its `Display` representation), used for logging.
    pub fn remote_store_identifier(&self) -> String {
        self.remote_object_store.to_string()
    }
513
514 pub async fn sync_manifest_once(&self) -> Result<()> {
516 Self::sync_manifest(self.remote_object_store.clone(), self.manifest.clone()).await?;
517 Ok(())
518 }
519
520 pub async fn get_manifest(&self) -> Result<Manifest> {
521 Ok(self.manifest.lock().await.clone())
522 }
523
524 async fn sync_manifest(
526 remote_store: Arc<dyn ObjectStoreGetExt>,
527 manifest: Arc<Mutex<Manifest>>,
528 ) -> Result<()> {
529 let new_manifest = read_manifest(remote_store.clone()).await?;
530 let mut locked = manifest.lock().await;
531 *locked = new_manifest;
532 Ok(())
533 }
534
535 fn insert_certified_checkpoint<S>(
537 store: &S,
538 certified_checkpoint: CertifiedCheckpointSummary,
539 ) -> Result<()>
540 where
541 S: WriteStore + Clone,
542 {
543 store
544 .insert_checkpoint(VerifiedCheckpoint::new_unchecked(certified_checkpoint).borrow())
545 .map_err(|e| anyhow!("Failed to insert checkpoint: {e}"))
546 }
547
    /// Returns the checkpoint with `certified_checkpoint`'s sequence number
    /// from `store` if it is already present; otherwise (optionally) verifies
    /// it and inserts it first.
    ///
    /// When `verify` is true, the summary is verified against its
    /// predecessor, which must already be in the store. After inserting, the
    /// highest-verified watermark is advanced as well.
    fn get_or_insert_verified_checkpoint<S>(
        store: &S,
        certified_checkpoint: CertifiedCheckpointSummary,
        verify: bool,
    ) -> Result<VerifiedCheckpoint>
    where
        S: WriteStore + Clone,
    {
        store
            .get_checkpoint_by_sequence_number(certified_checkpoint.sequence_number)
            .map_err(|e| anyhow!("Store op failed: {e}"))?
            .map(Ok::<VerifiedCheckpoint, anyhow::Error>)
            .unwrap_or_else(|| {
                // Not in the store yet: verify (if requested), then insert.
                let verified_checkpoint = if verify {
                    // Checkpoint 0 has no predecessor, hence the checked_sub.
                    let prev_checkpoint_seq_num = certified_checkpoint
                        .sequence_number
                        .checked_sub(1)
                        .context("Checkpoint seq num underflow")?;
                    let prev_checkpoint = store
                        .get_checkpoint_by_sequence_number(prev_checkpoint_seq_num)
                        .map_err(|e| anyhow!("Store op failed: {e}"))?
                        .context(format!(
                            "Missing previous checkpoint {} in store",
                            prev_checkpoint_seq_num
                        ))?;

                    verify_checkpoint(&prev_checkpoint, store, certified_checkpoint)
                        .map_err(|_| anyhow!("Checkpoint verification failed"))?
                } else {
                    VerifiedCheckpoint::new_unchecked(certified_checkpoint)
                };
                store
                    .insert_checkpoint(&verified_checkpoint)
                    .map_err(|e| anyhow!("Failed to insert checkpoint: {e}"))?;
                store
                    .update_highest_verified_checkpoint(&verified_checkpoint)
                    .expect("store operation should not fail");
                Ok::<VerifiedCheckpoint, anyhow::Error>(verified_checkpoint)
            })
            .map_err(|e| anyhow!("Failed to get verified checkpoint: {:?}", e))
    }
593
    /// Loads the manifest and returns all summary `FileMetadata` (sorted by
    /// starting sequence number) together with the `[start_index, end_index)`
    /// window of files overlapping `checkpoint_range`.
    ///
    /// Errors when the archive holds no checkpoint at or past
    /// `checkpoint_range.start`.
    async fn get_summary_files_for_range(
        &self,
        checkpoint_range: Range<CheckpointSequenceNumber>,
    ) -> Result<(Vec<FileMetadata>, usize, usize)> {
        let manifest = self.manifest.lock().await.clone();

        let latest_available_checkpoint = manifest
            .next_checkpoint_seq_num()
            .checked_sub(1)
            .context("Checkpoint seq num underflow")?;

        if checkpoint_range.start > latest_available_checkpoint {
            return Err(anyhow!(
                "Latest available checkpoint is: {}",
                latest_available_checkpoint
            ));
        }

        let summary_files: Vec<FileMetadata> = self
            .verify_manifest(manifest)
            .await?
            .iter()
            .map(|(s, _)| s.clone())
            .collect();

        // File containing `checkpoint_range.start`. On Err the insertion
        // point is one past the covering file; the `- 1` cannot underflow
        // because verify_manifest ensures the first file starts at 0.
        let start_index = match summary_files
            .binary_search_by_key(&checkpoint_range.start, |s| s.checkpoint_seq_range.start)
        {
            Ok(index) => index,
            Err(index) => index - 1,
        };

        // First file starting at or after `checkpoint_range.end` (exclusive
        // upper bound of the window).
        let end_index = match summary_files
            .binary_search_by_key(&checkpoint_range.end, |s| s.checkpoint_seq_range.start)
        {
            Ok(index) => index,
            Err(index) => index,
        };

        Ok((summary_files, start_index, end_index))
    }
635
    /// Maps each requested checkpoint (processed in ascending order) to the
    /// summary file whose range contains it; the result holds one entry per
    /// checkpoint, so the same file may appear multiple times.
    ///
    /// Errors when the earliest requested checkpoint is beyond the archive.
    ///
    /// # Panics
    /// Panics if `checkpoints` is empty, or if a requested checkpoint is not
    /// covered by any archive file.
    async fn get_summary_files_for_list(
        &self,
        checkpoints: Vec<CheckpointSequenceNumber>,
    ) -> Result<Vec<FileMetadata>> {
        assert!(!checkpoints.is_empty());
        let manifest = self.manifest.lock().await.clone();
        let latest_available_checkpoint = manifest
            .next_checkpoint_seq_num()
            .checked_sub(1)
            .context("Checkpoint seq num underflow")?;

        let mut ordered_checkpoints = checkpoints;
        ordered_checkpoints.sort();
        if *ordered_checkpoints.first().unwrap() > latest_available_checkpoint {
            return Err(anyhow!(
                "Latest available checkpoint is: {}",
                latest_available_checkpoint
            ));
        }

        let summary_files: Vec<FileMetadata> = self
            .verify_manifest(manifest)
            .await?
            .iter()
            .map(|(s, _)| s.clone())
            .collect();

        let mut summaries_filtered = vec![];
        for checkpoint in ordered_checkpoints.iter() {
            // Binary search over the sorted, non-overlapping file ranges:
            // a file "matches" when start <= checkpoint < end.
            let index = summary_files
                .binary_search_by(|s| {
                    if checkpoint < &s.checkpoint_seq_range.start {
                        std::cmp::Ordering::Greater
                    } else if checkpoint >= &s.checkpoint_seq_range.end {
                        std::cmp::Ordering::Less
                    } else {
                        std::cmp::Ordering::Equal
                    }
                })
                .unwrap_or_else(|_| panic!("Archive does not contain checkpoint {checkpoint}"));
            summaries_filtered.push(summary_files[index].clone());
        }

        Ok(summaries_filtered)
    }
681
682 fn spawn_manifest_sync_task<S: ObjectStoreGetExt + Clone>(
683 remote_store: S,
684 manifest: Arc<Mutex<Manifest>>,
685 mut recv: oneshot::Receiver<()>,
686 ) {
687 tokio::task::spawn(async move {
688 let mut interval = tokio::time::interval(Duration::from_secs(60));
689 loop {
690 tokio::select! {
691 _ = interval.tick() => {
692 let new_manifest = read_manifest(remote_store.clone()).await?;
693 let mut locked = manifest.lock().await;
694 *locked = new_manifest;
695 }
696 _ = &mut recv => break,
697 }
698 }
699 info!("Terminating the manifest sync loop");
700 Ok::<(), anyhow::Error>(())
701 });
702 }
703}