iota_data_ingestion_core/reader/
v1.rs1use std::{
6 collections::BTreeMap,
7 fs,
8 path::{Path, PathBuf},
9 sync::Arc,
10 time::Duration,
11};
12
13use backoff::backoff::Backoff;
14use futures::StreamExt;
15use iota_metrics::spawn_monitored_task;
16use iota_types::{
17 full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
18};
19use object_store::ObjectStore;
20use tap::pipe::Pipe;
21use tokio::{
22 sync::{
23 mpsc::{self, error::TryRecvError},
24 oneshot,
25 },
26 time::timeout,
27};
28use tracing::{debug, error, info};
29
30#[cfg(not(target_os = "macos"))]
31use crate::reader::fetch::init_watcher;
32use crate::{
33 IngestionError, IngestionResult, MAX_CHECKPOINTS_IN_PROGRESS, create_remote_store_client,
34 reader::fetch::{
35 CheckpointResult, LocalRead, ReadSource, fetch_from_full_node, fetch_from_object_store,
36 },
37};
38
39pub struct CheckpointReader {
43 path: PathBuf,
44 remote_store_url: Option<String>,
45 remote_store_options: Vec<(String, String)>,
46 current_checkpoint_number: CheckpointSequenceNumber,
47 last_pruned_watermark: CheckpointSequenceNumber,
48 checkpoint_sender: mpsc::Sender<Arc<CheckpointData>>,
49 processed_receiver: mpsc::Receiver<CheckpointSequenceNumber>,
50 remote_fetcher_receiver: Option<mpsc::Receiver<CheckpointResult>>,
51 exit_receiver: oneshot::Receiver<()>,
52 options: ReaderOptions,
53 data_limiter: DataLimiter,
54}
55
56impl LocalRead for CheckpointReader {
57 fn exceeds_capacity(&self, checkpoint_number: CheckpointSequenceNumber) -> bool {
58 ((MAX_CHECKPOINTS_IN_PROGRESS as u64 + self.last_pruned_watermark) <= checkpoint_number)
59 || self.data_limiter.exceeds()
60 }
61
62 fn path(&self) -> &Path {
63 &self.path
64 }
65
66 fn current_checkpoint_number(&self) -> CheckpointSequenceNumber {
67 self.current_checkpoint_number
68 }
69
70 fn update_last_pruned_watermark(&mut self, watermark: CheckpointSequenceNumber) {
71 self.last_pruned_watermark = watermark;
72 }
73}
74
/// Tuning parameters for the checkpoint reader.
#[derive(Clone, Debug)]
pub struct ReaderOptions {
    /// Longest time, in milliseconds, the reader waits for a file-system
    /// notification before it syncs anyway.
    pub tick_interval_ms: u64,
    /// Timeout, in seconds, handed to the remote store client on creation.
    pub timeout_secs: u64,
    /// Capacity of the remote fetcher channel and the number of remote
    /// fetches kept in flight at once.
    pub batch_size: usize,
    /// Upper bound on the accumulated size of remotely fetched checkpoints
    /// that are still in progress. `0` disables the limit.
    pub data_limit: usize,
}

