1use std::{
5 num::NonZeroUsize,
6 path::{Path, PathBuf},
7 sync::Arc,
8 time::Duration,
9};
10
11use backoff::backoff::Backoff;
12use futures::{StreamExt, TryStreamExt};
13use iota_config::{
14 node::ArchiveReaderConfig,
15 object_storage_config::{ObjectStoreConfig, ObjectStoreType},
16};
17use iota_grpc_client::Client as GrpcClient;
18use iota_metrics::spawn_monitored_task;
19use iota_types::{
20 full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
21};
22use object_store::ObjectStore;
23use serde::{Deserialize, Serialize};
24use tap::Pipe;
25use tokio::{
26 sync::mpsc::{self},
27 task::JoinHandle,
28 time::timeout,
29};
30use tokio_util::sync::CancellationToken;
31use tracing::{debug, error, info};
32
33#[cfg(not(target_os = "macos"))]
34use crate::reader::fetch::init_watcher;
35use crate::{
36 IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
37 history::reader::HistoricalReader,
38 reader::{
39 ReaderOptions,
40 common::DataLimiter,
41 fetch::{
42 GRPC_MAX_DECODING_MESSAGE_SIZE_BYTES, LocalRead, ReadSource, fetch_from_object_store,
43 },
44 },
45};
46
/// Location of a remote source of checkpoint data.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// gRPC endpoint of a fullnode that can stream checkpoints.
    Fullnode(String),
    /// An S3-compatible historical (archive) store, optionally paired with a
    /// live object store used when the requested checkpoint is newer than the
    /// archive contents.
    HybridHistoricalStore {
        /// Endpoint of the historical archive store.
        historical_url: String,
        /// Optional object-store URL serving recently produced checkpoints.
        live_url: Option<String>,
    },
}
84
/// A connected remote source of checkpoint data, built from a [`RemoteUrl`].
enum RemoteStore {
    /// gRPC client streaming checkpoints from a fullnode.
    Fullnode(GrpcClient),
    /// Historical archive reader, plus an optional live object store for
    /// checkpoints the archive does not contain yet.
    HybridHistoricalStore {
        historical: HistoricalReader,
        live: Option<Box<dyn ObjectStore>>,
    },
}
97
98impl RemoteStore {
99 async fn new(
100 remote_url: RemoteUrl,
101 batch_size: usize,
102 timeout_secs: u64,
103 ) -> IngestionResult<Self> {
104 let store = match remote_url {
105 RemoteUrl::Fullnode(ref url) => {
106 let grpc_client = GrpcClient::connect(url).await.map(|client| {
107 client.with_max_decoding_message_size(GRPC_MAX_DECODING_MESSAGE_SIZE_BYTES)
108 })?;
109 RemoteStore::Fullnode(grpc_client)
110 }
111 RemoteUrl::HybridHistoricalStore {
112 historical_url,
113 live_url,
114 } => {
115 let config = ArchiveReaderConfig {
116 download_concurrency: NonZeroUsize::new(batch_size)
117 .expect("batch size must be greater than zero"),
118 remote_store_config: ObjectStoreConfig {
119 object_store: Some(ObjectStoreType::S3),
120 object_store_connection_limit: 20,
121 aws_endpoint: Some(historical_url),
122 aws_virtual_hosted_style_request: true,
123 no_sign_request: true,
124 ..Default::default()
125 },
126 use_for_pruning_watermark: false,
127 };
128 let historical = HistoricalReader::new(config)
129 .inspect_err(|e| error!("unable to instantiate historical reader: {e}"))?;
130
131 let live = live_url
132 .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
133 .transpose()?;
134
135 RemoteStore::HybridHistoricalStore { historical, live }
136 }
137 };
138 Ok(store)
139 }
140}
141
/// Configuration for [`CheckpointReader`].
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    /// Tuning options (batch size, timeouts, tick interval, data limit).
    pub reader_options: ReaderOptions,
    /// Directory to watch for locally ingested checkpoint files; a temporary
    /// directory is used when absent.
    pub ingestion_path: Option<PathBuf>,
    /// Optional remote store to fall back to when the local directory cannot
    /// serve the next checkpoint.
    pub remote_store_url: Option<RemoteUrl>,
}
154
/// Actor that reads checkpoints from a local directory (and optionally a
/// remote store) and forwards them, strictly in sequence, over a channel.
struct CheckpointReaderActor {
    /// Directory containing locally ingested checkpoint files.
    path: PathBuf,
    /// Sequence number of the next checkpoint to deliver downstream.
    current_checkpoint_number: CheckpointSequenceNumber,
    /// Watermark below which local files have been garbage-collected.
    last_pruned_watermark: CheckpointSequenceNumber,
    /// Outbound channel delivering checkpoints to the executor.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    /// Inbound GC watermarks from [`CheckpointReader::send_gc_signal`].
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    /// Remote fallback source, when configured.
    remote_store: Option<Arc<RemoteStore>>,
    /// Cancellation token stopping the actor's main loop.
    token: CancellationToken,
    /// Reader tuning options.
    reader_options: ReaderOptions,
    /// Bounds the volume of in-flight checkpoint data.
    data_limiter: DataLimiter,
}
200
201impl LocalRead for CheckpointReaderActor {
202 fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
203 ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
204 || self.data_limiter.exceeds()
205 }
206
207 fn path(&self) -> &Path {
208 &self.path
209 }
210
211 fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
212 self.current_checkpoint_number
213 }
214
215 fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
216 self.last_pruned_watermark = watermark;
217 }
218}
219
220impl CheckpointReaderActor {
221 fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
222 self.remote_store.is_some()
223 && (checkpoints.is_empty()
224 || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
225 }
226
227 async fn relay_from_historical(
230 &mut self,
231 historical_reader: &HistoricalReader,
232 ) -> IngestionResult<()> {
233 if self.current_checkpoint_number > historical_reader.latest_available_checkpoint().await? {
237 timeout(
238 Duration::from_secs(self.reader_options.timeout_secs),
239 historical_reader.sync_manifest_once(),
240 )
241 .await
242 .map_err(|_| {
243 IngestionError::HistoryRead("reading manifest exceeded the timeout".into())
244 })??;
245
246 if self.current_checkpoint_number
249 > historical_reader.latest_available_checkpoint().await?
250 {
251 return Err(IngestionError::CheckpointNotAvailableYet);
252 }
253 }
254
255 let manifest = historical_reader.get_manifest().await;
256
257 let files = historical_reader.verify_and_get_manifest_files(manifest)?;
258
259 let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
260 s.checkpoint_seq_range.start
261 }) {
262 Ok(index) => index,
263 Err(index) => index - 1,
264 };
265
266 for metadata in files
267 .into_iter()
268 .enumerate()
269 .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
270 {
271 let checkpoints = timeout(
272 Duration::from_secs(self.reader_options.timeout_secs),
273 historical_reader.iter_for_file(metadata.file_path()),
274 )
275 .await
276 .map_err(|_| {
277 IngestionError::HistoryRead(format!(
278 "reading checkpoint {} exceeded the timeout",
279 metadata.file_path()
280 ))
281 })??
282 .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
283 .collect::<Vec<CheckpointData>>();
284
285 for checkpoint in checkpoints {
286 let size = bcs::serialized_size(&checkpoint)?;
287 self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
288 .await?;
289 }
290 }
291
292 Ok(())
293 }
294
295 async fn relay_from_live(
298 &mut self,
299 batch_size: usize,
300 live: &dyn ObjectStore,
301 ) -> IngestionResult<()> {
302 let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
303 .map(|checkpoint_number| fetch_from_object_store(live, checkpoint_number))
304 .pipe(futures::stream::iter)
305 .buffered(batch_size);
306 while let Some((checkpoint, size)) = self
307 .token
308 .run_until_cancelled(checkpoint_stream.try_next())
309 .await
310 .transpose()?
311 .flatten()
312 {
313 self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
314 .await?;
315 }
316 Ok(())
317 }
318
319 async fn relay_from_fullnode(&mut self, client: &mut GrpcClient) -> IngestionResult<()> {
322 let mut checkpoints_stream = client
323 .stream_checkpoints(
324 Some(self.current_checkpoint_number),
325 None,
326 Some(iota_grpc_client::CHECKPOINT_RESPONSE_CHECKPOINT_DATA),
327 None,
328 None,
329 )
330 .await
331 .map_err(|e| {
332 IngestionError::Grpc(format!("failed to initialize the checkpoint stream: {e}"))
333 })?
334 .into_inner();
335
336 while let Some(grpc_checkpoint) = self
337 .token
338 .run_until_cancelled(checkpoints_stream.try_next())
339 .await
340 .transpose()?
341 .flatten()
342 {
343 let checkpoint = grpc_checkpoint.checkpoint_data()?.try_into()?;
344 let size = bcs::serialized_size(&checkpoint)?;
345 self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
346 .await?;
347 }
348
349 Ok(())
350 }
351
    /// Fetches checkpoints from the configured remote store and relays them
    /// downstream. A no-op when no remote store is configured.
    ///
    /// For the hybrid store, the historical archive is tried first; if it
    /// does not yet contain the requested checkpoint, the live store (when
    /// configured) takes over.
    async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
        let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
            return Ok(());
        };
        let batch_size = self.reader_options.batch_size;
        match remote_store.as_ref() {
            RemoteStore::Fullnode(client) => {
                // Clone the client so a `&mut` can be taken without holding
                // a mutable borrow of the shared store.
                self.relay_from_fullnode(&mut client.clone()).await?;
            }
            RemoteStore::HybridHistoricalStore { historical, live } => {
                // `run_until_cancelled` yields `None` on cancellation, which
                // is treated as a clean (Ok) exit.
                if let Some(Err(err)) = self
                    .token
                    .clone()
                    .run_until_cancelled(self.relay_from_historical(historical))
                    .await
                {
                    if matches!(err, IngestionError::CheckpointNotAvailableYet) {
                        // Archive is behind: fall back to the live store, or
                        // surface the error when none is configured.
                        let live = live.as_ref().ok_or(err)?;
                        return self.relay_from_live(batch_size, live).await;
                    }
                    return Err(err);
                }
            }
        };
        Ok(())
    }
