1use std::{ops::Range, path::PathBuf, sync::Arc};
6
7use anyhow::{Result, anyhow};
8use arrow_array::Int32Array;
9use clap::*;
10use gcp_bigquery_client::{Client, model::query_request::QueryRequest};
11use iota_config::object_storage_config::ObjectStoreConfig;
12use iota_data_ingestion_core::Worker;
13use iota_rest_api::CheckpointData;
14use iota_storage::object_store::util::{
15 find_all_dirs_with_epoch_prefix, find_all_files_with_epoch_prefix,
16};
17use iota_types::{
18 base_types::EpochId, dynamic_field::DynamicFieldType,
19 messages_checkpoint::CheckpointSequenceNumber,
20};
21use num_enum::{IntoPrimitive, TryFromPrimitive};
22use object_store::path::Path;
23use serde::{Deserialize, Serialize};
24use snowflake_api::{QueryResult, SnowflakeApi};
25use strum::EnumIter;
26use tracing::info;
27
28use crate::{
29 analytics_metrics::AnalyticsMetrics,
30 analytics_processor::AnalyticsProcessor,
31 handlers::{
32 AnalyticsHandler, checkpoint_handler::CheckpointHandler, df_handler::DynamicFieldHandler,
33 event_handler::EventHandler, move_call_handler::MoveCallHandler,
34 object_handler::ObjectHandler, package_handler::PackageHandler,
35 transaction_handler::TransactionHandler,
36 transaction_objects_handler::TransactionObjectsHandler,
37 wrapped_object_handler::WrappedObjectHandler,
38 },
39 tables::{
40 CheckpointEntry, DynamicFieldEntry, EventEntry, InputObjectKind, MoveCallEntry,
41 MovePackageEntry, ObjectEntry, ObjectStatus, OwnerType, TransactionEntry,
42 TransactionObjectEntry, WrappedObjectEntry,
43 },
44 writers::{AnalyticsWriter, csv_writer::CSVWriter, parquet_writer::ParquetWriter},
45};
46
// Crate modules. The `pub` ones are exported; the others are only reachable
// from inside this crate.

// Provides `AnalyticsMetrics`, which every processor factory below receives.
pub mod analytics_metrics;
// Provides `AnalyticsProcessor`, the worker wrapped by `Processor`.
pub mod analytics_processor;
pub mod errors;
// Provides the `AnalyticsHandler` trait and one handler per table.
mod handlers;
mod package_store;
// Provides the `*Entry` row types the handlers and writers are parameterised over.
pub mod tables;
// Provides the `AnalyticsWriter` trait with its CSV and Parquet implementations.
mod writers;
54
// Prefix of the per-epoch directory name; `FileType::file_path` appends the
// epoch number to it.
const EPOCH_DIR_PREFIX: &str = "epoch_";
// Directory names returned by `FileType::dir_prefix`, one per `FileType` variant.
const CHECKPOINT_DIR_PREFIX: &str = "checkpoints";
const OBJECT_DIR_PREFIX: &str = "objects";
const TRANSACTION_DIR_PREFIX: &str = "transactions";
const EVENT_DIR_PREFIX: &str = "events";
const TRANSACTION_OBJECT_DIR_PREFIX: &str = "transaction_objects";
const MOVE_CALL_PREFIX: &str = "move_call";
const MOVE_PACKAGE_PREFIX: &str = "move_package";
const DYNAMIC_FIELD_PREFIX: &str = "dynamic_field";

