iota_analytics_indexer/writers/
csv_writer.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#![allow(dead_code)]
6
7use std::{
8    fs,
9    fs::{File, create_dir_all, remove_file},
10    ops::Range,
11    path::{Path, PathBuf},
12};
13
14use anyhow::{Result, anyhow};
15use csv::{Writer, WriterBuilder};
16use iota_storage::object_store::util::path_to_filesystem;
17use iota_types::base_types::EpochId;
18use serde::Serialize;
19
20use crate::{FileFormat, FileType, ParquetSchema, writers::AnalyticsWriter};
21
/// Saves table entries to csv files.
///
/// The file currently being written is named from `file_type`, `epoch` and
/// `checkpoint_range`; `flush` renames it so that the name carries the real
/// end checkpoint instead of the open-ended `u64::MAX` placeholder.
pub(crate) struct CSVWriter {
    /// Base directory handed to `path_to_filesystem` when resolving file paths.
    root_dir_path: PathBuf,
    /// Table kind; used to derive the file location for a given epoch and range.
    file_type: FileType,
    /// CSV writer over the file that is currently open for this epoch/range.
    writer: Writer<File>,
    /// Epoch used in the current file's path (0 until `reset` is called).
    epoch: EpochId,
    /// Checkpoint range used in the current file's path; `end` is `u64::MAX`
    /// while the file is still being written.
    checkpoint_range: Range<u64>,
}
30
31impl CSVWriter {
32    pub(crate) fn new(
33        root_dir_path: &Path,
34        file_type: FileType,
35        start_checkpoint_seq_num: u64,
36    ) -> Result<Self> {
37        let checkpoint_range = start_checkpoint_seq_num..u64::MAX;
38        let writer = Self::make_writer(
39            root_dir_path.to_path_buf(),
40            file_type,
41            0,
42            checkpoint_range.clone(),
43        )?;
44        Ok(CSVWriter {
45            root_dir_path: root_dir_path.to_path_buf(),
46            file_type,
47            writer,
48            epoch: 0,
49            checkpoint_range,
50        })
51    }
52
53    fn make_writer(
54        root_dir_path: PathBuf,
55        file_type: FileType,
56        epoch_num: EpochId,
57        checkpoint_range: Range<u64>,
58    ) -> Result<Writer<File>> {
59        let file_path = path_to_filesystem(
60            root_dir_path,
61            &file_type.file_path(FileFormat::CSV, epoch_num, checkpoint_range),
62        )?;
63        create_dir_all(file_path.parent().ok_or(anyhow!("Bad directory path"))?)?;
64        if file_path.exists() {
65            remove_file(&file_path)?;
66        }
67        let writer = WriterBuilder::new()
68            .has_headers(false)
69            .delimiter(b'|')
70            .from_path(file_path)?;
71        Ok(writer)
72    }
73
74    fn file_path(&self, epoch: EpochId, range: Range<u64>) -> Result<PathBuf> {
75        path_to_filesystem(
76            self.root_dir_path.clone(),
77            &self.file_type.file_path(FileFormat::CSV, epoch, range),
78        )
79    }
80}
81
82impl<S: Serialize + ParquetSchema> AnalyticsWriter<S> for CSVWriter {
83    fn file_format(&self) -> Result<FileFormat> {
84        Ok(FileFormat::CSV)
85    }
86
87    fn write(&mut self, rows: &[S]) -> Result<()> {
88        for row in rows {
89            self.writer.serialize(row)?;
90        }
91        Ok(())
92    }
93
94    fn flush(&mut self, end_checkpoint_seq_num: u64) -> Result<bool> {
95        self.writer.flush()?;
96        let old_file_path = self.file_path(self.epoch, self.checkpoint_range.clone())?;
97        let new_file_path = self.file_path(
98            self.epoch,
99            self.checkpoint_range.start..end_checkpoint_seq_num,
100        )?;
101        fs::rename(old_file_path, new_file_path)?;
102        Ok(true)
103    }
104
105    fn reset(&mut self, epoch_num: EpochId, start_checkpoint_seq_num: u64) -> Result<()> {
106        self.checkpoint_range.start = start_checkpoint_seq_num;
107        self.checkpoint_range.end = u64::MAX;
108        self.epoch = epoch_num;
109        self.writer = CSVWriter::make_writer(
110            self.root_dir_path.clone(),
111            self.file_type,
112            self.epoch,
113            self.checkpoint_range.clone(),
114        )?;
115        Ok(())
116    }
117
118    fn file_size(&self) -> Result<Option<u64>> {
119        let file_path = self.file_path(self.epoch, self.checkpoint_range.clone())?;
120        let len = fs::metadata(file_path)?.len();
121        Ok(Some(len))
122    }
123}