iota_analytics_indexer/writers/parquet_writer.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{
    fs::{File, create_dir_all, remove_file},
    ops::Range,
    path::{Path, PathBuf},
    sync::Arc,
};

use anyhow::{Result, anyhow};
use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray, UInt64Array};
use iota_storage::object_store::util::path_to_filesystem;
use iota_types::base_types::EpochId;
use parquet::{arrow::ArrowWriter, basic::Compression, file::properties::WriterProperties};
use serde::Serialize;

use crate::{AnalyticsWriter, FileFormat, FileType, ParquetSchema, ParquetValue};

// Save table entries to parquet files.
pub(crate) struct ParquetWriter {
    root_dir_path: PathBuf,
    file_type: FileType,
    epoch: EpochId,
    checkpoint_range: Range<u64>,
    // Buffered rows in column-major order: one inner vector per schema column.
    data: Vec<Vec<ParquetValue>>,
}

impl ParquetWriter {
    pub(crate) fn new(
        root_dir_path: &Path,
        file_type: FileType,
        start_checkpoint_seq_num: u64,
    ) -> Result<Self> {
        let checkpoint_range = start_checkpoint_seq_num..u64::MAX;
        Ok(Self {
            root_dir_path: root_dir_path.to_path_buf(),
            file_type,
            epoch: 0,
            checkpoint_range,
            data: vec![],
        })
    }

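    // Resolves the output path for the current file type, epoch, and checkpoint
    // range, creates any missing parent directories, and replaces a pre-existing
    // file at that location.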
    fn file(&self) -> Result<File> {
        let file_path = path_to_filesystem(
            self.root_dir_path.clone(),
            &self.file_type.file_path(
                FileFormat::PARQUET,
                self.epoch,
                self.checkpoint_range.clone(),
            ),
        )?;
        create_dir_all(file_path.parent().ok_or(anyhow!("Bad directory path"))?)?;
        if file_path.exists() {
            remove_file(&file_path)?;
        }
        Ok(File::create(&file_path)?)
    }
}

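// Converts a column of `ParquetValue`s into the matching Arrow array type,
// dispatching on the variant of the first value; values of a different variant
// are skipped. `Option` variants map onto the same array types as their
// non-optional counterparts, preserving nulls.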
macro_rules! convert_to_arrow_array {
    ($column:ident, $target_vector:ident, $($variant:path => $types:ty),*) => {
        match &$column[0] {
            $(
                $variant(_) => {
                    let array = <$types>::from(
                        $column
                            .into_iter()
                            .flat_map(|value| match value {
                                $variant(value) => Some(value),
                                _ => None,
                            })
                            .collect::<Vec<_>>(),
                    );
                    $target_vector.push(Arc::new(array) as ArrayRef);
                }
            )*
        }
    };
}

impl<S: Serialize + ParquetSchema> AnalyticsWriter<S> for ParquetWriter {
    fn file_format(&self) -> Result<FileFormat> {
        Ok(FileFormat::PARQUET)
    }

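    // Buffers rows in memory, transposing them into per-column vectors so they
    // can later be converted into Arrow arrays. Column buffers are created
    // lazily on the first row.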
    fn write(&mut self, rows: &[S]) -> Result<()> {
        for row in rows {
            for col_idx in 0..S::schema().len() {
                if col_idx == self.data.len() {
                    self.data.push(vec![]);
                }
                self.data[col_idx].push(row.get_column(col_idx));
            }
        }
        Ok(())
    }

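    // Converts the buffered columns into Arrow arrays and writes them as a
    // single SNAPPY-compressed record batch to the file for the current epoch
    // and checkpoint range. Returns `Ok(false)` when there is nothing to flush.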
    fn flush(&mut self, end_checkpoint_seq_num: u64) -> Result<bool> {
        if self.data.is_empty() {
            return Ok(false);
        }
        self.checkpoint_range.end = end_checkpoint_seq_num;
        let mut batch_data = vec![];
        for column in std::mem::take(&mut self.data) {
            convert_to_arrow_array!(
                column, batch_data,
                ParquetValue::U64 => UInt64Array,
                ParquetValue::Str => StringArray,
                ParquetValue::OptionU64 => UInt64Array,
                ParquetValue::OptionStr => StringArray,
                ParquetValue::Bool => BooleanArray,
                ParquetValue::I64 => Int64Array
            );
        }
        let batch = RecordBatch::try_from_iter(S::schema().iter().zip(batch_data.into_iter()))?;

        let properties = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();

        let mut writer = ArrowWriter::try_new(self.file()?, batch.schema(), Some(properties))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(true)
    }

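    // Starts a new file: records the new epoch, re-opens the checkpoint range
    // (its end is fixed again on the next flush), and drops any buffered rows.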
    fn reset(&mut self, epoch_num: EpochId, start_checkpoint_seq_num: u64) -> Result<()> {
        self.checkpoint_range.start = start_checkpoint_seq_num;
        self.checkpoint_range.end = u64::MAX;
        self.epoch = epoch_num;
        self.data = vec![];
        Ok(())
    }

    fn file_size(&self) -> Result<Option<u64>> {
        // The parquet writer does not stage records in a temporary file; rows
        // are only serialized, compressed, and written out when `flush` is
        // invoked, so there is no partial file size to report.
        Ok(None)
    }
}
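
// A minimal sketch of the Arrow -> Parquet path that `flush` follows: build
// typed arrays, assemble a `RecordBatch`, and write it with SNAPPY compression.
// The column names and values are illustrative only, and the sketch assumes
// `tempfile` is available as a dev-dependency.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn writes_snappy_compressed_record_batch() -> Result<()> {
        // Build one non-nullable and one nullable column, as the macro above
        // would for `ParquetValue::U64` and `ParquetValue::OptionStr`.
        let checkpoints = UInt64Array::from(vec![1u64, 2, 3]);
        let digests = StringArray::from(vec![Some("a"), None, Some("c")]);
        let batch = RecordBatch::try_from_iter(vec![
            ("checkpoint", Arc::new(checkpoints) as ArrayRef),
            ("digest", Arc::new(digests) as ArrayRef),
        ])?;

        // Write the batch with the same writer properties `flush` uses.
        let dir = tempfile::tempdir()?;
        let file = File::create(dir.path().join("example.parquet"))?;
        let properties = WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build();
        let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(properties))?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }
}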