const WRAPPED_OBJECT_PREFIX: &str = "wrapped_object";
66
67#[derive(Parser, Clone, Debug)]
68#[command(
69 name = "IOTA Analytics Indexer",
70 about = "Indexer service to upload data for the analytics pipeline."
71)]
72pub struct AnalyticsIndexerConfig {
73 #[arg(long)]
75 pub rest_url: String,
76 #[arg(long, default_value = "127.0.0.1", global = true)]
78 pub client_metric_host: String,
79 #[arg(long, default_value = "8081", global = true)]
81 pub client_metric_port: u16,
82 #[arg(long, global = true, default_value = "/tmp")]
84 pub checkpoint_dir: PathBuf,
85 #[arg(long, default_value = "10000", global = true)]
87 pub checkpoint_interval: u64,
88 #[arg(long, default_value = "100", global = true)]
90 pub max_file_size_mb: u64,
91 #[arg(long, default_value = None, global = true)]
93 pub starting_checkpoint_seq_num: Option<u64>,
94 #[arg(long, default_value = "600", global = true)]
96 pub time_interval_s: u64,
97 #[command(flatten)]
99 pub remote_store_config: ObjectStoreConfig,
100 #[arg(long, default_value = None, global = true)]
102 pub remote_store_path_prefix: Option<Path>,
103 #[arg(long, value_enum, default_value = "csv", global = true)]
105 pub file_format: FileFormat,
106 #[arg(long, value_enum, long, global = true)]
108 pub file_type: FileType,
109 #[arg(
110 long,
111 default_value = "https://checkpoints.mainnet.iota.cafe",
112 global = true
113 )]
114 pub remote_store_url: String,
115 #[arg(
117 long,
118 value_enum,
119 long,
120 global = true,
121 default_value = "/opt/iota/db/package_cache"
122 )]
123 pub package_cache_path: PathBuf,
124 #[arg(long, default_value = None, global = true)]
125 pub bq_service_account_key_file: Option<String>,
126 #[arg(long, default_value = None, global = true)]
127 pub bq_project_id: Option<String>,
128 #[arg(long, default_value = None, global = true)]
129 pub bq_dataset_id: Option<String>,
130 #[arg(long, default_value = None, global = true)]
131 pub bq_table_id: Option<String>,
132 #[arg(long, default_value = None, global = true)]
133 pub bq_checkpoint_col_id: Option<String>,
134 #[arg(long, global = true)]
135 pub report_bq_max_table_checkpoint: bool,
136 #[arg(long, default_value = None, global = true)]
137 pub sf_account_identifier: Option<String>,
138 #[arg(long, default_value = None, global = true)]
139 pub sf_warehouse: Option<String>,
140 #[arg(long, default_value = None, global = true)]
141 pub sf_database: Option<String>,
142 #[arg(long, default_value = None, global = true)]
143 pub sf_schema: Option<String>,
144 #[arg(long, default_value = None, global = true)]
145 pub sf_username: Option<String>,
146 #[arg(long, default_value = None, global = true)]
147 pub sf_role: Option<String>,
148 #[arg(long, default_value = None, global = true)]
149 pub sf_password: Option<String>,
150 #[arg(long, default_value = None, global = true)]
151 pub sf_table_id: Option<String>,
152 #[arg(long, default_value = None, global = true)]
153 pub sf_checkpoint_col_id: Option<String>,
154 #[arg(long, global = true)]
155 pub report_sf_max_table_checkpoint: bool,
156}
157
/// Source of the highest checkpoint value recorded in a downstream table.
///
/// The implementations in this file use `-1` to signal "no value available".
#[async_trait::async_trait]
pub trait MaxCheckpointReader: Send + Sync + 'static {
    /// Returns the maximum checkpoint value, or an error if the lookup fails.
    async fn max_checkpoint(&self) -> Result<i64>;
}
162
/// [`MaxCheckpointReader`] backed by a Snowflake query.
struct SnowflakeMaxCheckpointReader {
    /// SQL text executed on every `max_checkpoint` call; built once in `new`.
    query: String,
    /// Snowflake client used to execute `query`.
    api: SnowflakeApi,
}
167
168impl SnowflakeMaxCheckpointReader {
169 pub async fn new(
170 account_identifier: &str,
171 warehouse: &str,
172 database: &str,
173 schema: &str,
174 user: &str,
175 role: &str,
176 passwd: &str,
177 table_id: &str,
178 col_id: &str,
179 ) -> anyhow::Result<Self> {
180 let api = SnowflakeApi::with_password_auth(
181 account_identifier,
182 Some(warehouse),
183 Some(database),
184 Some(schema),
185 user,
186 Some(role),
187 passwd,
188 )
189 .expect("Failed to build sf api client");
190 Ok(SnowflakeMaxCheckpointReader {
191 query: format!("SELECT max({}) from {}", col_id, table_id),
192 api,
193 })
194 }
195}
196
197#[async_trait::async_trait]
198impl MaxCheckpointReader for SnowflakeMaxCheckpointReader {
199 async fn max_checkpoint(&self) -> Result<i64> {
200 let res = self.api.exec(&self.query).await?;
201 match res {
202 QueryResult::Arrow(a) => {
203 if let Some(record_batch) = a.first() {
204 let col = record_batch.column(0);
205 let col_array = col
206 .as_any()
207 .downcast_ref::<Int32Array>()
208 .expect("Failed to downcast arrow column");
209 Ok(col_array.value(0) as i64)
210 } else {
211 Ok(-1)
212 }
213 }
214 QueryResult::Json(_j) => Err(anyhow!("Unexpected query result")),
215 QueryResult::Empty => Err(anyhow!("Unexpected query result")),
216 }
217 }
218}
219
/// [`MaxCheckpointReader`] backed by a BigQuery query.
struct BQMaxCheckpointReader {
    /// SQL text executed on every `max_checkpoint` call; built once in `new`.
    query: String,
    /// Project the query job is submitted under.
    project_id: String,
    /// BigQuery client created from a service-account key file.
    client: Client,
}
225
226impl BQMaxCheckpointReader {
227 pub async fn new(
228 key_path: &str,
229 project_id: &str,
230 dataset_id: &str,
231 table_id: &str,
232 col_id: &str,
233 ) -> anyhow::Result<Self> {
234 Ok(BQMaxCheckpointReader {
235 query: format!(
236 "SELECT max({}) from `{}.{}.{}`",
237 col_id, project_id, dataset_id, table_id
238 ),
239 client: Client::from_service_account_key_file(key_path).await?,
240 project_id: project_id.to_string(),
241 })
242 }
243}
244
245#[async_trait::async_trait]
246impl MaxCheckpointReader for BQMaxCheckpointReader {
247 async fn max_checkpoint(&self) -> Result<i64> {
248 let mut result = self
249 .client
250 .job()
251 .query(&self.project_id, QueryRequest::new(&self.query))
252 .await?;
253 if result.next_row() {
254 let max_checkpoint = result.get_i64(0)?.ok_or(anyhow!("No rows returned"))?;
255 Ok(max_checkpoint)
256 } else {
257 Ok(-1)
258 }
259 }
260}
261
/// [`MaxCheckpointReader`] used when neither BigQuery nor Snowflake reporting
/// is enabled (see `make_max_checkpoint_reader`).
struct NoOpCheckpointReader;

