iota_analytics_indexer/writers/
use std::{
    fs::{File, create_dir_all, remove_file},
    ops::Range,
    path::{Path, PathBuf},
    sync::Arc,
};

use anyhow::{Result, anyhow};
use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray, UInt64Array};
use iota_storage::object_store::util::path_to_filesystem;
use iota_types::base_types::EpochId;
use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};
use serde::Serialize;

use crate::{AnalyticsWriter, FileFormat, FileType, ParquetSchema, ParquetValue};
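/// Buffers rows in memory as columns of `ParquetValue`s and, on flush, writes
/// them out as a single parquet file named for the current epoch and
/// checkpoint range.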
pub(crate) struct ParquetWriter {
    root_dir_path: PathBuf,
    file_type: FileType,
    epoch: EpochId,
    checkpoint_range: Range<u64>,
    data: Vec<Vec<ParquetValue>>,
}

impl ParquetWriter {
    pub(crate) fn new(
        root_dir_path: &Path,
        file_type: FileType,
        start_checkpoint_seq_num: u64,
    ) -> Result<Self> {
        let checkpoint_range = start_checkpoint_seq_num..u64::MAX;
        Ok(Self {
            root_dir_path: root_dir_path.to_path_buf(),
            file_type,
            epoch: 0,
            checkpoint_range,
            data: vec![],
        })
    }
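    // Build the output file handle: resolve the filesystem path for the
    // current epoch and checkpoint range under `root_dir_path`, create any
    // missing parent directories, and replace an existing file.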
    fn file(&self) -> Result<File> {
        let file_path = path_to_filesystem(
            self.root_dir_path.clone(),
            &self.file_type.file_path(
                FileFormat::PARQUET,
                self.epoch,
                self.checkpoint_range.clone(),
            ),
        )?;
        create_dir_all(file_path.parent().ok_or(anyhow!("Bad directory path"))?)?;
        if file_path.exists() {
            remove_file(&file_path)?;
        }
        Ok(File::create(&file_path)?)
    }
}
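// Converts a column of `ParquetValue`s into a typed Arrow array, dispatching
// on the variant of the column's first value, and pushes the result into the
// target vector as an `ArrayRef`. The `Option` variants collect into the same
// array types through `From<Vec<Option<_>>>`, which preserves nulls; any value
// that does not match the dispatched variant is silently dropped by `flat_map`.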
macro_rules! convert_to_arrow_array {
    ($column:ident, $target_vector:ident, $($variant:path => $types:ty),*) => {
        match &$column[0] {
            $(
                $variant(_) => {
                    let array = <$types>::from(
                        $column
                            .into_iter()
                            .flat_map(|value| match value {
                                $variant(value) => Some(value),
                                _ => None,
                            })
                            .collect::<Vec<_>>(),
                    );
                    $target_vector.push(Arc::new(array) as ArrayRef);
                }
            )*
        }
    };
}
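// `AnalyticsWriter` is implemented by buffering rows in memory on `write`
// and emitting one parquet file per checkpoint range on `flush`.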
impl<S: Serialize + ParquetSchema> AnalyticsWriter<S> for ParquetWriter {
    fn file_format(&self) -> Result<FileFormat> {
        Ok(FileFormat::PARQUET)
    }
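    // Transpose incoming rows into per-column buffers, creating each column
    // vector lazily when the first row arrives.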
    fn write(&mut self, rows: &[S]) -> Result<()> {
        for row in rows {
            for col_idx in 0..S::schema().len() {
                if col_idx == self.data.len() {
                    self.data.push(vec![]);
                }
                self.data[col_idx].push(row.get_column(col_idx));
            }
        }
        Ok(())
    }
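    // Close the checkpoint range, convert each buffered column into an Arrow
    // array, and write the whole batch as one snappy-compressed parquet file.
    // Returns Ok(false) when there is nothing to flush.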
    fn flush(&mut self, end_checkpoint_seq_num: u64) -> Result<bool> {
        if self.data.is_empty() {
            return Ok(false);
        }
        self.checkpoint_range.end = end_checkpoint_seq_num;
        let mut batch_data = vec![];
        for column in std::mem::take(&mut self.data) {
            convert_to_arrow_array!(column, batch_data,
                ParquetValue::U64 => UInt64Array,
                ParquetValue::Str => StringArray,
                ParquetValue::OptionU64 => UInt64Array,
                ParquetValue::OptionStr => StringArray,
                ParquetValue::Bool => BooleanArray,
                ParquetValue::I64 => Int64Array
            );
        }
        let batch = RecordBatch::try_from_iter(S::schema().iter().zip(batch_data.into_iter()))?;

        let properties = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let mut writer = ArrowWriter::try_new(self.file()?, batch.schema(), Some(properties))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(true)
    }
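    // Prepare for the next file: new epoch, a checkpoint range left open
    // (u64::MAX) until the next flush, and an empty row buffer.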
    fn reset(&mut self, epoch_num: EpochId, start_checkpoint_seq_num: u64) -> Result<()> {
        self.checkpoint_range.start = start_checkpoint_seq_num;
        self.checkpoint_range.end = u64::MAX;
        self.epoch = epoch_num;
        self.data = vec![];
        Ok(())
    }
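    // Rows are held in memory until `flush` writes the file in one shot, so
    // there is no partial on-disk size to report.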
    fn file_size(&self) -> Result<Option<u64>> {
        Ok(None)
    }
}