impl Default for ReaderOptions {
    /// Defaults: 100 ms tick, 5 s timeout, batches of 10, no data limit.
    fn default() -> Self {
        Self {
            tick_interval_ms: 100,
            timeout_secs: 5,
            batch_size: 10,
            data_limit: 0,
        }
    }
}
109
110enum RemoteStore {
111 ObjectStore(Box<dyn ObjectStore>),
112 Fullnode(iota_rest_api::Client),
113 Hybrid(Box<dyn ObjectStore>, iota_rest_api::Client),
114}
115
116impl CheckpointReader {
117 async fn remote_fetch_checkpoint_internal(
118 store: &RemoteStore,
119 checkpoint_number: CheckpointSequenceNumber,
120 ) -> CheckpointResult {
121 match store {
122 RemoteStore::ObjectStore(store) => {
123 fetch_from_object_store(store, checkpoint_number).await
124 }
125 RemoteStore::Fullnode(client) => fetch_from_full_node(client, checkpoint_number).await,
126 RemoteStore::Hybrid(store, client) => {
127 match fetch_from_full_node(client, checkpoint_number).await {
128 Ok(result) => Ok(result),
129 Err(_) => fetch_from_object_store(store, checkpoint_number).await,
130 }
131 }
132 }
133 }
134
135 async fn remote_fetch_checkpoint(
136 store: &RemoteStore,
137 checkpoint_number: CheckpointSequenceNumber,
138 ) -> CheckpointResult {
139 let mut backoff = backoff::ExponentialBackoff::default();
140 backoff.max_elapsed_time = Some(Duration::from_secs(60));
141 backoff.initial_interval = Duration::from_millis(100);
142 backoff.current_interval = backoff.initial_interval;
143 backoff.multiplier = 1.0;
144 loop {
145 match Self::remote_fetch_checkpoint_internal(store, checkpoint_number).await {
146 Ok(data) => return Ok(data),
147 Err(err) => match backoff.next_backoff() {
148 Some(duration) => {
149 if !err.to_string().contains("404") {
150 debug!(
151 "remote reader retry in {} ms. Error is {err:?}",
152 duration.as_millis(),
153 );
154 }
155 tokio::time::sleep(duration).await
156 }
157 None => return Err(err),
158 },
159 }
160 }
161 }
162
163 fn start_remote_fetcher(&mut self) -> mpsc::Receiver<CheckpointResult> {
164 let batch_size = self.options.batch_size;
165 let start_checkpoint = self.current_checkpoint_number;
166 let (sender, receiver) = mpsc::channel(batch_size);
167 let url = self
168 .remote_store_url
169 .clone()
170 .expect("remote store url must be set");
171 let store = if let Some((fn_url, remote_url)) = url.split_once('|') {
172 let object_store = create_remote_store_client(
173 remote_url.to_string(),
174 self.remote_store_options.clone(),
175 self.options.timeout_secs,
176 )
177 .expect("failed to create remote store client");
178 RemoteStore::Hybrid(object_store, iota_rest_api::Client::new(fn_url))
179 } else if url.ends_with("/api/v1") {
180 RemoteStore::Fullnode(iota_rest_api::Client::new(url))
181 } else {
182 let object_store = create_remote_store_client(
183 url,
184 self.remote_store_options.clone(),
185 self.options.timeout_secs,
186 )
187 .expect("failed to create remote store client");
188 RemoteStore::ObjectStore(object_store)
189 };
190
191 spawn_monitored_task!(async move {
192 let mut checkpoint_stream = (start_checkpoint..u64::MAX)
193 .map(|checkpoint_number| Self::remote_fetch_checkpoint(&store, checkpoint_number))
194 .pipe(futures::stream::iter)
195 .buffered(batch_size);
196
197 while let Some(checkpoint) = checkpoint_stream.next().await {
198 if sender.send(checkpoint).await.is_err() {
199 info!("remote reader dropped");
200 break;
201 }
202 }
203 });
204 receiver
205 }
206
207 fn remote_fetch(&mut self) -> Vec<Arc<CheckpointData>> {
208 let mut checkpoints = vec![];
209 if self.remote_fetcher_receiver.is_none() {
210 self.remote_fetcher_receiver = Some(self.start_remote_fetcher());
211 }
212 while !self.exceeds_capacity(self.current_checkpoint_number + checkpoints.len() as u64) {
213 match self.remote_fetcher_receiver.as_mut().unwrap().try_recv() {
214 Ok(Ok((checkpoint, size))) => {
215 self.data_limiter.add(&checkpoint, size);
216 checkpoints.push(checkpoint);
217 }
218 Ok(Err(err)) => {
219 error!("remote reader transient error {:?}", err);
220 self.remote_fetcher_receiver = None;
221 break;
222 }
223 Err(TryRecvError::Disconnected) => {
224 error!("remote reader channel disconnect error");
225 self.remote_fetcher_receiver = None;
226 break;
227 }
228 Err(TryRecvError::Empty) => break,
229 }
230 }
231 checkpoints
232 }
233
234 async fn sync(&mut self) -> IngestionResult<()> {
235 let mut checkpoints = self.read_local_files_with_retry().await?;
236
237 let mut read_source = ReadSource::Local;
238 if self.remote_store_url.is_some()
239 && (checkpoints.is_empty()
240 || checkpoints[0].checkpoint_summary.sequence_number
241 > self.current_checkpoint_number)
242 {
243 checkpoints = self.remote_fetch();
244 read_source = ReadSource::Remote;
245 } else {
246 self.remote_fetcher_receiver = None;
248 }
249
250 info!(
251 "Read from {read_source}. Current checkpoint number: {}, pruning watermark: {}, new updates: {:?}",
252 self.current_checkpoint_number,
253 self.last_pruned_watermark,
254 checkpoints.len(),
255 );
256 for checkpoint in checkpoints {
257 if matches!(read_source, ReadSource::Local)
258 && self.is_checkpoint_ahead(&checkpoint, self.current_checkpoint_number)
259 {
260 break;
261 }
262 assert_eq!(
263 checkpoint.checkpoint_summary.sequence_number,
264 self.current_checkpoint_number
265 );
266 self.checkpoint_sender.send(checkpoint).await.map_err(|_| {
267 IngestionError::Channel(
268 "unable to send checkpoint to executor, receiver half closed".to_owned(),
269 )
270 })?;
271 self.current_checkpoint_number += 1;
272 }
273 Ok(())
274 }
275
276 pub fn initialize(
277 path: PathBuf,
278 starting_checkpoint_number: CheckpointSequenceNumber,
279 remote_store_url: Option<String>,
280 remote_store_options: Vec<(String, String)>,
281 options: ReaderOptions,
282 ) -> (
283 Self,
284 mpsc::Receiver<Arc<CheckpointData>>,
285 mpsc::Sender<CheckpointSequenceNumber>,
286 oneshot::Sender<()>,
287 ) {
288 let (checkpoint_sender, checkpoint_recv) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
289 let (processed_sender, processed_receiver) = mpsc::channel(MAX_CHECKPOINTS_IN_PROGRESS);
290 let (exit_sender, exit_receiver) = oneshot::channel();
291 let reader = Self {
292 path,
293 remote_store_url,
294 remote_store_options,
295 current_checkpoint_number: starting_checkpoint_number,
296 last_pruned_watermark: starting_checkpoint_number,
297 checkpoint_sender,
298 processed_receiver,
299 remote_fetcher_receiver: None,
300 exit_receiver,
301 data_limiter: DataLimiter::new(options.data_limit),
302 options,
303 };
304 (reader, checkpoint_recv, processed_sender, exit_sender)
305 }
306
307 pub async fn run(mut self) -> IngestionResult<()> {
308 let (_inotify_sender, mut inotify_recv) = mpsc::channel::<()>(1);
309 fs::create_dir_all(self.path()).expect("failed to create a directory");
310
311 #[cfg(not(target_os = "macos"))]
312 let _watcher = init_watcher(_inotify_sender, self.path());
313
314 self.data_limiter.gc(self.last_pruned_watermark);
315 self.gc_processed_files(self.last_pruned_watermark)
316 .expect("failed to clean the directory");
317 loop {
318 tokio::select! {
319 _ = &mut self.exit_receiver => break,
320 Some(gc_checkpoint_number) = self.processed_receiver.recv() => {
321 self.data_limiter.gc(gc_checkpoint_number);
322 self.gc_processed_files(gc_checkpoint_number).expect("failed to clean the directory");
323 }
324 Ok(Some(_)) | Err(_) = timeout(Duration::from_millis(self.options.tick_interval_ms), inotify_recv.recv()) => {
325 self.sync().await.expect("failed to read checkpoint files");
326 }
327 }
328 }
329 Ok(())
330 }
331}
332
333pub struct DataLimiter {
341 limit: usize,
344 queue: BTreeMap<CheckpointSequenceNumber, usize>,
346 in_progress: usize,
348}
349
350impl DataLimiter {
351 pub fn new(limit: usize) -> Self {
353 Self {
354 limit,
355 queue: BTreeMap::new(),
356 in_progress: 0,
357 }
358 }
359
360 pub fn exceeds(&self) -> bool {
363 self.limit > 0 && self.in_progress >= self.limit
364 }
365
366 pub fn add(&mut self, checkpoint: &CheckpointData, size: usize) {
368 if self.limit == 0 {
369 return;
370 }
371 self.in_progress += size;
372 self.queue
373 .insert(checkpoint.checkpoint_summary.sequence_number, size);
374 }
375
376 pub fn gc(&mut self, watermark: CheckpointSequenceNumber) {
380 if self.limit == 0 {
381 return;
382 }
383 self.queue = self.queue.split_off(&watermark);
384 self.in_progress = self.queue.values().sum();
385 }
386}