#[async_trait::async_trait]
impl MaxCheckpointReader for NoOpCheckpointReader {
    /// Always succeeds with the "no value" sentinel `-1`.
    async fn max_checkpoint(&self) -> Result<i64> {
        Ok(-1)
    }
}
270
// Output file format.
//
// Plain `//` comments are used on purpose: this enum derives clap traits, and
// clap would turn `///` doc comments into help text.
//
// `#[repr(u8)]` together with `TryFromPrimitive`/`IntoPrimitive` gives each
// variant a `u8` value, so the explicit discriminants below must not change.
#[derive(
    Copy,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Parser,
    strum::Display,
    ValueEnum,
    Serialize,
    Deserialize,
    TryFromPrimitive,
    IntoPrimitive,
    EnumIter,
)]
#[repr(u8)]
pub enum FileFormat {
    // Files with the `csv` suffix, written by `CSVWriter`.
    CSV = 0,
    // Files with the `parquet` suffix, written by `ParquetWriter`.
    PARQUET = 1,
}
291
292impl FileFormat {
293 pub fn file_suffix(&self) -> &str {
294 match self {
295 FileFormat::CSV => "csv",
296 FileFormat::PARQUET => "parquet",
297 }
298 }
299}
300
// Kind of table a processor produces.
//
// Plain `//` comments are used on purpose: this enum derives `ValueEnum`, and
// clap would turn `///` doc comments into help text.
//
// `#[repr(u8)]` together with `TryFromPrimitive`/`IntoPrimitive` gives each
// variant a `u8` value. Only the first discriminant is explicit; the others
// follow in declaration order, so reordering variants changes their values.
#[derive(
    Copy,
    Clone,
    Debug,
    Eq,
    PartialEq,
    Serialize,
    Deserialize,
    TryFromPrimitive,
    IntoPrimitive,
    EnumIter,
    ValueEnum,
)]
#[repr(u8)]
pub enum FileType {
    Checkpoint = 0,
    Object,
    Transaction,
    TransactionObjects,
    Event,
    MoveCall,
    MovePackage,
    DynamicField,
    WrappedObject,
}
326
327impl FileType {
328 pub fn dir_prefix(&self) -> Path {
329 match self {
330 FileType::Checkpoint => Path::from(CHECKPOINT_DIR_PREFIX),
331 FileType::Transaction => Path::from(TRANSACTION_DIR_PREFIX),
332 FileType::TransactionObjects => Path::from(TRANSACTION_OBJECT_DIR_PREFIX),
333 FileType::Object => Path::from(OBJECT_DIR_PREFIX),
334 FileType::Event => Path::from(EVENT_DIR_PREFIX),
335 FileType::MoveCall => Path::from(MOVE_CALL_PREFIX),
336 FileType::MovePackage => Path::from(MOVE_PACKAGE_PREFIX),
337 FileType::DynamicField => Path::from(DYNAMIC_FIELD_PREFIX),
338 FileType::WrappedObject => Path::from(WRAPPED_OBJECT_PREFIX),
339 }
340 }
341
342 pub fn file_path(
343 &self,
344 file_format: FileFormat,
345 epoch_num: EpochId,
346 checkpoint_range: Range<u64>,
347 ) -> Path {
348 self.dir_prefix()
349 .child(format!("{}{}", EPOCH_DIR_PREFIX, epoch_num))
350 .child(format!(
351 "{}_{}.{}",
352 checkpoint_range.start,
353 checkpoint_range.end,
354 file_format.file_suffix()
355 ))
356 }
357}
358
/// A single cell value handed to the Parquet writer through
/// [`ParquetSchema::get_column`].
///
/// The `Option*` variants carry nullable values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ParquetValue {
    U64(u64),
    Str(String),
    Bool(bool),
    I64(i64),
    OptionU64(Option<u64>),
    OptionStr(Option<String>),
}
367
368impl From<u64> for ParquetValue {
369 fn from(value: u64) -> Self {
370 Self::U64(value)
371 }
372}
373
374impl From<i64> for ParquetValue {
375 fn from(value: i64) -> Self {
376 Self::I64(value)
377 }
378}
379
380impl From<String> for ParquetValue {
381 fn from(value: String) -> Self {
382 Self::Str(value)
383 }
384}
385
386impl From<Option<u64>> for ParquetValue {
387 fn from(value: Option<u64>) -> Self {
388 Self::OptionU64(value)
389 }
390}
391
392impl From<Option<String>> for ParquetValue {
393 fn from(value: Option<String>) -> Self {
394 Self::OptionStr(value)
395 }
396}
397
398impl From<bool> for ParquetValue {
399 fn from(value: bool) -> Self {
400 Self::Bool(value)
401 }
402}
403
404impl From<OwnerType> for ParquetValue {
405 fn from(value: OwnerType) -> Self {
406 Self::Str(value.to_string())
407 }
408}
409
410impl From<Option<OwnerType>> for ParquetValue {
411 fn from(value: Option<OwnerType>) -> Self {
412 value.map(|v| v.to_string()).into()
413 }
414}
415
416impl From<ObjectStatus> for ParquetValue {
417 fn from(value: ObjectStatus) -> Self {
418 Self::Str(value.to_string())
419 }
420}
421
422impl From<Option<ObjectStatus>> for ParquetValue {
423 fn from(value: Option<ObjectStatus>) -> Self {
424 Self::OptionStr(value.map(|v| v.to_string()))
425 }
426}
427
428impl From<Option<InputObjectKind>> for ParquetValue {
429 fn from(value: Option<InputObjectKind>) -> Self {
430 Self::OptionStr(value.map(|v| v.to_string()))
431 }
432}
433
434impl From<DynamicFieldType> for ParquetValue {
435 fn from(value: DynamicFieldType) -> Self {
436 Self::Str(value.to_string())
437 }
438}
439
440impl From<Option<DynamicFieldType>> for ParquetValue {
441 fn from(value: Option<DynamicFieldType>) -> Self {
442 Self::OptionStr(value.map(|v| v.to_string()))
443 }
444}
445
/// Column-oriented view of a row type; required of every type handed to
/// `make_writer` and `Processor::new`.
pub trait ParquetSchema {
    /// Returns the schema as a list of strings.
    ///
    /// NOTE(review): presumably the column names, in the order that
    /// `get_column` indexes them — confirm against the implementations in
    /// `tables`.
    fn schema() -> Vec<String>;