384
    /// Runs [`Self::fetch_and_send_to_channel`] in a retry loop with a
    /// constant 100 ms interval, for at most 60 s of elapsed time.
    ///
    /// Exits cleanly on success, when downstream capacity is reached, when
    /// the historical store does not have the checkpoint yet, or when the
    /// cancellation token fires.
    async fn fetch_and_send_to_channel_with_retry(&mut self) {
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        // A multiplier of 1.0 turns the "exponential" backoff into a fixed
        // 100 ms retry interval.
        backoff.multiplier = 1.0;

        loop {
            match self.fetch_and_send_to_channel().await {
                Ok(_) => break,
                // Downstream is saturated; the caller resumes on the next
                // sync tick.
                Err(IngestionError::MaxCheckpointsCapacityReached) => break,
                Err(IngestionError::CheckpointNotAvailableYet) => {
                    break info!("historical reader does not have the requested checkpoint yet");
                }
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        // "not found" errors are not logged — presumably the
                        // expected case while polling for checkpoints that
                        // have not been produced yet. NOTE(review): this is a
                        // string match on error text; confirm it covers all
                        // remote store implementations.
                        if !err.to_string().to_lowercase().contains("not found") {
                            debug!(
                                "remote reader retry in {} ms. Error is {err:?}",
                                duration.as_millis(),
                            );
                        }
                        // Sleep unless cancelled; cancellation ends the loop.
                        if self
                            .token
                            .run_until_cancelled(tokio::time::sleep(duration))
                            .await
                            .is_none()
                        {
                            break;
                        }
                    }
                    None => {
                        // Retry budget (60 s) exhausted.
                        break error!("remote reader transient error {err:?}");
                    }
                },
            }
        }
    }
