iota_data_ingestion_core/reader/v1.rs

use std::{
    collections::BTreeMap,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use backoff::backoff::Backoff;
use futures::StreamExt;
use iota_metrics::spawn_monitored_task;
use iota_types::{
    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
};
use object_store::ObjectStore;
use tap::pipe::Pipe;
use tokio::{
    sync::{
        mpsc::{self, error::TryRecvError},
        oneshot,
    },
    time::timeout,
};
use tracing::{debug, error, info};

use crate::{
    IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
    reader::fetch::{
        CheckpointResult, LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store,
    },
};

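/// Reads checkpoint files from a local directory and streams them to the
/// executor in sequence order. When a remote store is configured, it is used
/// as a fallback whenever the local directory cannot serve the current
/// checkpoint. Local reads are push-based, driven by a directory watcher.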
pub struct CheckpointReader {
    path: PathBuf,
    remote_store_url: Option<String>,
    remote_store_options: Vec<(String, String)>,
    current_checkpoint_number: CheckpointSequenceNumber,
    last_pruned_watermark: CheckpointSequenceNumber,
    checkpoint_sender: mpsc::Sender<Arc<CheckpointData>>,
    processed_receiver: mpsc::Receiver<CheckpointSequenceNumber>,
    remote_fetcher_receiver: Option<mpsc::Receiver<CheckpointResult>>,
    exit_receiver: oneshot::Receiver<()>,
    options: ReaderOptions,
    data_limiter: DataLimiter,
}

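/// Capacity hooks for the shared local-read logic: the reader is considered
/// full once `MAX_CHECKPOINTS_IN_PROGRESS` checkpoints are in flight past the
/// last pruned watermark, or when the data limiter trips.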
impl LocalRead for CheckpointReader {
    fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
        ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
            || self.data_limiter.exceeds()
    }

    fn path(&self) -> &Path {
        &self.path
    }

    fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
        self.current_checkpoint_number
    }

    fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
        self.last_pruned_watermark = watermark;
    }
}
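
/// Tuning knobs for the reader loop.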
#[derive(Clone)]
pub struct ReaderOptions {
    /// How often the reader re-checks the local directory when no file-change
    /// notification arrives, in milliseconds.
    pub tick_interval_ms: u64,
    /// Request timeout for the remote store client, in seconds.
    pub timeout_secs: u64,
    /// Number of checkpoints fetched concurrently from the remote store.
    pub batch_size: usize,
    /// Upper bound on the total size of checkpoints in flight; 0 disables the
    /// limit.
    pub data_limit: usize,
}
95
96impl Default for ReaderOptions {
97 fn default() -> Self {
98 Self {
99 tick_interval_ms: 100,
100 timeout_secs: 5,
101 batch_size: 10,
102 data_limit: 0,
103 }
104 }
105}
106
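/// Where remote checkpoints are fetched from: an object store, a full node's
/// REST API, or both (full node first, object store as fallback).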
enum RemoteStore {
    ObjectStore(Box<dyn ObjectStore>),
    Fullnode(iota_rest_api::Client),
    Hybrid(Box<dyn ObjectStore>, iota_rest_api::Client),
}

impl CheckpointReader {
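    /// Fetches a single checkpoint from the remote source; in hybrid mode the
    /// full node is tried first, with the object store as fallback.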
    async fn remote_fetch_checkpoint_internal(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> CheckpointResult {
        match store {
            RemoteStore::ObjectStore(store) => {
                fetch_from_object_store(store, checkpoint_number).await
            }
            RemoteStore::Fullnode(client) => fetch_from_full_node(client, checkpoint_number).await,
            RemoteStore::Hybrid(store, client) => {
                match fetch_from_full_node(client, checkpoint_number).await {
                    Ok(result) => Ok(result),
                    Err(_) => fetch_from_object_store(store, checkpoint_number).await,
                }
            }
        }
    }

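    /// Like `remote_fetch_checkpoint_internal`, but retries on a constant
    /// ~100 ms interval (backoff multiplier 1.0) for up to 60 s before giving
    /// up. "404" errors are expected while waiting at the chain tip, so they
    /// are not logged.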
    async fn remote_fetch_checkpoint(
        store: &RemoteStore,
        checkpoint_number: CheckpointSequenceNumber,
    ) -> CheckpointResult {
        let mut backoff = backoff::ExponentialBackoff::default();
        backoff.max_elapsed_time = Some(Duration::from_secs(60));
        backoff.initial_interval = Duration::from_millis(100);
        backoff.current_interval = backoff.initial_interval;
        backoff.multiplier = 1.0;
        loop {
            match Self::remote_fetch_checkpoint_internal(store, checkpoint_number).await {
                Ok(data) => return Ok(data),
                Err(err) => match backoff.next_backoff() {
                    Some(duration) => {
                        if !err.to_string().contains("404") {
                            debug!(
                                "remote reader retry in {} ms. Error is {:?}",
                                duration.as_millis(),
                                err
                            );
                        }
                        tokio::time::sleep(duration).await
                    }
                    None => return Err(err),
                },
            }
        }
    }

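    /// Spawns a background task that streams checkpoints from the remote
    /// store, starting at the current checkpoint number. The URL format
    /// selects the source: `<fullnode_url>|<object_store_url>` enables hybrid
    /// mode, a URL ending in `/api/v1` is treated as a full node REST
    /// endpoint, and anything else as an object store.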
    fn start_remote_fetcher(&mut self) -> mpsc::Receiver<CheckpointResult> {
        let batch_size = self.options.batch_size;
        let start_checkpoint = self.current_checkpoint_number;
        let (sender, receiver) = mpsc::channel(batch_size);
        let url = self
            .remote_store_url
            .clone()
            .expect("remote store url must be set");
        let store = if let Some((fn_url, remote_url)) = url.split_once('|') {
            let object_store = create_remote_store_client(
                remote_url.to_string(),
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::Hybrid(object_store, iota_rest_api::Client::new(fn_url))
        } else if url.ends_with("/api/v1") {
            RemoteStore::Fullnode(iota_rest_api::Client::new(url))
        } else {
            let object_store = create_remote_store_client(
                url,
                self.remote_store_options.clone(),
                self.options.timeout_secs,
            )
            .expect("failed to create remote store client");
            RemoteStore::ObjectStore(object_store)
        };

        spawn_monitored_task!(async move {
            let mut checkpoint_stream = (start_checkpoint..u64::MAX)
                .map(|checkpoint_number| Self::remote_fetch_checkpoint(&store, checkpoint_number))
                .pipe(futures::stream::iter)
                .buffered(batch_size);

            while let Some(checkpoint) = checkpoint_stream.next().await {
                if sender.send(checkpoint).await.is_err() {
                    info!("remote reader dropped");
                    break;
                }
            }
        });
        receiver
    }

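    /// Drains whatever checkpoints the remote fetcher has ready without
    /// blocking, stopping at the capacity limit. Any error tears down the
    /// fetcher task; it is restarted lazily on the next call.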
    fn remote_fetch(&mut self) -> Vec<Arc<CheckpointData>> {
        let mut checkpoints = vec![];
        if self.remote_fetcher_receiver.is_none() {
            self.remote_fetcher_receiver = Some(self.start_remote_fetcher());
        }
        while !self.exceeds_capacity(self.current_checkpoint_number + checkpoints.len() as u64) {
            match self.remote_fetcher_receiver.as_mut().unwrap().try_recv() {
                Ok(Ok((checkpoint, size))) => {
                    self.data_limiter.add(&checkpoint, size);
                    checkpoints.push(checkpoint);
                }
                Ok(Err(err)) => {
                    error!("remote reader transient error {:?}", err);
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Disconnected) => {
                    error!("remote reader channel disconnect error");
                    self.remote_fetcher_receiver = None;
                    break;
                }
                Err(TryRecvError::Empty) => break,
            }
        }
        checkpoints
    }

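    /// One sync tick: read checkpoint files from the local directory and, if
    /// nothing local is usable (no files, or the oldest file is ahead of the
    /// cursor), fall back to the remote store. Checkpoints are forwarded to
    /// the executor strictly in sequence order.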
    async fn sync(&mut self) -> IngestionResult<()> {
        let mut checkpoints = self.read_local_files_with_retry().await?;

        let mut read_source = ReadSource::Local;
        if self.remote_store_url.is_some()
            && (checkpoints.is_empty()
                || checkpoints[0].checkpoint_summary.sequence_number
                    > self.current_checkpoint_number)
        {
            checkpoints = self.remote_fetch();
            read_source = ReadSource::Remote;
        } else {
            // Local reads are keeping up; drop the remote fetcher (it is
            // restarted lazily if needed again).
            self.remote_fetcher_receiver = None;
        }

        info!(
            "Read from {read_source}. Current checkpoint number: {}, pruning watermark: {}, new updates: {:?}",
            self.current_checkpoint_number,
            self.last_pruned_watermark,
            checkpoints.len(),
        );
        for checkpoint in checkpoints {
            if matches!(read_source, ReadSource::Local)
                && self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number)
            {
                break;
            }
            assert_eq!(
                checkpoint.checkpoint_summary.sequence_number,
                self.current_checkpoint_number
            );
            self.checkpoint_sender.send(checkpoint).await.map_err(|_| {
                IngestionError::Channel(
                    "unable to send checkpoint to executor, receiver half closed".to_owned(),
                )
            })?;
            self.current_checkpoint_number += 1;
        }
        Ok(())
    }

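    /// Creates the reader together with its channel endpoints: a receiver for
    /// checkpoints, a sender for processed (prunable) sequence numbers, and a
    /// oneshot sender to request shutdown.
    ///
    /// A minimal usage sketch; the path, URL, and starting sequence number
    /// are illustrative:
    ///
    /// ```ignore
    /// let (reader, mut checkpoint_recv, processed_sender, exit_sender) =
    ///     CheckpointReader::initialize(
    ///         PathBuf::from("/var/lib/ingestion/checkpoints"), // illustrative path
    ///         0,
    ///         Some("https://example.org/api/v1".to_string()), // illustrative URL
    ///         vec![],
    ///         ReaderOptions::default(),
    ///     );
    /// tokio::spawn(reader.run());
    /// while let Some(checkpoint) = checkpoint_recv.recv().await {
    ///     // Handle the checkpoint, then acknowledge it so its file can be pruned.
    ///     let _ = processed_sender
    ///         .send(checkpoint.checkpoint_summary.sequence_number)
    ///         .await;
    /// }
    /// let _ = exit_sender.send(());
    /// ```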
    pub fn initialize(
        path: PathBuf,
        starting_checkpoint_number: CheckpointSequenceNumber,
        remote_store_url: Option<String>,
        remote_store_options: Vec<(String, String)>,
        options: ReaderOptions,
    ) -> (
        Self,
        mpsc::Receiver<Arc<CheckpointData>>,
        mpsc::Sender<CheckpointSequenceNumber>,
        oneshot::Sender<()>,
    ) {
        let (checkpoint_sender, checkpoint_recv) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (processed_sender, processed_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
        let (exit_sender, exit_receiver) = oneshot::channel();
        let reader = Self {
            path,
            remote_store_url,
            remote_store_options,
            current_checkpoint_number: starting_checkpoint_number,
            last_pruned_watermark: starting_checkpoint_number,
            checkpoint_sender,
            processed_receiver,
            remote_fetcher_receiver: None,
            exit_receiver,
            data_limiter: DataLimiter::new(options.data_limit),
            options,
        };
        (reader, checkpoint_recv, processed_sender, exit_sender)
    }

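    /// Main loop: reacts to shutdown requests, garbage-collection
    /// notifications from the executor, and directory-change events (with a
    /// `tick_interval_ms` timeout as a polling fallback).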
    pub async fn run(mut self) -> IngestionResult<()> {
        let (_watcher, mut inotify_recv) = self.setup_directory_watcher();
        self.data_limiter.gc(self.last_pruned_watermark);
        self.gc_processed_files(self.last_pruned_watermark)
            .expect("Failed to clean the directory");
        loop {
            tokio::select! {
                _ = &mut self.exit_receiver => break,
                Some(gc_checkpoint_number) = self.processed_receiver.recv() => {
                    self.data_limiter.gc(gc_checkpoint_number);
                    self.gc_processed_files(gc_checkpoint_number).expect("Failed to clean the directory");
                }
                Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.options.tick_interval_ms), inotify_recv.recv()) => {
                    self.sync().await.expect("Failed to read checkpoint files");
                }
            }
        }
        Ok(())
    }
}

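/// Tracks the total size of checkpoints that are in flight and enforces an
/// optional upper bound. Entries are released via `gc` once the executor
/// reports progress; a limit of 0 disables all accounting.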
pub struct DataLimiter {
    /// Maximum amount of in-flight checkpoint data; 0 disables the limiter.
    limit: usize,
    /// Size of each in-flight checkpoint, keyed by sequence number.
    queue: BTreeMap<CheckpointSequenceNumber, usize>,
    /// Sum of the sizes currently in `queue`.
    in_progress: usize,
}

impl DataLimiter {
    /// Creates a limiter; a `limit` of 0 disables accounting.
    pub fn new(limit: usize) -> Self {
        Self {
            limit,
            queue: BTreeMap::new(),
            in_progress: 0,
        }
    }

    /// Whether the in-flight data has reached the configured limit.
    pub fn exceeds(&self) -> bool {
        self.limit > 0 && self.in_progress >= self.limit
    }

    /// Accounts for a newly fetched checkpoint of the given size.
    pub fn add(&mut self, checkpoint: &CheckpointData, size: usize) {
        if self.limit == 0 {
            return;
        }
        self.in_progress += size;
        self.queue
            .insert(checkpoint.checkpoint_summary.sequence_number, size);
    }

    /// Drops accounting for all checkpoints below `watermark` and recomputes
    /// the in-flight total.
    pub fn gc(&mut self, watermark: CheckpointSequenceNumber) {
        if self.limit == 0 {
            return;
        }
        self.queue = self.queue.split_off(&watermark);
        self.in_progress = self.queue.values().sum();
    }
}