iota_data_ingestion/workers/historical.rs

use std::{io::Cursor, ops::Range, sync::Arc};

use async_trait::async_trait;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use iota_config::object_storage_config::ObjectStoreConfig;
use iota_data_ingestion_core::{
    Reducer,
    history::{
        CHECKPOINT_FILE_MAGIC, MAGIC_BYTES,
        manifest::{
            Manifest, create_file_metadata_from_bytes, finalize_manifest, read_manifest_from_bytes,
        },
    },
};
use iota_storage::{
    FileCompression, StorageFormat,
    blob::{Blob, BlobEncoding},
    compress,
};
use iota_types::{
    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
};
use object_store::{DynObjectStore, Error as ObjectStoreError, ObjectStore};
use serde::{Deserialize, Serialize};

use crate::RelayWorker;

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct HistoricalWriterConfig {
    pub object_store_config: ObjectStoreConfig,
    pub commit_duration_seconds: u64,
}
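
// A matching config entry might look like the following sketch; the keys under
// `object-store-config` depend on iota_config's ObjectStoreConfig and are
// illustrative only. Note the kebab-case naming from the serde attribute above:
//
//     object-store-config:
//       object-store: "S3"
//       bucket: "historical"
//     commit-duration-seconds: 600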

pub struct HistoricalReducer {
    remote_store: Arc<DynObjectStore>,
    commit_duration_ms: u64,
}

impl HistoricalReducer {
    pub async fn new(config: HistoricalWriterConfig) -> anyhow::Result<Self> {
        let remote_store = config.object_store_config.make()?;

        Ok(Self {
            remote_store,
            commit_duration_ms: config.commit_duration_seconds * 1000,
        })
    }

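    /// Uploads one encoded checkpoint range, then refreshes the manifest. The
    /// data object is written before the manifest update, so a failure between
    /// the two writes leaves the manifest pointing only at fully uploaded
    /// ranges.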
    async fn upload(
        &self,
        checkpoint_range: Range<CheckpointSequenceNumber>,
        data: Bytes,
    ) -> anyhow::Result<()> {
        let file_metadata =
            create_file_metadata_from_bytes(data.clone(), checkpoint_range.clone())?;
        self.remote_store
            .put(&file_metadata.file_path(), data.into())
            .await?;
        let mut manifest = Self::read_manifest(&self.remote_store).await?;
        manifest.update(checkpoint_range.end, file_metadata);

        let bytes = finalize_manifest(manifest)?;
        self.remote_store
            .put(&Manifest::file_path(), bytes.into())
            .await?;
        Ok(())
    }

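    /// Frames the encoded checkpoints with the historical-file header and
    /// compresses the result. Layout before compression:
    /// `[magic: u32 BE][storage format byte][compression byte][BCS blob frames...]`.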
    fn prepare_data_to_upload(&self, mut checkpoint_data: Vec<u8>) -> anyhow::Result<Bytes> {
        let mut buffer = vec![0; MAGIC_BYTES];
        BigEndian::write_u32(&mut buffer, CHECKPOINT_FILE_MAGIC);
        buffer.push(StorageFormat::Blob.into());
        buffer.push(FileCompression::Zstd.into());
        buffer.append(&mut checkpoint_data);
        let mut compressed_buffer = vec![];
        let mut cursor = Cursor::new(buffer);
        compress(&mut cursor, &mut compressed_buffer)?;
        Ok(Bytes::from(compressed_buffer))
    }

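    /// Returns the next checkpoint sequence number expected by the remote
    /// store, i.e. the point from which ingestion should resume.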
    pub async fn get_watermark(&self) -> anyhow::Result<CheckpointSequenceNumber> {
        let manifest = Self::read_manifest(&self.remote_store).await?;
        Ok(manifest.next_checkpoint_seq_num())
    }

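    /// Loads the manifest from the remote store, or starts a fresh one at
    /// checkpoint 0 if none has been written yet.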
    async fn read_manifest(remote_store: &dyn ObjectStore) -> anyhow::Result<Manifest> {
        Ok(match remote_store.get(&Manifest::file_path()).await {
            Ok(resp) => read_manifest_from_bytes(resp.bytes().await?.to_vec())?,
            Err(ObjectStoreError::NotFound { .. }) => Manifest::new(0),
            Err(err) => Err(err)?,
        })
    }
}
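
// Hedged usage sketch (assumes a tokio runtime and a deserialized
// HistoricalWriterConfig named `config`; error handling elided):
//
//     let reducer = HistoricalReducer::new(config).await?;
//     // Resume ingestion from the first checkpoint not yet uploaded.
//     let start_seq = reducer.get_watermark().await?;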

#[async_trait]
impl Reducer<RelayWorker> for HistoricalReducer {
    async fn commit(&self, batch: &[Arc<CheckpointData>]) -> Result<(), anyhow::Error> {
        if batch.is_empty() {
            anyhow::bail!("commit batch can't be empty");
        }
        let mut buffer = vec![];
        let first_checkpoint = &batch[0];
        let start_checkpoint = first_checkpoint.checkpoint_summary.sequence_number;
        // Batches are contiguous, so the (end-exclusive) uploaded range follows
        // from the first sequence number and the batch length.
        let uploaded_range = start_checkpoint..(start_checkpoint + batch.len() as u64);
        for checkpoint in batch {
            let data = Blob::encode(&checkpoint, BlobEncoding::Bcs)?;
            data.write(&mut buffer)?;
        }
        self.upload(uploaded_range, self.prepare_data_to_upload(buffer)?)
            .await
    }

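    // A batch is closed as soon as the genesis checkpoint would otherwise be
    // joined by checkpoint 1, the epoch changes, or the configured commit
    // duration has elapsed since the first checkpoint in the batch.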
    fn should_close_batch(
        &self,
        batch: &[Arc<CheckpointData>],
        next_item: Option<&Arc<CheckpointData>>,
    ) -> bool {
        if batch.is_empty() || next_item.is_none() {
            return false;
        }
        let first_checkpoint = &batch[0].checkpoint_summary;
        let next_checkpoint = next_item.expect("invariant checked above");
        // Special case: the genesis checkpoint (sequence number 0) is committed
        // on its own, before checkpoint 1 joins the batch.
        if next_checkpoint.checkpoint_summary.sequence_number == 1 {
            return true;
        }
        next_checkpoint.checkpoint_summary.epoch != first_checkpoint.epoch
            || next_checkpoint.checkpoint_summary.timestamp_ms
                > (self.commit_duration_ms + first_checkpoint.timestamp_ms)
    }
}
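
// Resulting object-store layout: one immutable object per committed checkpoint
// range (at the path produced by `file_metadata.file_path()`), plus a single
// manifest at `Manifest::file_path()` recording the uploaded ranges and the
// next expected checkpoint sequence number.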