426
427 async fn send_remote_checkpoint_with_capacity_check(
435 &mut self,
436 checkpoint: Arc<CheckpointData>,
437 size: usize,
438 ) -> IngestionResult<()> {
439 if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
440 return Err(IngestionError::MaxCheckpointsCapacityReached);
441 }
442 self.data_limiter.add(&checkpoint, size);
443 self.send_checkpoint_to_channel(checkpoint).await
444 }
445
446 async fn send_local_checkpoints_to_channel(
453 &mut self,
454 checkpoints: Vec<Arc<CheckpointData>>,
455 ) -> IngestionResult<()> {
456 for checkpoint in checkpoints {
457 if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
458 break;
459 }
460 self.send_checkpoint_to_channel(checkpoint).await?;
461 }
462 Ok(())
463 }
464
465 async fn send_checkpoint_to_channel(
472 &mut self,
473 checkpoint: Arc<CheckpointData>,
474 ) -> IngestionResult<()> {
475 assert_eq!(
476 checkpoint.checkpoint_summary.sequence_number,
477 self.current_checkpoint_number
478 );
479 self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
480 IngestionError::Channel(
481 "unable to send checkpoint to executor, receiver half closed".to_owned(),
482 )
483 })?;
484 self.current_checkpoint_number += 1;
485 Ok(())
486 }
487
488 async fn sync(&mut self) -> IngestionResult<()> {
491 let mut remote_source = ReadSource::Local;
492 let checkpoints = self.read_local_files_with_retry().await?;
493 let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);
494
495 if should_fetch_from_remote {
496 remote_source = ReadSource::Remote;
497 self.fetch_and_send_to_channel_with_retry().await;
498 } else {
499 self.send_local_checkpoints_to_channel(checkpoints).await?;
500 }
501
502 info!(
503 "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
504 self.current_checkpoint_number, self.last_pruned_watermark,
505 );
506
507 Ok(())
508 }
509
    /// Actor main loop: watches the ingestion directory and periodically
    /// syncs checkpoints until the cancellation token fires.
    async fn run(mut self) {
        // On non-macOS targets the sender is handed to a filesystem watcher
        // that signals when new checkpoint files appear; on macOS there is no
        // watcher and the loop relies solely on the tick timeout below.
        // Keeping `_inotify_tx` alive prevents `recv()` from returning `None`.
        let (_inotify_tx, mut inotify_rx) = mpsc::channel::<()>(1);
        std::fs::create_dir_all(self.path()).expect("failed to create a directory");

        #[cfg(not(target_os = "macos"))]
        let _watcher = init_watcher(_inotify_tx, self.path());

        // Drop files/data below the starting watermark before the first sync.
        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("failed to clean the directory");

        loop {
            tokio::select! {
                _ = self.token.cancelled() => break,
                Some(watermark) = self.gc_signal_rx.recv() => {
                    self.data_limiter.gc(watermark);
                    self.gc_processed_files(watermark).expect("failed to clean the directory");
                }
                // Sync on a filesystem event (`Ok(Some(_))`) or when the tick
                // interval elapses (`Err(_)`); `Ok(None)` (closed channel) is
                // not matched and cannot occur while `_inotify_tx` is alive.
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv()) => {
                    self.sync().await.expect("failed to read checkpoint files");
                }
            }
        }
    }