    /// Returns the value of column `idx` for this row.
    fn get_column(&self, idx: usize) -> ParquetValue;
}
451
/// Describes one output file; everything needed to derive its path.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    /// Table the file belongs to; selects the top-level directory.
    pub file_type: FileType,
    /// Format of the file; selects the file-name suffix.
    pub file_format: FileFormat,
    /// Epoch number; selects the `epoch_<n>` directory.
    pub epoch_num: u64,
    /// Checkpoint sequence range whose `start` and `end` form the file name.
    pub checkpoint_seq_range: Range<u64>,
}
459
460impl FileMetadata {
461 fn new(
462 file_type: FileType,
463 file_format: FileFormat,
464 epoch_num: u64,
465 checkpoint_seq_range: Range<u64>,
466 ) -> FileMetadata {
467 FileMetadata {
468 file_type,
469 file_format,
470 epoch_num,
471 checkpoint_seq_range,
472 }
473 }
474
475 pub fn file_path(&self) -> Path {
476 self.file_type.file_path(
477 self.file_format,
478 self.epoch_num,
479 self.checkpoint_seq_range.clone(),
480 )
481 }
482}
483
/// Type-erased analytics worker together with the checkpoint it starts from.
pub struct Processor {
    /// The wrapped worker; `process_checkpoint` calls are forwarded to it.
    pub processor: Box<dyn Worker<Message = (), Error = anyhow::Error>>,
    /// First checkpoint sequence number this processor was created with.
    pub starting_checkpoint_seq_num: CheckpointSequenceNumber,
}
488
489#[async_trait::async_trait]
490impl Worker for Processor {
491 type Message = ();
492 type Error = anyhow::Error;
493
494 #[inline]
495 async fn process_checkpoint(
496 &self,
497 checkpoint_data: Arc<CheckpointData>,
498 ) -> Result<Self::Message, Self::Error> {
499 self.processor.process_checkpoint(checkpoint_data).await
500 }
501}
502
503impl Processor {
504 pub async fn new<S: Serialize + ParquetSchema + 'static>(
505 handler: Box<dyn AnalyticsHandler<S>>,
506 writer: Box<dyn AnalyticsWriter<S>>,
507 max_checkpoint_reader: Box<dyn MaxCheckpointReader>,
508 starting_checkpoint_seq_num: CheckpointSequenceNumber,
509 metrics: AnalyticsMetrics,
510 config: AnalyticsIndexerConfig,
511 ) -> Result<Self> {
512 let processor = Box::new(
513 AnalyticsProcessor::new(
514 handler,
515 writer,
516 max_checkpoint_reader,
517 starting_checkpoint_seq_num,
518 metrics,
519 config,
520 )
521 .await?,
522 );
523
524 Ok(Processor {
525 processor,
526 starting_checkpoint_seq_num,
527 })
528 }
529
530 pub fn last_committed_checkpoint(&self) -> Option<u64> {
531 Some(self.starting_checkpoint_seq_num.saturating_sub(1)).filter(|x| *x > 0)
532 }
533}
534
535pub async fn read_store_for_checkpoint(
536 remote_store_config: ObjectStoreConfig,
537 file_type: FileType,
538 dir_prefix: Option<Path>,
539) -> Result<CheckpointSequenceNumber> {
540 let remote_object_store = remote_store_config.make()?;
541 let remote_store_is_empty = remote_object_store
542 .list_with_delimiter(None)
543 .await
544 .expect("Failed to read remote analytics store")
545 .common_prefixes
546 .is_empty();
547 info!("Remote store is empty: {remote_store_is_empty}");
548 let file_type_prefix = file_type.dir_prefix();
549 let prefix = join_paths(dir_prefix, &file_type_prefix);
550 let epoch_dirs = find_all_dirs_with_epoch_prefix(&remote_object_store, Some(&prefix)).await?;
551 let epoch = epoch_dirs.last_key_value().map(|(k, _v)| *k).unwrap_or(0);
552 let epoch_prefix = prefix.child(format!("epoch_{}", epoch));
553 let checkpoints =
554 find_all_files_with_epoch_prefix(&remote_object_store, Some(&epoch_prefix)).await?;
555 let next_checkpoint_seq_num = checkpoints
556 .iter()
557 .max_by(|x, y| x.end.cmp(&y.end))
558 .map(|r| r.end)
559 .unwrap_or(0);
560 Ok(next_checkpoint_seq_num)
561}
562
563pub async fn make_max_checkpoint_reader(
564 config: &AnalyticsIndexerConfig,
565) -> Result<Box<dyn MaxCheckpointReader>> {
566 let res: Box<dyn MaxCheckpointReader> = if config.report_bq_max_table_checkpoint {
567 Box::new(
568 BQMaxCheckpointReader::new(
569 config
570 .bq_service_account_key_file
571 .as_ref()
572 .ok_or(anyhow!("Missing gcp key file"))?,
573 config
574 .bq_project_id
575 .as_ref()
576 .ok_or(anyhow!("Missing big query project id"))?,
577 config
578 .bq_dataset_id
579 .as_ref()
580 .ok_or(anyhow!("Missing big query dataset id"))?,
581 config
582 .bq_table_id
583 .as_ref()
584 .ok_or(anyhow!("Missing big query table id"))?,
585 config
586 .bq_checkpoint_col_id
587 .as_ref()
588 .ok_or(anyhow!("Missing big query checkpoint col id"))?,
589 )
590 .await?,
591 )
592 } else if config.report_sf_max_table_checkpoint {
593 Box::new(
594 SnowflakeMaxCheckpointReader::new(
595 config
596 .sf_account_identifier
597 .as_ref()
598 .ok_or(anyhow!("Missing sf account identifier"))?,
599 config
600 .sf_warehouse
601 .as_ref()
602 .ok_or(anyhow!("Missing sf warehouse"))?,
603 config
604 .sf_database
605 .as_ref()
606 .ok_or(anyhow!("Missing sf database"))?,
607 config
608 .sf_schema
609 .as_ref()
610 .ok_or(anyhow!("Missing sf schema"))?,
611 config
612 .sf_username
613 .as_ref()
614 .ok_or(anyhow!("Missing sf username"))?,
615 config.sf_role.as_ref().ok_or(anyhow!("Missing sf role"))?,
616 config
617 .sf_password
618 .as_ref()
619 .ok_or(anyhow!("Missing sf password"))?,
620 config
621 .sf_table_id
622 .as_ref()
623 .ok_or(anyhow!("Missing sf table id"))?,
624 config
625 .sf_checkpoint_col_id
626 .as_ref()
627 .ok_or(anyhow!("Missing sf checkpoint col id"))?,
628 )
629 .await?,
630 )
631 } else {
632 Box::new(NoOpCheckpointReader {})
633 };
634 Ok(res)
635}
636
637pub async fn make_checkpoint_processor(
638 config: AnalyticsIndexerConfig,
639 metrics: AnalyticsMetrics,
640) -> Result<Processor> {
641 let handler: Box<dyn AnalyticsHandler<CheckpointEntry>> = Box::new(CheckpointHandler::new());
642 let starting_checkpoint_seq_num =
643 get_starting_checkpoint_seq_num(config.clone(), FileType::Checkpoint).await?;
644 let writer = make_writer::<CheckpointEntry>(
645 config.clone(),
646 FileType::Checkpoint,
647 starting_checkpoint_seq_num,
648 )?;
649 let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
650 Processor::new::<CheckpointEntry>(
651 handler,
652 writer,
653 max_checkpoint_reader,
654 starting_checkpoint_seq_num,
655 metrics,
656 config,
657 )
658 .await
659}
660
661pub async fn make_transaction_processor(
662 config: AnalyticsIndexerConfig,
663 metrics: AnalyticsMetrics,
664) -> Result<Processor> {
665 let handler: Box<dyn AnalyticsHandler<TransactionEntry>> = Box::new(TransactionHandler::new());
666 let starting_checkpoint_seq_num =
667 get_starting_checkpoint_seq_num(config.clone(), FileType::Transaction).await?;
668 let writer = make_writer::<TransactionEntry>(
669 config.clone(),
670 FileType::Transaction,
671 starting_checkpoint_seq_num,
672 )?;
673 let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
674 Processor::new::<TransactionEntry>(
675 handler,
676 writer,
677 max_checkpoint_reader,
678 starting_checkpoint_seq_num,
679 metrics,
680 config,
681 )
682 .await
683}
684
685pub async fn make_object_processor(
686 config: AnalyticsIndexerConfig,
687 metrics: AnalyticsMetrics,
688) -> Result<Processor> {
689 let handler: Box<dyn AnalyticsHandler<ObjectEntry>> = Box::new(ObjectHandler::new(
690 &config.package_cache_path,
691 &config.rest_url,
692 ));
693 let starting_checkpoint_seq_num =
694 get_starting_checkpoint_seq_num(config.clone(), FileType::Object).await?;
695 let writer = make_writer::<ObjectEntry>(
696 config.clone(),
697 FileType::Object,
698 starting_checkpoint_seq_num,
699 )?;
700 let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
701 Processor::new::<ObjectEntry>(
702 handler,
703 writer,
704 max_checkpoint_reader,
705 starting_checkpoint_seq_num,
706 metrics,
707 config,
708 )
709 .await
710}
711
712pub async fn make_event_processor(
713 config: AnalyticsIndexerConfig,
714 metrics: AnalyticsMetrics,
715) -> Result<Processor> {
716 let handler: Box<dyn AnalyticsHandler<EventEntry>> = Box::new(EventHandler::new(
717 &config.package_cache_path,
718 &config.rest_url,
719 ));
720 let starting_checkpoint_seq_num =
721 get_starting_checkpoint_seq_num(config.clone(), FileType::Event).await?;
722 let writer =
723 make_writer::<EventEntry>(config.clone(), FileType::Event, starting_checkpoint_seq_num)?;
724 let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
725 Processor::new::<EventEntry>(
726 handler,
727 writer,
728 max_checkpoint_reader,
729 starting_checkpoint_seq_num,
730 metrics,
731 config,
732 )
733 .await
734}
735
736pub async fn make_transaction_objects_processor(
737 config: AnalyticsIndexerConfig,
738 metrics: AnalyticsMetrics,
739) -> Result<Processor> {
740 let starting_checkpoint_seq_num =
741 get_starting_checkpoint_seq_num(config.clone(), FileType::TransactionObjects).await?;
742 let handler = Box::new(TransactionObjectsHandler::new());
743 let writer = make_writer(
744 config.clone(),
745 FileType::TransactionObjects,
746 starting_checkpoint_seq_num,
747 )?;
748 let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
749 Processor::new::<TransactionObjectEntry>(
750 handler,
751 writer,
752 max_checkpoint_reader,
753 starting_checkpoint_seq_num,
754 metrics,
755 config,
756 )
757 .await
758}
759
760pub async fn make_move_package_processor(
761 config: AnalyticsIndexerConfig,
762 metrics: AnalyticsMetrics,
763) -> Result<Processor> {
764 let handler: Box<dyn AnalyticsHandler<MovePackageEntry>> = Box::new(PackageHandler::new());
765 let starting_checkpoint_seq_num =
766 get_starting_checkpoint_seq_num(config.clone(), FileType::MovePackage).await?;
767 let writer = make_writer::<MovePackageEntry>(
768 config.clone(),
769 FileType::MovePackage,
770 starting_checkpoint_seq_num,
771 )?;
772 let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
773 Processor::new::<MovePackageEntry>(
774 handler,
775 writer,
776 max_checkpoint_reader,
777 starting_checkpoint_seq_num,
778 metrics,
779 config,
780 )
781 .await
782}
783
784pub async fn make_move_call_processor(
785 config: AnalyticsIndexerConfig,
786 metrics: AnalyticsMetrics,
787) -> Result<Processor> {
788 let starting_checkpoint_seq_num =
789 get_starting_checkpoint_seq_num(config.clone(), FileType::MoveCall).await?;
790 let handler: Box<dyn AnalyticsHandler<MoveCallEntry>> = Box::new(MoveCallHandler::new());
791 let writer = make_writer::<MoveCallEntry>(
792 config.clone(),
793 FileType::MoveCall,
794 starting_checkpoint_seq_num,
795 )?;
796 let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
797 Processor::new::<MoveCallEntry>(
798 handler,
799 writer,
800 max_checkpoint_reader,
801 starting_checkpoint_seq_num,
802 metrics,
803 config,
804 )
805 .await
806}
807
808pub async fn make_dynamic_field_processor(
809 config: AnalyticsIndexerConfig,
810 metrics: AnalyticsMetrics,
811) -> Result<Processor> {
812 let starting_checkpoint_seq_num =
813 get_starting_checkpoint_seq_num(config.clone(), FileType::DynamicField).await?;
814 let handler: Box<dyn AnalyticsHandler<DynamicFieldEntry>> = Box::new(DynamicFieldHandler::new(
815 &config.package_cache_path,
816 &config.rest_url,
817 ));
818 let writer = make_writer::<DynamicFieldEntry>(
819 config.clone(),
820 FileType::DynamicField,
821 starting_checkpoint_seq_num,
822 )?;
823 let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
824 Processor::new::<DynamicFieldEntry>(
825 handler,
826 writer,
827 max_checkpoint_reader,
828 starting_checkpoint_seq_num,
829 metrics,
830 config,
831 )
832 .await
833}
834
835pub async fn make_wrapped_object_processor(
836 config: AnalyticsIndexerConfig,
837 metrics: AnalyticsMetrics,
838) -> Result<Processor> {
839 let starting_checkpoint_seq_num =
840 get_starting_checkpoint_seq_num(config.clone(), FileType::WrappedObject).await?;
841 let handler: Box<dyn AnalyticsHandler<WrappedObjectEntry>> = Box::new(
842 WrappedObjectHandler::new(&config.package_cache_path, &config.rest_url),
843 );
844 let writer = make_writer::<WrappedObjectEntry>(
845 config.clone(),
846 FileType::WrappedObject,
847 starting_checkpoint_seq_num,
848 )?;
849 let max_checkpoint_reader = make_max_checkpoint_reader(&config).await?;
850 Processor::new::<WrappedObjectEntry>(
851 handler,
852 writer,
853 max_checkpoint_reader,
854 starting_checkpoint_seq_num,
855 metrics,
856 config,
857 )
858 .await
859}
860
861pub fn make_writer<S: Serialize + ParquetSchema>(
862 config: AnalyticsIndexerConfig,
863 file_type: FileType,
864 starting_checkpoint_seq_num: u64,
865) -> Result<Box<dyn AnalyticsWriter<S>>> {
866 Ok(match config.file_format {
867 FileFormat::CSV => Box::new(CSVWriter::new(
868 &config.checkpoint_dir,
869 file_type,
870 starting_checkpoint_seq_num,
871 )?),
872 FileFormat::PARQUET => Box::new(ParquetWriter::new(
873 &config.checkpoint_dir,
874 file_type,
875 starting_checkpoint_seq_num,
876 )?),
877 })
878}
879
880pub async fn get_starting_checkpoint_seq_num(
881 config: AnalyticsIndexerConfig,
882 file_type: FileType,
883) -> Result<u64> {
884 let checkpoint = if let Some(starting_checkpoint_seq_num) = config.starting_checkpoint_seq_num {
885 starting_checkpoint_seq_num
886 } else {
887 read_store_for_checkpoint(
888 config.remote_store_config.clone(),
889 file_type,
890 config.remote_store_path_prefix,
891 )
892 .await?
893 };
894 Ok(checkpoint)
895}
896
897pub async fn make_analytics_processor(
898 config: AnalyticsIndexerConfig,
899 metrics: AnalyticsMetrics,
900) -> Result<Processor> {
901 match config.file_type {
902 FileType::Checkpoint => make_checkpoint_processor(config, metrics).await,
903 FileType::Object => make_object_processor(config, metrics).await,
904 FileType::Transaction => make_transaction_processor(config, metrics).await,
905 FileType::Event => make_event_processor(config, metrics).await,
906 FileType::TransactionObjects => make_transaction_objects_processor(config, metrics).await,
907 FileType::MoveCall => make_move_call_processor(config, metrics).await,
908 FileType::MovePackage => make_move_package_processor(config, metrics).await,
909 FileType::DynamicField => make_dynamic_field_processor(config, metrics).await,
910 FileType::WrappedObject => make_wrapped_object_processor(config, metrics).await,
911 }
912}
913
914pub fn join_paths(base: Option<Path>, child: &Path) -> Path {
915 base.map(|p| {
916 let mut out_path = p.clone();
917 for part in child.parts() {
918 out_path = out_path.child(part)
919 }
920 out_path
921 })
922 .unwrap_or(child.clone())
923}