use std::{
    fs,
    ops::Range,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};

use anyhow::{Context, Result};
use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use iota_data_ingestion_core::Worker;
use iota_rest_api::CheckpointData;
use iota_storage::object_store::util::{copy_file, path_to_filesystem};
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use object_store::{DynObjectStore, path::Path};
use serde::Serialize;
use tokio::sync::{Mutex, mpsc, oneshot};
use tracing::{error, info};

use crate::{
    AnalyticsIndexerConfig, EPOCH_DIR_PREFIX, FileMetadata, MaxCheckpointReader, ParquetSchema,
    analytics_metrics::AnalyticsMetrics, handlers::AnalyticsHandler, join_paths,
    writers::AnalyticsWriter,
};

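/// Mutable, per-handler write state: the epoch and checkpoint range currently
/// being buffered, when the last file was committed, and the writer that
/// serializes rows into the local staging directory.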
struct State<S: Serialize + ParquetSchema> {
    current_epoch: u64,
    current_checkpoint_range: Range<u64>,
    last_commit_instant: Instant,
    num_checkpoint_iterations: u64,
    writer: Box<dyn AnalyticsWriter<S>>,
}

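/// Drives a single [`AnalyticsHandler`]: feeds it checkpoints, buffers the
/// resulting rows through an [`AnalyticsWriter`], and hands completed files to
/// a background task that uploads them to the remote object store.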
pub struct AnalyticsProcessor<S: Serialize + ParquetSchema> {
    handler: Box<dyn AnalyticsHandler<S>>,
    state: Mutex<State<S>>,
    metrics: AnalyticsMetrics,
    config: AnalyticsIndexerConfig,
    sender: mpsc::Sender<FileMetadata>,
    #[expect(dead_code)]
    kill_sender: oneshot::Sender<()>,
    #[expect(dead_code)]
    max_checkpoint_sender: oneshot::Sender<()>,
}

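// The on-disk file size is only consulted every N processed checkpoints when
// deciding whether to cut a new output file.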
const CHECK_FILE_SIZE_ITERATION_CYCLE: u64 = 50;

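// Integration point with the data-ingestion pipeline: each checkpoint fetched
// by the ingestion core is handed to `process_checkpoint`.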
#[async_trait::async_trait]
impl<S: Serialize + ParquetSchema + 'static> Worker for AnalyticsProcessor<S> {
    type Message = ();
    type Error = anyhow::Error;

    async fn process_checkpoint(
        &self,
        checkpoint_data: Arc<CheckpointData>,
    ) -> Result<Self::Message, Self::Error> {
        let epoch: u64 = checkpoint_data.checkpoint_summary.epoch();
        let checkpoint_num: u64 = *checkpoint_data.checkpoint_summary.sequence_number();
        let timestamp: u64 = checkpoint_data.checkpoint_summary.data().timestamp_ms;
        info!("Processing checkpoint {checkpoint_num}, epoch {epoch}, timestamp {timestamp}");
        let mut state = self.state.lock().await;
        // An epoch boundary always cuts the current file and switches to a fresh epoch directory.
        if epoch > state.current_epoch {
            self.cut(&mut state).await?;
            self.update_to_next_epoch(epoch, &mut state);
            self.create_epoch_dirs(&state)?;
            self.reset(&mut state)?;
        }

        assert_eq!(epoch, state.current_epoch);

        assert_eq!(checkpoint_num, state.current_checkpoint_range.end);

        // Cut a new file when enough checkpoints or enough time have accumulated, or
        // (checked every CHECK_FILE_SIZE_ITERATION_CYCLE iterations) when the current
        // file exceeds the configured size limit.
        let num_checkpoints_processed =
            state.current_checkpoint_range.end - state.current_checkpoint_range.start;
        let cut_new_files = (num_checkpoints_processed >= self.config.checkpoint_interval)
            || (state.last_commit_instant.elapsed().as_secs() > self.config.time_interval_s)
            || (state.num_checkpoint_iterations % CHECK_FILE_SIZE_ITERATION_CYCLE == 0
                && state.writer.file_size()?.unwrap_or(0)
                    > self.config.max_file_size_mb * 1024 * 1024);
        if cut_new_files {
            self.cut(&mut state).await?;
            self.reset(&mut state)?;
        }
        self.metrics
            .total_received
            .with_label_values(&[self.name()])
            .inc();
        self.handler.process_checkpoint(checkpoint_data).await?;
        let rows = self.handler.read().await?;
        state.writer.write(&rows)?;
        state.current_checkpoint_range.end = state
            .current_checkpoint_range
            .end
            .checked_add(1)
            .context("Checkpoint sequence num overflow")?;
        state.num_checkpoint_iterations += 1;
        Ok(())
    }
}

impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
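    /// Builds the processor and spawns two background tasks: one that uploads
    /// completed files to the remote object store, and one that periodically
    /// publishes the max checkpoint available on the store as a metric.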
    pub async fn new(
        handler: Box<dyn AnalyticsHandler<S>>,
        writer: Box<dyn AnalyticsWriter<S>>,
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        next_checkpoint_seq_num: CheckpointSequenceNumber,
        metrics: AnalyticsMetrics,
        config: AnalyticsIndexerConfig,
    ) -> Result<Self> {
        // Completed files are staged locally under `checkpoint_dir` before upload.
        let local_store_config = ObjectStoreConfig {
            directory: Some(config.checkpoint_dir.clone()),
            object_store: Some(ObjectStoreType::File),
            ..Default::default()
        };
        let local_object_store = local_store_config.make()?;
        let remote_object_store = config.remote_store_config.make()?;
        let (kill_sender, kill_receiver) = oneshot::channel::<()>();
        let (sender, receiver) = mpsc::channel::<FileMetadata>(100);
        let name: String = handler.name().parse()?;
        let checkpoint_dir = config.checkpoint_dir.clone();
        let cloned_metrics = metrics.clone();
        // Background task: upload completed files to the remote object store.
        tokio::task::spawn(Self::start_syncing_with_remote(
            remote_object_store,
            local_object_store.clone(),
            checkpoint_dir,
            config.remote_store_path_prefix.clone(),
            receiver,
            kill_receiver,
            cloned_metrics,
            name.clone(),
        ));
        let (max_checkpoint_sender, max_checkpoint_receiver) = oneshot::channel::<()>();
        // Background task: periodically report the max checkpoint available on the store.
        tokio::task::spawn(Self::setup_max_checkpoint_metrics_updates(
            max_checkpoint_reader,
            metrics.clone(),
            max_checkpoint_receiver,
            name,
        ));
        let state = State {
            current_epoch: 0,
            current_checkpoint_range: next_checkpoint_seq_num..next_checkpoint_seq_num,
            last_commit_instant: Instant::now(),
            num_checkpoint_iterations: 0,
            writer,
        };
        Ok(Self {
            handler,
            state: Mutex::new(state),
            kill_sender,
            sender,
            max_checkpoint_sender,
            metrics,
            config,
        })
    }

    fn name(&self) -> &str {
        self.handler.name()
    }

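    /// Flushes the writer for the current checkpoint range and, if the flush
    /// produced a file, queues its metadata for upload to the remote store.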
    async fn cut(&self, state: &mut State<S>) -> anyhow::Result<()> {
        if !state.current_checkpoint_range.is_empty()
            && state.writer.flush(state.current_checkpoint_range.end)?
        {
            let file_metadata = FileMetadata::new(
                self.config.file_type,
                self.config.file_format,
                state.current_epoch,
                state.current_checkpoint_range.clone(),
            );
            self.sender.send(file_metadata).await?;
            tokio::task::yield_now().await;
        }
        Ok(())
    }

    fn update_to_next_epoch(&self, epoch: u64, state: &mut State<S>) {
        state.current_epoch = epoch;
    }

    fn epoch_dir(&self, state: &State<S>) -> Result<PathBuf> {
        let path = path_to_filesystem(
            self.config.checkpoint_dir.to_path_buf(),
            &self.config.file_type.dir_prefix(),
        )?
        .join(format!("{}{}", EPOCH_DIR_PREFIX, state.current_epoch));
        Ok(path)
    }

    fn create_epoch_dirs(&self, state: &State<S>) -> Result<()> {
        let epoch_dir = self.epoch_dir(state)?;
        if epoch_dir.exists() {
            fs::remove_dir_all(&epoch_dir)?;
        }
        fs::create_dir_all(&epoch_dir)?;
        Ok(())
    }

    fn reset(&self, state: &mut State<S>) -> Result<()> {
        self.reset_checkpoint_range(state);
        state
            .writer
            .reset(state.current_epoch, state.current_checkpoint_range.start)?;
        self.reset_last_commit_ts(state);
        Ok(())
    }

    fn reset_checkpoint_range(&self, state: &mut State<S>) {
        state.current_checkpoint_range =
            state.current_checkpoint_range.end..state.current_checkpoint_range.end
    }

    fn reset_last_commit_ts(&self, state: &mut State<S>) {
        state.last_commit_instant = Instant::now();
    }

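    /// Background loop: receives metadata for completed files and copies each
    /// file from the local staging store to the remote object store, deleting
    /// the local copy afterwards. Exits when the kill signal fires or the
    /// channel closes.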
    async fn start_syncing_with_remote(
        remote_object_store: Arc<DynObjectStore>,
        local_object_store: Arc<DynObjectStore>,
        local_staging_root_dir: PathBuf,
        remote_store_path_prefix: Option<Path>,
        mut file_recv: mpsc::Receiver<FileMetadata>,
        mut recv: oneshot::Receiver<()>,
        metrics: AnalyticsMetrics,
        name: String,
    ) -> Result<()> {
        loop {
            tokio::select! {
                _ = &mut recv => break,
                file = file_recv.recv() => {
                    if let Some(file_metadata) = file {
                        info!("Received {name} file with checkpoints: {:?}", &file_metadata.checkpoint_seq_range);
                        let checkpoint_seq_num = file_metadata.checkpoint_seq_range.end;
                        Self::sync_file_to_remote(
                            local_staging_root_dir.clone(),
                            file_metadata.file_path(),
                            remote_store_path_prefix.clone(),
                            local_object_store.clone(),
                            remote_object_store.clone()
                        )
                        .await
                        .expect("Syncing checkpoint should not fail");
                        metrics.last_uploaded_checkpoint.with_label_values(&[&name]).set(checkpoint_seq_num as i64);
                    } else {
                        info!("Terminating upload sync loop");
                        break;
                    }
                },
            }
        }
        Ok(())
    }

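    /// Background loop: every five minutes reads the max checkpoint available
    /// from the store and publishes it via the `max_checkpoint_on_store` metric.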
    async fn setup_max_checkpoint_metrics_updates(
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        analytics_metrics: AnalyticsMetrics,
        mut recv: oneshot::Receiver<()>,
        handler_name: String,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(Duration::from_secs(300));
        loop {
            tokio::select! {
                _ = &mut recv => break,
                _ = interval.tick() => {
                    let max_checkpoint = max_checkpoint_reader.max_checkpoint().await;
                    if let Ok(max_checkpoint) = max_checkpoint {
                        analytics_metrics
                            .max_checkpoint_on_store
                            .with_label_values(&[&handler_name])
                            .set(max_checkpoint);
                    } else {
                        error!("Failed to read max checkpoint for {} with err: {}", handler_name, max_checkpoint.unwrap_err());
                    }
                }
            }
        }
        Ok(())
    }

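    /// Copies a single file from the local staging store to the remote store
    /// (under the optional path prefix) and removes the local copy.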
    async fn sync_file_to_remote(
        dir: PathBuf,
        path: Path,
        prefix: Option<Path>,
        from: Arc<DynObjectStore>,
        to: Arc<DynObjectStore>,
    ) -> Result<()> {
        let remote_dest = join_paths(prefix, &path);
        info!("Syncing file to remote: {:?}", &remote_dest);
        copy_file(&path, &remote_dest, &from, &to).await?;
        fs::remove_file(path_to_filesystem(dir, &path)?)?;
        Ok(())
    }
}