iota_analytics_indexer/
lib.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{ops::Range, path::PathBuf, sync::Arc};
6
7use anyhow::{Result, anyhow};
8use arrow_array::Int32Array;
9use clap::*;
10use gcp_bigquery_client::{Client, model::query_request::QueryRequest};
11use iota_config::object_storage_config::ObjectStoreConfig;
12use iota_data_ingestion_core::Worker;
13use iota_rest_api::CheckpointData;
14use iota_storage::object_store::util::{
15    find_all_dirs_with_epoch_prefix, find_all_files_with_epoch_prefix,
16};
17use iota_types::{
18    base_types::EpochId, dynamic_field::DynamicFieldType,
19    messages_checkpoint::CheckpointSequenceNumber,
20};
21use num_enum::{IntoPrimitive, TryFromPrimitive};
22use object_store::path::Path;
23use serde::{Deserialize, Serialize};
24use snowflake_api::{QueryResult, SnowflakeApi};
25use strum::EnumIter;
26use tracing::info;
27
28use crate::{
29    analytics_metrics::AnalyticsMetrics,
30    analytics_processor::AnalyticsProcessor,
31    handlers::{
32        AnalyticsHandler, checkpoint_handler::CheckpointHandler, df_handler::DynamicFieldHandler,
33        event_handler::EventHandler, move_call_handler::MoveCallHandler,
34        object_handler::ObjectHandler, package_handler::PackageHandler,
35        transaction_handler::TransactionHandler,
36        transaction_objects_handler::TransactionObjectsHandler,
37        wrapped_object_handler::WrappedObjectHandler,
38    },
39    tables::{
40        CheckpointEntry, DynamicFieldEntry, EventEntry, InputObjectKind, MoveCallEntry,
41        MovePackageEntry, ObjectEntry, ObjectStatus, OwnerType, TransactionEntry,
42        TransactionObjectEntry, WrappedObjectEntry,
43    },
44    writers::{AnalyticsWriter, csv_writer::CSVWriter, parquet_writer::ParquetWriter},
45};
46
47pub mod analytics_metrics;
48pub mod analytics_processor;
49pub mod errors;
50mod handlers;
51mod package_store;
52pub mod tables;
53mod writers;
54
// Remote-store layout: every `FileType` owns one top-level directory (mapped in
// `FileType::dir_prefix`), and the files inside it are grouped into per-epoch sub-directories.

/// Prefix of the per-epoch sub-directories (`epoch_<N>`).
const EPOCH_DIR_PREFIX: &str = "epoch_";

