// iota_analytics_indexer/writers/csv_writer.rs
#![allow(dead_code)]
6
7use std::{
8 fs,
9 fs::{File, create_dir_all, remove_file},
10 ops::Range,
11 path::{Path, PathBuf},
12};
13
14use anyhow::{Result, anyhow};
15use csv::{Writer, WriterBuilder};
16use iota_storage::object_store::util::path_to_filesystem;
17use iota_types::base_types::EpochId;
18use serde::Serialize;
19
20use crate::{FileFormat, FileType, ParquetSchema, writers::AnalyticsWriter};
21
22pub(crate) struct CSVWriter {
24 root_dir_path: PathBuf,
25 file_type: FileType,
26 writer: Writer<File>,
27 epoch: EpochId,
28 checkpoint_range: Range<u64>,
29}
30
31impl CSVWriter {
32 pub(crate) fn new(
33 root_dir_path: &Path,
34 file_type: FileType,
35 start_checkpoint_seq_num: u64,
36 ) -> Result<Self> {
37 let checkpoint_range = start_checkpoint_seq_num..u64::MAX;
38 let writer = Self::make_writer(
39 root_dir_path.to_path_buf(),
40 file_type,
41 0,
42 checkpoint_range.clone(),
43 )?;
44 Ok(CSVWriter {
45 root_dir_path: root_dir_path.to_path_buf(),
46 file_type,
47 writer,
48 epoch: 0,
49 checkpoint_range,
50 })
51 }
52
53 fn make_writer(
54 root_dir_path: PathBuf,
55 file_type: FileType,
56 epoch_num: EpochId,
57 checkpoint_range: Range<u64>,
58 ) -> Result<Writer<File>> {
59 let file_path = path_to_filesystem(
60 root_dir_path,
61 &file_type.file_path(FileFormat::CSV, epoch_num, checkpoint_range),
62 )?;
63 create_dir_all(file_path.parent().ok_or(anyhow!("Bad directory path"))?)?;
64 if file_path.exists() {
65 remove_file(&file_path)?;
66 }
67 let writer = WriterBuilder::new()
68 .has_headers(false)
69 .delimiter(b'|')
70 .from_path(file_path)?;
71 Ok(writer)
72 }
73
74 fn file_path(&self, epoch: EpochId, range: Range<u64>) -> Result<PathBuf> {
75 path_to_filesystem(
76 self.root_dir_path.clone(),
77 &self.file_type.file_path(FileFormat::CSV, epoch, range),
78 )
79 }
80}
81
82impl<S: Serialize + ParquetSchema> AnalyticsWriter<S> for CSVWriter {
83 fn file_format(&self) -> Result<FileFormat> {
84 Ok(FileFormat::CSV)
85 }
86
87 fn write(&mut self, rows: &[S]) -> Result<()> {
88 for row in rows {
89 self.writer.serialize(row)?;
90 }
91 Ok(())
92 }
93
94 fn flush(&mut self, end_checkpoint_seq_num: u64) -> Result<bool> {
95 self.writer.flush()?;
96 let old_file_path = self.file_path(self.epoch, self.checkpoint_range.clone())?;
97 let new_file_path = self.file_path(
98 self.epoch,
99 self.checkpoint_range.start..end_checkpoint_seq_num,
100 )?;
101 fs::rename(old_file_path, new_file_path)?;
102 Ok(true)
103 }
104
105 fn reset(&mut self, epoch_num: EpochId, start_checkpoint_seq_num: u64) -> Result<()> {
106 self.checkpoint_range.start = start_checkpoint_seq_num;
107 self.checkpoint_range.end = u64::MAX;
108 self.epoch = epoch_num;
109 self.writer = CSVWriter::make_writer(
110 self.root_dir_path.clone(),
111 self.file_type,
112 self.epoch,
113 self.checkpoint_range.clone(),
114 )?;
115 Ok(())
116 }
117
118 fn file_size(&self) -> Result<Option<u64>> {
119 let file_path = self.file_path(self.epoch, self.checkpoint_range.clone())?;
120 let len = fs::metadata(file_path)?.len();
121 Ok(Some(len))
122 }
123}