iota_analytics_indexer/analytics_processor.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    fs,
    ops::Range,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};

use anyhow::{Context, Result};
use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
use iota_data_ingestion_core::Worker;
use iota_rest_api::CheckpointData;
use iota_storage::object_store::util::{copy_file, path_to_filesystem};
use iota_types::messages_checkpoint::CheckpointSequenceNumber;
use object_store::{DynObjectStore, path::Path};
use serde::Serialize;
use tokio::sync::{Mutex, mpsc, oneshot};
use tracing::{error, info};

use crate::{
    AnalyticsIndexerConfig, EPOCH_DIR_PREFIX, FileMetadata, MaxCheckpointReader, ParquetSchema,
    analytics_metrics::AnalyticsMetrics, handlers::AnalyticsHandler, join_paths,
    writers::AnalyticsWriter,
};

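/// Mutable processing state kept behind a mutex: the epoch and checkpoint
/// range covered by the file currently being written, commit bookkeeping, and
/// the writer that produces the output file.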
struct State<S: Serialize + ParquetSchema> {
    current_epoch: u64,
    current_checkpoint_range: Range<u64>,
    last_commit_instant: Instant,
    num_checkpoint_iterations: u64,
    writer: Box<dyn AnalyticsWriter<S>>,
}

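/// Processes checkpoints for a single analytics handler: rows extracted by the
/// handler are appended to a local file via the writer, and completed files
/// are handed off as [`FileMetadata`] to a background task that uploads them
/// to the remote object store.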
pub struct AnalyticsProcessor<S: Serialize + ParquetSchema> {
    handler: Box<dyn AnalyticsHandler<S>>,
    state: Mutex<State<S>>,
    metrics: AnalyticsMetrics,
    config: AnalyticsIndexerConfig,
    sender: mpsc::Sender<FileMetadata>,
    #[expect(dead_code)]
    kill_sender: oneshot::Sender<()>,
    #[expect(dead_code)]
    max_checkpoint_sender: oneshot::Sender<()>,
}

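/// Query the size of the file being written only every Nth processed
/// checkpoint, so the (potentially costly) size lookup does not happen on
/// every iteration.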
const CHECK_FILE_SIZE_ITERATION_CYCLE: u64 = 50;

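// The processor plugs into the data ingestion pipeline as a `Worker`.
// Checkpoints are expected to arrive in strictly increasing sequence order,
// which the asserts in `process_checkpoint` rely on.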
#[async_trait::async_trait]
impl<S: Serialize + ParquetSchema + 'static> Worker for AnalyticsProcessor<S> {
    type Message = ();
    type Error = anyhow::Error;

    async fn process_checkpoint(
        &self,
        checkpoint_data: Arc<CheckpointData>,
    ) -> Result<Self::Message, Self::Error> {
        // Get the epoch, checkpoint sequence number and timestamp; these are
        // important indexes when operating on the data.
        let epoch: u64 = checkpoint_data.checkpoint_summary.epoch();
        let checkpoint_num: u64 = *checkpoint_data.checkpoint_summary.sequence_number();
        let timestamp: u64 = checkpoint_data.checkpoint_summary.data().timestamp_ms;
        info!("Processing checkpoint {checkpoint_num}, epoch {epoch}, timestamp {timestamp}");
        let mut state = self.state.lock().await;
        if epoch > state.current_epoch {
            self.cut(&mut state).await?;
            self.update_to_next_epoch(epoch, &mut state);
            self.create_epoch_dirs(&state)?;
            self.reset(&mut state)?;
        }

        assert_eq!(epoch, state.current_epoch);

        assert_eq!(checkpoint_num, state.current_checkpoint_range.end);

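        // Cut (flush and hand off) the current file when enough checkpoints
        // have accumulated, the time interval has elapsed, or, checked every
        // CHECK_FILE_SIZE_ITERATION_CYCLE iterations, the file exceeds the
        // configured maximum size.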
        let num_checkpoints_processed =
            state.current_checkpoint_range.end - state.current_checkpoint_range.start;
        let cut_new_files = (num_checkpoints_processed >= self.config.checkpoint_interval)
            || (state.last_commit_instant.elapsed().as_secs() > self.config.time_interval_s)
            || (state.num_checkpoint_iterations % CHECK_FILE_SIZE_ITERATION_CYCLE == 0
                && state.writer.file_size()?.unwrap_or(0)
                    > self.config.max_file_size_mb * 1024 * 1024);
        if cut_new_files {
            self.cut(&mut state).await?;
            self.reset(&mut state)?;
        }
        self.metrics
            .total_received
            .with_label_values(&[self.name()])
            .inc();
        self.handler.process_checkpoint(checkpoint_data).await?;
        let rows = self.handler.read().await?;
        state.writer.write(&rows)?;
        state.current_checkpoint_range.end = state
            .current_checkpoint_range
            .end
            .checked_add(1)
            .context("Checkpoint sequence num overflow")?;
        state.num_checkpoint_iterations += 1;
        Ok(())
    }
}

impl<S: Serialize + ParquetSchema + 'static> AnalyticsProcessor<S> {
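    /// Creates the processor and spawns two background tasks: one that uploads
    /// cut files to the remote object store and one that periodically updates
    /// the max-checkpoint metric from the store.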
    pub async fn new(
        handler: Box<dyn AnalyticsHandler<S>>,
        writer: Box<dyn AnalyticsWriter<S>>,
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        next_checkpoint_seq_num: CheckpointSequenceNumber,
        metrics: AnalyticsMetrics,
        config: AnalyticsIndexerConfig,
    ) -> Result<Self> {
        let local_store_config = ObjectStoreConfig {
            directory: Some(config.checkpoint_dir.clone()),
            object_store: Some(ObjectStoreType::File),
            ..Default::default()
        };
        let local_object_store = local_store_config.make()?;
        let remote_object_store = config.remote_store_config.make()?;
        let (kill_sender, kill_receiver) = oneshot::channel::<()>();
        let (sender, receiver) = mpsc::channel::<FileMetadata>(100);
        let name: String = handler.name().parse()?;
        let checkpoint_dir = config.checkpoint_dir.clone();
        let cloned_metrics = metrics.clone();
        tokio::task::spawn(Self::start_syncing_with_remote(
            remote_object_store,
            local_object_store.clone(),
            checkpoint_dir,
            config.remote_store_path_prefix.clone(),
            receiver,
            kill_receiver,
            cloned_metrics,
            name.clone(),
        ));
        let (max_checkpoint_sender, max_checkpoint_receiver) = oneshot::channel::<()>();
        tokio::task::spawn(Self::setup_max_checkpoint_metrics_updates(
            max_checkpoint_reader,
            metrics.clone(),
            max_checkpoint_receiver,
            name,
        ));
        let state = State {
            current_epoch: 0,
            current_checkpoint_range: next_checkpoint_seq_num..next_checkpoint_seq_num,
            last_commit_instant: Instant::now(),
            num_checkpoint_iterations: 0,
            writer,
        };
        Ok(Self {
            handler,
            state: Mutex::new(state),
            kill_sender,
            sender,
            max_checkpoint_sender,
            metrics,
            config,
        })
    }

    fn name(&self) -> &str {
        self.handler.name()
    }

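    /// Flushes the current file if it covers at least one checkpoint and hands
    /// its metadata to the upload task.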
    async fn cut(&self, state: &mut State<S>) -> anyhow::Result<()> {
        if !state.current_checkpoint_range.is_empty()
            && state.writer.flush(state.current_checkpoint_range.end)?
        {
            let file_metadata = FileMetadata::new(
                self.config.file_type,
                self.config.file_format,
                state.current_epoch,
                state.current_checkpoint_range.clone(),
            );
            self.sender.send(file_metadata).await?;
            tokio::task::yield_now().await;
        }
        Ok(())
    }

    fn update_to_next_epoch(&self, epoch: u64, state: &mut State<S>) {
        state.current_epoch = epoch;
    }

    fn epoch_dir(&self, state: &State<S>) -> Result<PathBuf> {
        let path = path_to_filesystem(
            self.config.checkpoint_dir.to_path_buf(),
            &self.config.file_type.dir_prefix(),
        )?
        .join(format!("{}{}", EPOCH_DIR_PREFIX, state.current_epoch));
        Ok(path)
    }

    fn create_epoch_dirs(&self, state: &State<S>) -> Result<()> {
        let epoch_dir = self.epoch_dir(state)?;
        if epoch_dir.exists() {
            fs::remove_dir_all(&epoch_dir)?;
        }
        fs::create_dir_all(&epoch_dir)?;
        Ok(())
    }

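    /// Starts a fresh, empty checkpoint range at the current end, resets the
    /// writer for it and restarts the commit timer.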
    fn reset(&self, state: &mut State<S>) -> Result<()> {
        self.reset_checkpoint_range(state);
        state
            .writer
            .reset(state.current_epoch, state.current_checkpoint_range.start)?;
        self.reset_last_commit_ts(state);
        Ok(())
    }

    fn reset_checkpoint_range(&self, state: &mut State<S>) {
        state.current_checkpoint_range =
            state.current_checkpoint_range.end..state.current_checkpoint_range.end
    }

    fn reset_last_commit_ts(&self, state: &mut State<S>) {
        state.last_commit_instant = Instant::now();
    }

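    /// Background task: uploads every file announced on `file_recv` to the
    /// remote store, deletes the local copy and updates the
    /// `last_uploaded_checkpoint` metric. Terminates when the kill signal
    /// fires or the channel is closed.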
    async fn start_syncing_with_remote(
        remote_object_store: Arc<DynObjectStore>,
        local_object_store: Arc<DynObjectStore>,
        local_staging_root_dir: PathBuf,
        remote_store_path_prefix: Option<Path>,
        mut file_recv: mpsc::Receiver<FileMetadata>,
        mut recv: oneshot::Receiver<()>,
        metrics: AnalyticsMetrics,
        name: String,
    ) -> Result<()> {
        loop {
            tokio::select! {
                _ = &mut recv => break,
                file = file_recv.recv() => {
                    if let Some(file_metadata) = file {
                        info!("Received {name} file with checkpoints: {:?}", &file_metadata.checkpoint_seq_range);
                        let checkpoint_seq_num = file_metadata.checkpoint_seq_range.end;
                        Self::sync_file_to_remote(
                                local_staging_root_dir.clone(),
                                file_metadata.file_path(),
                                remote_store_path_prefix.clone(),
                                local_object_store.clone(),
                                remote_object_store.clone()
                            )
                            .await
                            .expect("Syncing checkpoint should not fail");
                        metrics.last_uploaded_checkpoint.with_label_values(&[&name]).set(checkpoint_seq_num as i64);
                    } else {
                        info!("Terminating upload sync loop");
                        break;
                    }
                },
            }
        }
        Ok(())
    }

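    /// Background task: every five minutes reads the highest checkpoint
    /// available from the store and exposes it via the
    /// `max_checkpoint_on_store` metric. Terminates when the kill signal
    /// fires.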
    async fn setup_max_checkpoint_metrics_updates(
        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
        analytics_metrics: AnalyticsMetrics,
        mut recv: oneshot::Receiver<()>,
        handler_name: String,
    ) -> Result<()> {
        let mut interval = tokio::time::interval(Duration::from_secs(300));
        loop {
            tokio::select! {
                _ = &mut recv => break,
                _ = interval.tick() => {
                    let max_checkpoint = max_checkpoint_reader.max_checkpoint().await;
                    if let Ok(max_checkpoint) = max_checkpoint {
                        analytics_metrics
                            .max_checkpoint_on_store
                            .with_label_values(&[&handler_name])
                            .set(max_checkpoint);
                    } else {
                        error!("Failed to read max checkpoint for {} with err: {}", handler_name, max_checkpoint.unwrap_err());
                    }
                }
            }
        }
        Ok(())
    }

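    /// Copies a single file from the local staging store to the remote store
    /// (under the optional path prefix) and removes the local copy afterwards.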
    async fn sync_file_to_remote(
        dir: PathBuf,
        path: Path,
        prefix: Option<Path>,
        from: Arc<DynObjectStore>,
        to: Arc<DynObjectStore>,
    ) -> Result<()> {
        let remote_dest = join_paths(prefix, &path);
        info!("Syncing file to remote: {:?}", &remote_dest);
        copy_file(&path, &remote_dest, &from, &to).await?;
        fs::remove_file(path_to_filesystem(dir, &path)?)?;
        Ok(())
    }
}