/// Directory of `FileType::Checkpoint` files.
const CHECKPOINT_DIR_PREFIX: &str = "checkpoints";
/// Directory of `FileType::Object` files.
const OBJECT_DIR_PREFIX: &str = "objects";
/// Directory of `FileType::Transaction` files.
const TRANSACTION_DIR_PREFIX: &str = "transactions";
/// Directory of `FileType::Event` files.
const EVENT_DIR_PREFIX: &str = "events";
/// Directory of `FileType::TransactionObjects` files.
const TRANSACTION_OBJECT_DIR_PREFIX: &str = "transaction_objects";
/// Directory of `FileType::MoveCall` files.
const MOVE_CALL_PREFIX: &str = "move_call";
/// Directory of `FileType::MovePackage` files.
const MOVE_PACKAGE_PREFIX: &str = "move_package";
/// Directory of `FileType::DynamicField` files.
const DYNAMIC_FIELD_PREFIX: &str = "dynamic_field";
/// Directory of `FileType::WrappedObject` files.
const WRAPPED_OBJECT_PREFIX: &str = "wrapped_object";
66
67#[derive(Parser, Clone, Debug)]
68#[command(
69    name = "IOTA Analytics Indexer",
70    about = "Indexer service to upload data for the analytics pipeline."
71)]
72pub struct AnalyticsIndexerConfig {
73    /// The url of the checkpoint client to connect to.
74    #[arg(long)]
75    pub rest_url: String,
76    /// The url of the metrics client to connect to.
77    #[arg(long, default_value = "127.0.0.1", global = true)]
78    pub client_metric_host: String,
79    /// The port of the metrics client to connect to.
80    #[arg(long, default_value = "8081", global = true)]
81    pub client_metric_port: u16,
82    /// Directory to contain the temporary files for checkpoint entries.
83    #[arg(long, global = true, default_value = "/tmp")]
84    pub checkpoint_dir: PathBuf,
85    /// Number of checkpoints to process before uploading to the datastore.
86    #[arg(long, default_value = "10000", global = true)]
87    pub checkpoint_interval: u64,
88    /// Maximum file size in mb before uploading to the datastore.
89    #[arg(long, default_value = "100", global = true)]
90    pub max_file_size_mb: u64,
91    /// Checkpoint sequence number to start the download from
92    #[arg(long, default_value = None, global = true)]
93    pub starting_checkpoint_seq_num: Option<u64>,
94    /// Time to process in seconds before uploading to the datastore.
95    #[arg(long, default_value = "600", global = true)]
96    pub time_interval_s: u64,
97    // Remote object store where data gets written to
98    #[command(flatten)]
99    pub remote_store_config: ObjectStoreConfig,
100    // Remote object store path prefix to use while writing
101    #[arg(long, default_value = None, global = true)]
102    pub remote_store_path_prefix: Option<Path>,
103    // File format to store data in i.e. csv, parquet, etc
104    #[arg(long, value_enum, default_value = "csv", global = true)]
105    pub file_format: FileFormat,
106    // Type of data to write i.e. checkpoint, object, transaction, etc
107    #[arg(long, value_enum, long, global = true)]
108    pub file_type: FileType,
109    #[arg(
110        long,
111        default_value = "https://checkpoints.mainnet.iota.cafe",
112        global = true
113    )]
114    pub remote_store_url: String,
115    // Directory to contain the package cache for pipelines
116    #[arg(
117        long,
118        value_enum,
119        long,
120        global = true,
121        default_value = "/opt/iota/db/package_cache"
122    )]
123    pub package_cache_path: PathBuf,
124    #[arg(long, default_value = None, global = true)]
125    pub bq_service_account_key_file: Option<String>,
126    #[arg(long, default_value = None, global = true)]
127    pub bq_project_id: Option<String>,
128    #[arg(long, default_value = None, global = true)]
129    pub bq_dataset_id: Option<String>,
130    #[arg(long, default_value = None, global = true)]
131    pub bq_table_id: Option<String>,
132    #[arg(long, default_value = None, global = true)]
133    pub bq_checkpoint_col_id: Option<String>,
134    #[arg(long, global = true)]
135    pub report_bq_max_table_checkpoint: bool,
136    #[arg(long, default_value = None, global = true)]
137    pub sf_account_identifier: Option<String>,
138    #[arg(long, default_value = None, global = true)]
139    pub sf_warehouse: Option<String>,
140    #[arg(long, default_value = None, global = true)]
141    pub sf_database: Option<String>,
142    #[arg(long, default_value = None, global = true)]
143    pub sf_schema: Option<String>,
144    #[arg(long, default_value = None, global = true)]
145    pub sf_username: Option<String>,
146    #[arg(long, default_value = None, global = true)]
147    pub sf_role: Option<String>,
148    #[arg(long, default_value = None, global = true)]
149    pub sf_password: Option<String>,
150    #[arg(long, default_value = None, global = true)]
151    pub sf_table_id: Option<String>,
152    #[arg(long, default_value = None, global = true)]
153    pub sf_checkpoint_col_id: Option<String>,
154    #[arg(long, global = true)]
155    pub report_sf_max_table_checkpoint: bool,
156}
157
158#[async_trait::async_trait]
159pub trait MaxCheckpointReader: Send + Sync + 'static {
160    async fn max_checkpoint(&self) -> Result<i64>;
161}
162
163struct SnowflakeMaxCheckpointReader {
164    query: String,
165    api: SnowflakeApi,
166}
167
168impl SnowflakeMaxCheckpointReader {
169    pub async fn new(
170        account_identifier: &str,
171        warehouse: &str,
172        database: &str,
173        schema: &str,
174        user: &str,
175        role: &str,
176        passwd: &str,
177        table_id: &str,
178        col_id: &str,
179    ) -> anyhow::Result<Self> {
180        let api = SnowflakeApi::with_password_auth(
181            account_identifier,
182            Some(warehouse),
183            Some(database),
184            Some(schema),
185            user,
186            Some(role),
187            passwd,
188        )
189        .expect("Failed to build sf api client");
190        Ok(SnowflakeMaxCheckpointReader {
191            query: format!("SELECT max({}) from {}", col_id, table_id),
192            api,
193        })
194    }
195}
196
197#[async_trait::async_trait]
198impl MaxCheckpointReader for SnowflakeMaxCheckpointReader {
199    async fn max_checkpoint(&self) -> Result<i64> {
200        let res = self.api.exec(&self.query).await?;
201        match res {
202            QueryResult::Arrow(a) => {
203                if let Some(record_batch) = a.first() {
204                    let col = record_batch.column(0);
205                    let col_array = col
206                        .as_any()
207                        .downcast_ref::<Int32Array>()
208                        .expect("Failed to downcast arrow column");
209                    Ok(col_array.value(0) as i64)
210                } else {
211                    Ok(-1)
212                }
213            }
214            QueryResult::Json(_j) => Err(anyhow!("Unexpected query result")),
215            QueryResult::Empty => Err(anyhow!("Unexpected query result")),
216        }
217    }
218}
219
220struct BQMaxCheckpointReader {
221    query: String,
222    project_id: String,
223    client: Client,
224}
225
226impl BQMaxCheckpointReader {
227    pub async fn new(
228        key_path: &str,
229        project_id: &str,
230        dataset_id: &str,
231        table_id: &str,
232        col_id: &str,
233    ) -> anyhow::Result<Self> {
234        Ok(BQMaxCheckpointReader {
235            query: format!(
236                "SELECT max({}) from `{}.{}.{}`",
237                col_id, project_id, dataset_id, table_id
238            ),
239            client: Client::from_service_account_key_file(key_path).await?,
240            project_id: project_id.to_string(),
241        })
242    }
243}
244
245#[async_trait::async_trait]
246impl MaxCheckpointReader for BQMaxCheckpointReader {
247    async fn max_checkpoint(&self) -> Result<i64> {
248        let mut result = self
249            .client
250            .job()
251            .query(&self.project_id, QueryRequest::new(&self.query))
252            .await?;
253        if result.next_row() {
254            let max_checkpoint = result.get_i64(0)?.ok_or(anyhow!("No rows returned"))?;
255            Ok(max_checkpoint)
256        } else {
257            Ok(-1)
258        }
259    }
260}
261
262struct NoOpCheckpointReader;
263
264#[async_trait::async_trait]
265impl MaxCheckpointReader for NoOpCheckpointReader {
266    async fn max_checkpoint(&self) -> Result<i64> {
267        Ok(-1)
268    }
269}
270
271#[derive(
272    Copy,
273    Clone,
274    Debug,
275    Eq,
276    PartialEq,
277    Parser,
278    strum::Display,
279    ValueEnum,
280    Serialize,
281    Deserialize,
282    TryFromPrimitive,
283    IntoPrimitive,
284    EnumIter,
285)]
286#[repr(u8)]
287pub enum FileFormat {
288    CSV = 0,
289    PARQUET = 1,
290}
291
292impl FileFormat {
293    pub fn file_suffix(&self) -> &str {
294        match self {
295            FileFormat::CSV => "csv",
296            FileFormat::PARQUET => "parquet",
297        }
298    }
299}
300
301#[derive(
302    Copy,
303    Clone,
304    Debug,
305    Eq,
306    PartialEq,
307    Serialize,
308    Deserialize,
309    TryFromPrimitive,
310    IntoPrimitive,
311    EnumIter,
312    ValueEnum,
313)]
314#[repr(u8)]
315pub enum FileType {
316    Checkpoint = 0,
317    Object,
318    Transaction,
319    TransactionObjects,
320    Event,
321    MoveCall,
322    MovePackage,
323    DynamicField,
324    WrappedObject,
325}
326
327impl FileType {
328    pub fn dir_prefix(&self) -> Path {
329        match self {
330            FileType::Checkpoint => Path::from(CHECKPOINT_DIR_PREFIX),
331            FileType::Transaction => Path::from(TRANSACTION_DIR_PREFIX),
332            FileType::TransactionObjects => Path::from(TRANSACTION_OBJECT_DIR_PREFIX),
333            FileType::Object => Path::from(OBJECT_DIR_PREFIX),
334            FileType::Event => Path::from(EVENT_DIR_PREFIX),
335            FileType::MoveCall => Path::from(MOVE_CALL_PREFIX),
336            FileType::MovePackage => Path::from(MOVE_PACKAGE_PREFIX),
337            FileType::DynamicField => Path::from(DYNAMIC_FIELD_PREFIX),
338            FileType::WrappedObject => Path::from(WRAPPED_OBJECT_PREFIX),
339        }
340    }
341
342    pub fn file_path(
343        &self,
344        file_format: FileFormat,
345        epoch_num: EpochId,
346        checkpoint_range: Range<u64>,
347    ) -> Path {
348        self.dir_prefix()
349            .child(format!("{}{}", EPOCH_DIR_PREFIX, epoch_num))
350            .child(format!(
351                "{}_{}.{}",
352                checkpoint_range.start,
353                checkpoint_range.end,
354                file_format.file_suffix()
355            ))
356    }
357}
358
/// A single column value, as returned by `ParquetSchema::get_column`.
pub enum ParquetValue {
    /// Unsigned 64-bit integer.
    U64(u64),
    /// Owned string.
    Str(String),
    /// Boolean flag.
    Bool(bool),
    /// Signed 64-bit integer.
    I64(i64),
    /// Nullable unsigned 64-bit integer.
    OptionU64(Option<u64>),
    /// Nullable string.
    OptionStr(Option<String>),
}

