iota_data_ingestion_core/reader/v2.rs

use std::{
    num::NonZeroUsize,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use backoff::backoff::Backoff;
use futures::StreamExt;
use iota_config::{
    node::ArchiveReaderConfig,
    object_storage_config::{ObjectStoreConfig, ObjectStoreType},
};
use iota_metrics::spawn_monitored_task;
use iota_rest_api::CheckpointData;
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use tap::Pipe;
use tokio::{
    sync::{
        mpsc::{self},
        oneshot,
    },
    task::JoinHandle,
    time::timeout,
};
use tracing::{debug, error, info};

#[cfg(not(target_os = "macos"))]
use crate::reader::fetch::init_watcher;
use crate::{
    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
    history::reader::HistoricalReader,
    reader::{
        fetch::{LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store},
        v1::{DataLimiter, ReaderOptions},
    },
};

/// A remote source of checkpoint data.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum RemoteUrl {
    /// REST API URL of a full node.
    Fullnode(String),
    /// A historical (archive) object store, optionally paired with a live
    /// object store used for checkpoints that have not been archived yet.
    HybridHistoricalStore {
        /// URL of the historical object store.
        historical_url: String,
        /// Optional URL of the live object store.
        live_url: Option<String>,
    },
}

/// A connected remote source of checkpoint data, built from a [`RemoteUrl`].
enum RemoteStore {
    Fullnode(iota_rest_api::Client),
    HybridHistoricalStore {
        historical: HistoricalReader,
        live: Option<Box<dyn ObjectStore>>,
    },
}

impl RemoteStore {
    async fn new(
        remote_url: RemoteUrl,
        batch_size: usize,
        timeout_secs: u64,
    ) -> IngestionResult<Self> {
        let store = match remote_url {
            RemoteUrl::Fullnode(url) => RemoteStore::Fullnode(iota_rest_api::Client::new(url)),
            RemoteUrl::HybridHistoricalStore {
                historical_url,
                live_url,
            } => {
                let config = ArchiveReaderConfig {
                    download_concurrency: NonZeroUsize::new(batch_size)
                        .expect("batch size must be greater than zero"),
                    remote_store_config: ObjectStoreConfig {
                        object_store: Some(ObjectStoreType::S3),
                        object_store_connection_limit: 20,
                        aws_endpoint: Some(historical_url),
                        aws_virtual_hosted_style_request: true,
                        no_sign_request: true,
                        ..Default::default()
                    },
                    use_for_pruning_watermark: false,
                };
                let historical = HistoricalReader::new(config)
                    .inspect_err(|e| error!("unable to instantiate historical reader: {e}"))?;

                let live = live_url
                    .map(|url| create_remote_store_client(url, Default::default(), timeout_secs))
                    .transpose()?;

                RemoteStore::HybridHistoricalStore { historical, live }
            }
        };
        Ok(store)
    }
}

/// Configuration of the checkpoint reader.
#[derive(Default, Clone)]
pub struct CheckpointReaderConfig {
    pub reader_options: ReaderOptions,
    /// Local directory to read checkpoint files from; a temporary directory
    /// is used when `None`.
    pub ingestion_path: Option<PathBuf>,
    /// Optional remote source to fall back to when local files are missing.
    pub remote_store_url: Option<RemoteUrl>,
}

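// Illustrative sketch: one way to fill in this config, reading from a local
// directory and falling back to a hybrid historical store. The endpoint URL
// and path below are placeholders, not real deployments.
//
// let config = CheckpointReaderConfig {
//     reader_options: ReaderOptions::default(),
//     ingestion_path: Some(PathBuf::from("/tmp/checkpoints")),
//     remote_store_url: Some(RemoteUrl::HybridHistoricalStore {
//         historical_url: "https://archive.example.org".to_owned(),
//         live_url: None,
//     }),
// };
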
/// Actor that reads checkpoint files from a local directory and, when needed,
/// fetches them from a remote store, forwarding each checkpoint to the
/// executor over a channel.
struct CheckpointReaderActor {
    /// Directory containing locally ingested checkpoint files.
    path: PathBuf,
    /// Next checkpoint sequence number to be sent to the executor.
    current_checkpoint_number: CheckpointSequenceNumber,
    /// Watermark up to which processed checkpoint files have been pruned.
    last_pruned_watermark: CheckpointSequenceNumber,
    /// Sender half of the channel delivering checkpoints to the executor.
    checkpoint_tx: mpsc::Sender<Arc<CheckpointData>>,
    /// Receiver for garbage-collection watermarks.
    gc_signal_rx: mpsc::Receiver<CheckpointSequenceNumber>,
    /// Optional remote source used when local files are not available.
    remote_store: Option<Arc<RemoteStore>>,
    /// Receiver signalling the actor to shut down.
    shutdown_rx: oneshot::Receiver<()>,
    reader_options: ReaderOptions,
    /// Limits the volume of checkpoint data held in progress.
    data_limiter: DataLimiter,
}

impl LocalRead for CheckpointReaderActor {
    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
            || self.data_limiter.exceeds()
    }

    fn path(&self) -> &Path {
        &self.path
    }

    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
        self.current_checkpoint_number
    }

    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
        self.last_pruned_watermark = watermark;
    }
}

impl CheckpointReaderActor {
    fn should_fetch_from_remote(&self, checkpoints: &[Arc<CheckpointData>]) -> bool {
        self.remote_store.is_some()
            && (checkpoints.is_empty()
                || self.is_checkpoint_ahead(&checkpoints[0], self.current_checkpoint_number))
    }

    /// Streams checkpoints from the historical (archive) store, starting at
    /// the current checkpoint number.
    async fn fetch_historical(
        &mut self,
        historical_reader: &HistoricalReader,
    ) -> IngestionResult<()> {
        if self.current_checkpoint_number
            > historical_reader.latest_available_checkpoint().await?
        {
            // The requested checkpoint is newer than the locally known manifest;
            // refresh the manifest once before giving up.
            timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.sync_manifest_once(),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead("reading manifest exceeded the timeout".into())
            })??;

            if self.current_checkpoint_number
                > historical_reader.latest_available_checkpoint().await?
            {
                return Err(IngestionError::CheckpointNotAvailableYet);
            }
        }

        let manifest = historical_reader.get_manifest().await;

        let files = historical_reader.verify_and_get_manifest_files(manifest)?;

        // Locate the archive file whose checkpoint range contains the current
        // checkpoint number.
        let start_index = match files.binary_search_by_key(&self.current_checkpoint_number, |s| {
            s.checkpoint_seq_range.start
        }) {
            Ok(index) => index,
            Err(index) => index - 1,
        };

        for metadata in files
            .into_iter()
            .enumerate()
            .filter_map(|(index, metadata)| (index >= start_index).then_some(metadata))
        {
            let checkpoints = timeout(
                Duration::from_secs(self.reader_options.timeout_secs),
                historical_reader.iter_for_file(metadata.file_path()),
            )
            .await
            .map_err(|_| {
                IngestionError::HistoryRead(format!(
                    "reading checkpoint {} exceeded the timeout",
                    metadata.file_path()
                ))
            })??
            .filter(|c| c.checkpoint_summary.sequence_number >= self.current_checkpoint_number)
            .collect::<Vec<CheckpointData>>();

            for checkpoint in checkpoints {
                let size = bcs::serialized_size(&checkpoint)?;
                self.send_remote_checkpoint_with_capacity_check(Arc::new(checkpoint), size)
                    .await?;
            }
        }

        Ok(())
    }

    /// Fetches checkpoints from the configured remote store and forwards them
    /// to the executor channel, stopping once capacity is reached.
    async fn fetch_and_send_to_channel(&mut self) -> IngestionResult<()> {
        let Some(remote_store) = self.remote_store.as_ref().map(Arc::clone) else {
            return Ok(());
        };
        let batch_size = self.reader_options.batch_size;
        match remote_store.as_ref() {
            RemoteStore::Fullnode(client) => {
                let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                    .map(|checkpoint_number| fetch_from_full_node(client, checkpoint_number))
                    .pipe(futures::stream::iter)
                    .buffered(batch_size);

                while let Some(checkpoint_result) = checkpoint_stream.next().await {
                    let (checkpoint, size) = checkpoint_result?;
                    self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                        .await?;
                }
            }
            RemoteStore::HybridHistoricalStore { historical, live } => {
                if let Err(err) = self.fetch_historical(historical).await {
                    if matches!(err, IngestionError::CheckpointNotAvailableYet) {
                        // The archive does not contain the checkpoint yet; fall
                        // back to the live object store if one is configured.
                        let live = live.as_ref().ok_or(err)?;
                        let mut checkpoint_stream = (self.current_checkpoint_number..u64::MAX)
                            .map(|checkpoint_number| {
                                fetch_from_object_store(live, checkpoint_number)
                            })
                            .pipe(futures::stream::iter)
                            .buffered(batch_size);

                        while let Some(checkpoint_result) = checkpoint_stream.next().await {
                            let (checkpoint, size) = checkpoint_result?;
                            self.send_remote_checkpoint_with_capacity_check(checkpoint, size)
                                .await?;
                        }
                        return Ok(());
                    }
                    return Err(err);
                }
            }
        };
        Ok(())
    }

    /// Repeatedly calls [`Self::fetch_and_send_to_channel`] with a constant
    /// 100 ms backoff for up to 60 seconds, stopping early when capacity is
    /// reached or the requested checkpoint is not yet available.
    async fn fetch_and_send_to_channel_with_retry(&mut self) {
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        backoff.multiplier = 1.0;

        loop {
            match self.fetch_and_send_to_channel().await {
                Ok(_) => break,
                Err(IngestionError::MaxCheckpointsCapacityReached) => break,
                Err(IngestionError::CheckpointNotAvailableYet) => {
                    break info!("historical reader does not have the requested checkpoint yet");
                }
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        if !err.to_string().to_lowercase().contains("not found") {
                            debug!(
                                "remote reader retry in {} ms. Error is {err:?}",
                                duration.as_millis(),
                            );
                        }
                        tokio::time::sleep(duration).await
                    }
                    None => {
                        break error!("remote reader transient error {err:?}");
                    }
                },
            }
        }
    }

    /// Sends a remotely fetched checkpoint to the executor channel after
    /// checking that the in-progress capacity and data limits are not
    /// exceeded.
    async fn send_remote_checkpoint_with_capacity_check(
        &mut self,
        checkpoint: Arc<CheckpointData>,
        size: usize,
    ) -> IngestionResult<()> {
        if self.exceeds_capacity(checkpoint.checkpoint_summary.sequence_number) {
            return Err(IngestionError::MaxCheckpointsCapacityReached);
        }
        self.data_limiter.add(&checkpoint, size);
        self.send_checkpoint_to_channel(checkpoint).await
    }

    /// Sends locally read checkpoints to the executor channel, stopping at the
    /// first checkpoint that is ahead of the expected sequence number.
    async fn send_local_checkpoints_to_channel(
        &mut self,
        checkpoints: Vec<Arc<CheckpointData>>,
    ) -> IngestionResult<()> {
        for checkpoint in checkpoints {
            if self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number) {
                break;
            }
            self.send_checkpoint_to_channel(checkpoint).await?;
        }
        Ok(())
    }

    /// Sends a single checkpoint to the executor channel and advances the
    /// current checkpoint number. Panics if the checkpoint is out of order.
    async fn send_checkpoint_to_channel(
        &mut self,
        checkpoint: Arc<CheckpointData>,
    ) -> IngestionResult<()> {
        assert_eq!(
            checkpoint.checkpoint_summary.sequence_number,
            self.current_checkpoint_number
        );
        self.checkpoint_tx.send(checkpoint).await.map_err(|_| {
            IngestionError::Channel(
                "unable to send checkpoint to executor, receiver half closed".to_owned(),
            )
        })?;
        self.current_checkpoint_number += 1;
        Ok(())
    }

    /// Performs one synchronization pass: reads local checkpoint files and,
    /// when they are missing or ahead of the expected sequence number, falls
    /// back to the remote store.
    async fn sync(&mut self) -> IngestionResult<()> {
        let mut remote_source = ReadSource::Local;
        let checkpoints = self.read_local_files_with_retry().await?;
        let should_fetch_from_remote = self.should_fetch_from_remote(&checkpoints);

        if should_fetch_from_remote {
            remote_source = ReadSource::Remote;
            self.fetch_and_send_to_channel_with_retry().await;
        } else {
            self.send_local_checkpoints_to_channel(checkpoints).await?;
        }

        info!(
            "Read from {remote_source}. Current checkpoint number: {}, pruning watermark: {}",
            self.current_checkpoint_number, self.last_pruned_watermark,
        );

        Ok(())
    }

    /// Main actor loop: watches the ingestion directory, garbage collects
    /// processed files, and synchronizes checkpoints until shutdown.
    async fn run(mut self) {
        let (_inotify_tx, mut inotify_rx) = mpsc::channel::<()>(1);
        std::fs::create_dir_all(self.path()).expect("failed to create a directory");

        // The file-system watcher is only initialized on non-macOS targets.
        #[cfg(not(target_os = "macos"))]
        let _watcher = init_watcher(_inotify_tx, self.path());

        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("failed to clean the directory");

        loop {
            tokio::select! {
                _ = &mut self.shutdown_rx => break,
                Some(watermark) = self.gc_signal_rx.recv() => {
                    self.data_limiter.gc(watermark);
                    self.gc_processed_files(watermark).expect("failed to clean the directory");
                }
                // Sync either when the watcher signals new files or when the
                // tick interval elapses.
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.reader_options.tick_interval_ms), inotify_rx.recv()) => {
                    self.sync().await.expect("failed to read checkpoint files");
                }
            }
        }
    }
}

/// Handle to the checkpoint reader actor: exposes received checkpoints and
/// allows sending garbage-collection and shutdown signals.
pub(crate) struct CheckpointReader {
    /// Join handle of the spawned actor task.
    handle: JoinHandle<()>,
    /// Signals the actor to shut down.
    shutdown_tx: oneshot::Sender<()>,
    /// Sends garbage-collection watermarks to the actor.
    gc_signal_tx: mpsc::Sender<CheckpointSequenceNumber>,
    /// Receives checkpoints produced by the actor.
    checkpoint_rx: mpsc::Receiver<Arc<CheckpointData>>,
}

impl CheckpointReader {
    pub(crate) async fn new(
        starting_checkpoint_number: CheckpointSequenceNumber,
        config: CheckpointReaderConfig,
    ) -> IngestionResult<Self> {
        let (checkpoint_tx, checkpoint_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (gc_signal_tx, gc_signal_rx) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let remote_store = if let Some(url) = config.remote_store_url {
            Some(Arc::new(
                RemoteStore::new(
                    url,
                    config.reader_options.batch_size,
                    config.reader_options.timeout_secs,
                )
                .await?,
            ))
        } else {
            None
        };

        // When no ingestion path is configured, fall back to a temporary
        // directory and keep it on disk rather than deleting it on drop.
        let path = match config.ingestion_path {
            Some(p) => p,
            None => tempfile::tempdir()?.keep(),
        };

        let reader = CheckpointReaderActor {
            path,
            current_checkpoint_number: starting_checkpoint_number,
            last_pruned_watermark: starting_checkpoint_number,
            checkpoint_tx,
            gc_signal_rx,
            remote_store,
            shutdown_rx,
            data_limiter: DataLimiter::new(config.reader_options.data_limit),
            reader_options: config.reader_options,
        };

        let handle = spawn_monitored_task!(reader.run());

        Ok(Self {
            handle,
            gc_signal_tx,
            shutdown_tx,
            checkpoint_rx,
        })
    }

    /// Receives the next checkpoint from the reader actor, or `None` if the
    /// channel is closed.
    pub(crate) async fn checkpoint(&mut self) -> Option<Arc<CheckpointData>> {
        self.checkpoint_rx.recv().await
    }

    /// Sends a garbage-collection watermark to the reader actor, allowing it
    /// to prune processed checkpoint files.
    pub(crate) async fn send_gc_signal(
        &self,
        watermark: CheckpointSequenceNumber,
    ) -> IngestionResult<()> {
        self.gc_signal_tx.send(watermark).await.map_err(|_| {
            IngestionError::Channel(
                "unable to send GC operation to checkpoint reader, receiver half closed".into(),
            )
        })
    }

    /// Signals the reader actor to shut down and waits for its task to
    /// finish.
    pub(crate) async fn shutdown(self) -> IngestionResult<()> {
        _ = self.shutdown_tx.send(());
        self.handle.await.map_err(|err| IngestionError::Shutdown {
            component: "checkpoint reader".into(),
            msg: err.to_string(),
        })
    }
}
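
// Illustrative sketch of how this handle can be driven within the crate,
// assuming a caller that already has a `config: CheckpointReaderConfig` and a
// starting sequence number; the loop body and GC policy below are hypothetical.
//
// let mut reader = CheckpointReader::new(0, config).await?;
// while let Some(checkpoint) = reader.checkpoint().await {
//     // ... process the checkpoint, then allow the reader to prune it ...
//     reader
//         .send_gc_signal(checkpoint.checkpoint_summary.sequence_number + 1)
//         .await?;
// }
// reader.shutdown().await?;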