535}
536
/// Handle to a spawned [`CheckpointReaderActor`]: consume checkpoints,
/// signal garbage collection, and shut the actor down.
pub(crate) struct CheckpointReader {
    /// Join handle of the spawned actor task.
    handle: JoinHandle<()>,
    /// Sends GC watermarks to the actor.
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    /// Receives checkpoints produced by the actor.
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
    /// Cancels the actor on shutdown.
    token: CancellationToken,
}
549
550impl CheckpointReader {
551 pub(crate) async fn new(
552 starting_checkpoint_number: CheckpointSequenceNumber,
553 config: CheckpointReaderConfig,
554 ) -> IngestionResult<Self> {
555 let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
556 let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
557
558 let remote_store = if let Some(url) = config.remote_store_url {
559 Some(Arc::new(
560 RemoteStore::new(
561 url,
562 config.reader_options.batch_size,
563 config.reader_options.timeout_secs,
564 )
565 .await?,
566 ))
567 } else {
568 None
569 };
570
571 let path = match config.ingestion_path {
572 Some(p) => p,
573 None => tempfile::tempdir()?.keep(),
574 };
575 let token = CancellationToken::new();
576 let reader = CheckpointReaderActor {
577 path,
578 current_checkpoint_number: starting_checkpoint_number,
579 last_pruned_watermark: starting_checkpoint_number,
580 checkpoint_tx,
581 gc_signal_rx,
582 remote_store,
583 token: token.clone(),
584 data_limiter: DataLimiter::new(config.reader_options.data_limit),
585 reader_options: config.reader_options,
586 };
587
588 let handle = spawn_monitored_task!(reader.run());
589
590 Ok(Self {
591 handle,
592 gc_signal_tx,
593 checkpoint_rx,
594 token,
595 })
596 }
597
    /// Receives the next checkpoint from the actor, or `None` once the actor
    /// has stopped and the channel is drained.
    pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
        self.checkpoint_rx.recv().await
    }
602
603 pub(crate) async fn send_gc_signal(
610 &self,
611 watermark: CheckpointSequenceNumber,
612 ) -> IngestionResult<()> {
613 self.gc_signal_tx.send(watermark).await.map_err(|_| {
614 IngestionError::Channel(
615 "unable to send GC operation to checkpoint reader, receiver half closed".into(),
616 )
617 })
618 }
619
620 pub(crate) async fn shutdown(self) -> IngestionResult<()> {
626 self.token.cancel();
627 self.handle.await.map_err(|err| IngestionError::Shutdown {
628 component: "checkpoint reader".into(),
629 msg: err.to_string(),
630 })
631 }
632}