/// Implements `From<T> for ParquetValue` by wrapping `T` unchanged in the given variant.
macro_rules! parquet_value_wrapping_from {
    ($($source:ty => $variant:ident),+ $(,)?) => {
        $(
            impl From<$source> for ParquetValue {
                fn from(value: $source) -> Self {
                    ParquetValue::$variant(value)
                }
            }
        )+
    };
}

parquet_value_wrapping_from! {
    u64 => U64,
    i64 => I64,
    String => Str,
    Option<u64> => OptionU64,
    Option<String> => OptionStr,
    bool => Bool,
}
403
404impl From<OwnerType> for ParquetValue {
405    fn from(value: OwnerType) -> Self {
406        Self::Str(value.to_string())
407    }
408}
409
410impl From<Option<OwnerType>> for ParquetValue {
411    fn from(value: Option<OwnerType>) -> Self {
412        value.map(|v| v.to_string()).into()
413    }
414}
415
416impl From<ObjectStatus> for ParquetValue {
417    fn from(value: ObjectStatus) -> Self {
418        Self::Str(value.to_string())
419    }
420}
421
422impl From<Option<ObjectStatus>> for ParquetValue {
423    fn from(value: Option<ObjectStatus>) -> Self {
424        Self::OptionStr(value.map(|v| v.to_string()))
425    }
426}
427
428impl From<Option<InputObjectKind>> for ParquetValue {
429    fn from(value: Option<InputObjectKind>) -> Self {
430        Self::OptionStr(value.map(|v| v.to_string()))
431    }
432}
433
434impl From<DynamicFieldType> for ParquetValue {
435    fn from(value: DynamicFieldType) -> Self {
436        Self::Str(value.to_string())
437    }
438}
439
440impl From<Option<DynamicFieldType>> for ParquetValue {
441    fn from(value: Option<DynamicFieldType>) -> Self {
442        Self::OptionStr(value.map(|v| v.to_string()))
443    }
444}
445
446pub trait ParquetSchema {
447    fn schema() -> Vec<String>;
448
449    fn get_column(&self, idx: usize) -> ParquetValue;
450}
451
452#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
453pub struct FileMetadata {
454    pub file_type: FileType,
455    pub file_format: FileFormat,
456    pub epoch_num: u64,
457    pub checkpoint_seq_range: Range<u64>,
458}
459
460impl FileMetadata {
461    fn new(
462        file_type: FileType,
463        file_format: FileFormat,
464        epoch_num: u64,
465        checkpoint_seq_range: Range<u64>,
466    ) -> FileMetadata {
467        FileMetadata {
468            file_type,
469            file_format,
470            epoch_num,
471            checkpoint_seq_range,
472        }
473    }
474
475    pub fn file_path(&self) -> Path {
476        self.file_type.file_path(
477            self.file_format,
478            self.epoch_num,
479            self.checkpoint_seq_range.clone(),
480        )
481    }
482}
483
484pub struct Processor {
485    pub processor: Box<dyn Worker<Message = (), Error = anyhow::Error>>,
486    pub starting_checkpoint_seq_num: CheckpointSequenceNumber,
487}
488
489#[async_trait::async_trait]
490impl Worker for Processor {
491    type Message = ();
492    type Error = anyhow::Error;
493
494    #[inline]
495    async fn process_checkpoint(
496        &self,
497        checkpoint_data: Arc<CheckpointData>,
498    ) -> Result<Self::Message, Self::Error> {
499        self.processor.process_checkpoint(checkpoint_data).await
500    }
501}
502
503impl Processor {
504    pub async fn new<S: Serialize + ParquetSchema + 'static>(
505        handler: Box<dyn AnalyticsHandler<S>>,
506        writer: Box<dyn AnalyticsWriter<S>>,
507        max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
508        starting_checkpoint_seq_num: CheckpointSequenceNumber,
509        metrics: AnalyticsMetrics,
510        config: AnalyticsIndexerConfig,
511    ) -> Result<Self> {
512        let processor = Box::new(
513            AnalyticsProcessor::new(
514                handler,
515                writer,
516                max_checkpoint_reader,
517                starting_checkpoint_seq_num,
518                metrics,
519                config,
520            )
521            .await?,
522        );
523
524        Ok(Processor {
525            processor,
526            starting_checkpoint_seq_num,
527        })
528    }
529
530    pub fn last_committed_checkpoint(&self) -> Option<u64> {
531        Some(self.starting_checkpoint_seq_num.saturating_sub(1)).filter(|x| *x > 0)
532    }
533}
534
535pub async fn read_store_for_checkpoint(
536    remote_store_config: ObjectStoreConfig,
537    file_type: FileType,
538    dir_prefix: Option<Path>,
539) -> Result<CheckpointSequenceNumber> {
540    let remote_object_store = remote_store_config.make()?;
541    let remote_store_is_empty = remote_object_store
542        .list_with_delimiter(None)
543        .await
544        .expect("Failed to read remote analytics store")
545        .common_prefixes
546        .is_empty();
547    info!("Remote store is empty: {remote_store_is_empty}");
548    let file_type_prefix = file_type.dir_prefix();
549    let prefix = join_paths(dir_prefix, &file_type_prefix);
550    let epoch_dirs = find_all_dirs_with_epoch_prefix(&remote_object_store, Some(&prefix)).await?;
551    let epoch = epoch_dirs.last_key_value().map(|(k, _v)| *k).unwrap_or(0);
552    let epoch_prefix = prefix.child(format!("epoch_{}", epoch));
553    let checkpoints =
554        find_all_files_with_epoch_prefix(&remote_object_store, Some(&epoch_prefix)).await?;
555    let next_checkpoint_seq_num = checkpoints
556        .iter()
557        .max_by(|x, y| x.end.cmp(&y.end))
558        .map(|r| r.end)
559        .unwrap_or(0);
560    Ok(next_checkpoint_seq_num)
561}
562
563pub async fn make_max_checkpoint_reader(
564    config: &AnalyticsIndexerConfig,
565) -> Result<Box<dyn MaxCheckpointReader>> {
566    let res: Box<dyn MaxCheckpointReader> = if config.report_bq_max_table_checkpoint {
567        Box::new(
568            BQMaxCheckpointReader::new(
569                config
570                    .bq_service_account_key_file
571                    .as_ref()
572                    .ok_or(anyhow!("Missing gcp key file"))?,
573                config
574                    .bq_project_id
575                    .as_ref()
576                    .ok_or(anyhow!("Missing big query project id"))?,
577                config
578                    .bq_dataset_id
579                    .as_ref()
580                    .ok_or(anyhow!("Missing big query dataset id"))?,
581                config
582                    .bq_table_id
583                    .as_ref()
584                    .ok_or(anyhow!("Missing big query table id"))?,
585                config
586                    .bq_checkpoint_col_id
587                    .as_ref()
588                    .ok_or(anyhow!("Missing big query checkpoint col id"))?,
589            )
590            .await?,
591        )
592    } else if config.report_sf_max_table_checkpoint {
593        Box::new(
594            SnowflakeMaxCheckpointReader::new(
595                config
596                    .sf_account_identifier
597                    .as_ref()
598                    .ok_or(anyhow!("Missing sf account identifier"))?,
599                config
600                    .sf_warehouse
601                    .as_ref()
602                    .ok_or(anyhow!("Missing sf warehouse"))?,
603                config
604                    .sf_database
605                    .as_ref()
606                    .ok_or(anyhow!("Missing sf database"))?,
607                config
608                    .sf_schema
609                    .as_ref()
610                    .ok_or(anyhow!("Missing sf schema"))?,
611                config
612                    .sf_username
613                    .as_ref()
614                    .ok_or(anyhow!("Missing sf username"))?,
615                config.sf_role.as_ref().ok_or(anyhow!("Missing sf role"))?,
616                config
617                    .sf_password
618                    .as_ref()
619                    .ok_or(anyhow!("Missing sf password"))?,
620                config
621                    .sf_table_id
622                    .as_ref()
623                    .ok_or(anyhow!("Missing sf table id"))?,
624                config
625                    .sf_checkpoint_col_id
626                    .as_ref()
627                    .ok_or(anyhow!("Missing sf checkpoint col id"))?,
628            )
629            .await?,
630        )
631    } else {
632        Box::new(NoOpCheckpointReader {})
633    };
634    Ok(res)
635}
636
637pub async fn make_checkpoint_processor(
638    config: AnalyticsIndexerConfig,
639    metrics: AnalyticsMetrics,
640) -> Result<Processor> {
641    let handler: Box<dyn AnalyticsHandler<CheckpointEntry>> = Box::new(CheckpointHandler::new());
642    let starting_checkpoint_seq_num =
643        get_starting_checkpoint_seq_num(config.clone(), FileType::Checkpoint).await?;
644    let writer = make_writer::<CheckpointEntry>(
645        config.clone(),
646        FileType::Checkpoint,
647        starting_checkpoint_seq_num,
648    )?;
649    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
650    Processor::new::<CheckpointEntry>(
651        handler,
652        writer,
653        max_checkpoint_reader,
654        starting_checkpoint_seq_num,
655        metrics,
656        config,
657    )
658    .await
659}
660
661pub async fn make_transaction_processor(
662    config: AnalyticsIndexerConfig,
663    metrics: AnalyticsMetrics,
664) -> Result<Processor> {
665    let handler: Box<dyn AnalyticsHandler<TransactionEntry>> = Box::new(TransactionHandler::new());
666    let starting_checkpoint_seq_num =
667        get_starting_checkpoint_seq_num(config.clone(), FileType::Transaction).await?;
668    let writer = make_writer::<TransactionEntry>(
669        config.clone(),
670        FileType::Transaction,
671        starting_checkpoint_seq_num,
672    )?;
673    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
674    Processor::new::<TransactionEntry>(
675        handler,
676        writer,
677        max_checkpoint_reader,
678        starting_checkpoint_seq_num,
679        metrics,
680        config,
681    )
682    .await
683}
684
685pub async fn make_object_processor(
686    config: AnalyticsIndexerConfig,
687    metrics: AnalyticsMetrics,
688) -> Result<Processor> {
689    let handler: Box<dyn AnalyticsHandler<ObjectEntry>> = Box::new(ObjectHandler::new(
690        &config.package_cache_path,
691        &config.rest_url,
692    ));
693    let starting_checkpoint_seq_num =
694        get_starting_checkpoint_seq_num(config.clone(), FileType::Object).await?;
695    let writer = make_writer::<ObjectEntry>(
696        config.clone(),
697        FileType::Object,
698        starting_checkpoint_seq_num,
699    )?;
700    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
701    Processor::new::<ObjectEntry>(
702        handler,
703        writer,
704        max_checkpoint_reader,
705        starting_checkpoint_seq_num,
706        metrics,
707        config,
708    )
709    .await
710}
711
712pub async fn make_event_processor(
713    config: AnalyticsIndexerConfig,
714    metrics: AnalyticsMetrics,
715) -> Result<Processor> {
716    let handler: Box<dyn AnalyticsHandler<EventEntry>> = Box::new(EventHandler::new(
717        &config.package_cache_path,
718        &config.rest_url,
719    ));
720    let starting_checkpoint_seq_num =
721        get_starting_checkpoint_seq_num(config.clone(), FileType::Event).await?;
722    let writer =
723        make_writer::<EventEntry>(config.clone(), FileType::Event, starting_checkpoint_seq_num)?;
724    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
725    Processor::new::<EventEntry>(
726        handler,
727        writer,
728        max_checkpoint_reader,
729        starting_checkpoint_seq_num,
730        metrics,
731        config,
732    )
733    .await
734}
735
736pub async fn make_transaction_objects_processor(
737    config: AnalyticsIndexerConfig,
738    metrics: AnalyticsMetrics,
739) -> Result<Processor> {
740    let starting_checkpoint_seq_num =
741        get_starting_checkpoint_seq_num(config.clone(), FileType::TransactionObjects).await?;
742    let handler = Box::new(TransactionObjectsHandler::new());
743    let writer = make_writer(
744        config.clone(),
745        FileType::TransactionObjects,
746        starting_checkpoint_seq_num,
747    )?;
748    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
749    Processor::new::<TransactionObjectEntry>(
750        handler,
751        writer,
752        max_checkpoint_reader,
753        starting_checkpoint_seq_num,
754        metrics,
755        config,
756    )
757    .await
758}
759
760pub async fn make_move_package_processor(
761    config: AnalyticsIndexerConfig,
762    metrics: AnalyticsMetrics,
763) -> Result<Processor> {
764    let handler: Box<dyn AnalyticsHandler<MovePackageEntry>> = Box::new(PackageHandler::new());
765    let starting_checkpoint_seq_num =
766        get_starting_checkpoint_seq_num(config.clone(), FileType::MovePackage).await?;
767    let writer = make_writer::<MovePackageEntry>(
768        config.clone(),
769        FileType::MovePackage,
770        starting_checkpoint_seq_num,
771    )?;
772    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
773    Processor::new::<MovePackageEntry>(
774        handler,
775        writer,
776        max_checkpoint_reader,
777        starting_checkpoint_seq_num,
778        metrics,
779        config,
780    )
781    .await
782}
783
784pub async fn make_move_call_processor(
785    config: AnalyticsIndexerConfig,
786    metrics: AnalyticsMetrics,
787) -> Result<Processor> {
788    let starting_checkpoint_seq_num =
789        get_starting_checkpoint_seq_num(config.clone(), FileType::MoveCall).await?;
790    let handler: Box<dyn AnalyticsHandler<MoveCallEntry>> = Box::new(MoveCallHandler::new());
791    let writer = make_writer::<MoveCallEntry>(
792        config.clone(),
793        FileType::MoveCall,
794        starting_checkpoint_seq_num,
795    )?;
796    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
797    Processor::new::<MoveCallEntry>(
798        handler,
799        writer,
800        max_checkpoint_reader,
801        starting_checkpoint_seq_num,
802        metrics,
803        config,
804    )
805    .await
806}
807
808pub async fn make_dynamic_field_processor(
809    config: AnalyticsIndexerConfig,
810    metrics: AnalyticsMetrics,
811) -> Result<Processor> {
812    let starting_checkpoint_seq_num =
813        get_starting_checkpoint_seq_num(config.clone(), FileType::DynamicField).await?;
814    let handler: Box<dyn AnalyticsHandler<DynamicFieldEntry>> = Box::new(DynamicFieldHandler::new(
815        &config.package_cache_path,
816        &config.rest_url,
817    ));
818    let writer = make_writer::<DynamicFieldEntry>(
819        config.clone(),
820        FileType::DynamicField,
821        starting_checkpoint_seq_num,
822    )?;
823    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
824    Processor::new::<DynamicFieldEntry>(
825        handler,
826        writer,
827        max_checkpoint_reader,
828        starting_checkpoint_seq_num,
829        metrics,
830        config,
831    )
832    .await
833}
834
835pub async fn make_wrapped_object_processor(
836    config: AnalyticsIndexerConfig,
837    metrics: AnalyticsMetrics,
838) -> Result<Processor> {
839    let starting_checkpoint_seq_num =
840        get_starting_checkpoint_seq_num(config.clone(), FileType::WrappedObject).await?;
841    let handler: Box<dyn AnalyticsHandler<WrappedObjectEntry>> = Box::new(
842        WrappedObjectHandler::new(&config.package_cache_path, &config.rest_url),
843    );
844    let writer = make_writer::<WrappedObjectEntry>(
845        config.clone(),
846        FileType::WrappedObject,
847        starting_checkpoint_seq_num,
848    )?;
849    let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
850    Processor::new::<WrappedObjectEntry>(
851        handler,
852        writer,
853        max_checkpoint_reader,
854        starting_checkpoint_seq_num,
855        metrics,
856        config,
857    )
858    .await
859}
860
861pub fn make_writer<S: Serialize + ParquetSchema>(
862    config: AnalyticsIndexerConfig,
863    file_type: FileType,
864    starting_checkpoint_seq_num: u64,
865) -> Result<Box<dyn AnalyticsWriter<S>>> {
866    Ok(match config.file_format {
867        FileFormat::CSV => Box::new(CSVWriter::new(
868            &config.checkpoint_dir,
869            file_type,
870            starting_checkpoint_seq_num,
871        )?),
872        FileFormat::PARQUET => Box::new(ParquetWriter::new(
873            &config.checkpoint_dir,
874            file_type,
875            starting_checkpoint_seq_num,
876        )?),
877    })
878}
879
880pub async fn get_starting_checkpoint_seq_num(
881    config: AnalyticsIndexerConfig,
882    file_type: FileType,
883) -> Result<u64> {
884    let checkpoint = if let Some(starting_checkpoint_seq_num) = config.starting_checkpoint_seq_num {
885        starting_checkpoint_seq_num
886    } else {
887        read_store_for_checkpoint(
888            config.remote_store_config.clone(),
889            file_type,
890            config.remote_store_path_prefix,
891        )
892        .await?
893    };
894    Ok(checkpoint)
895}
896
897pub async fn make_analytics_processor(
898    config: AnalyticsIndexerConfig,
899    metrics: AnalyticsMetrics,
900) -> Result<Processor> {
901    match config.file_type {
902        FileType::Checkpoint => make_checkpoint_processor(config, metrics).await,
903        FileType::Object => make_object_processor(config, metrics).await,
904        FileType::Transaction => make_transaction_processor(config, metrics).await,
905        FileType::Event => make_event_processor(config, metrics).await,
906        FileType::TransactionObjects => make_transaction_objects_processor(config, metrics).await,
907        FileType::MoveCall => make_move_call_processor(config, metrics).await,
908        FileType::MovePackage => make_move_package_processor(config, metrics).await,
909        FileType::DynamicField => make_dynamic_field_processor(config, metrics).await,
910        FileType::WrappedObject => make_wrapped_object_processor(config, metrics).await,
911    }
912}
913
914pub fn join_paths(base: Option<Path>, child: &Path) -> Path {
915    base.map(|p| {
916        let mut out_path = p.clone();
917        for part in child.parts() {
918            out_path = out_path.child(part)
919        }
920        out_path
921    })
922    .unwrap_or(child.clone())
923}