// iota_data_ingestion/workers/archival.rs

use std::{io::Cursor, sync::Arc};

use async_trait::async_trait;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use iota_archival::{
    CHECKPOINT_FILE_MAGIC, FileType, Manifest, SUMMARY_FILE_MAGIC, create_file_metadata_from_bytes,
    finalize_manifest, read_manifest_from_bytes,
};
use iota_data_ingestion_core::{Reducer, create_remote_store_client};
use iota_storage::{
    FileCompression, StorageFormat,
    blob::{Blob, BlobEncoding},
    compress,
};
use iota_types::{
    base_types::{EpochId, ExecutionData},
    full_checkpoint_content::CheckpointData,
    messages_checkpoint::{CheckpointSequenceNumber, FullCheckpointContents},
};
use object_store::{ObjectStore, path::Path};
use serde::{Deserialize, Serialize};

use crate::workers::RelayWorker;

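/// Configuration for [`ArchivalReducer`].
///
/// `commit_file_size` is carried in the config but not read in this module;
/// batch boundaries are driven by `commit_duration_seconds` and epoch changes
/// (see `should_close_batch` below).
///
/// A minimal construction sketch (illustrative values, not defaults):
///
/// ```ignore
/// let config = ArchivalConfig {
///     remote_url: "s3://checkpoint-archive".to_string(),
///     remote_store_options: vec![],
///     commit_file_size: 1 << 20,
///     commit_duration_seconds: 600,
/// };
/// let reducer = ArchivalReducer::new(config).await?;
/// ```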
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct ArchivalConfig {
    pub remote_url: String,
    pub remote_store_options: Vec<(String, String)>,
    pub commit_file_size: usize,
    pub commit_duration_seconds: u64,
}

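/// Reducer that serializes batches of checkpoints into archive files, uploads
/// them to a remote object store, and tracks progress in a `MANIFEST` object.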
pub struct ArchivalReducer {
    remote_store: Box<dyn ObjectStore>,
    commit_duration_ms: u64,
}

impl ArchivalReducer {
    pub async fn new(config: ArchivalConfig) -> anyhow::Result<Self> {
        // The trailing argument is the store client's request timeout in seconds.
        let remote_store =
            create_remote_store_client(config.remote_url, config.remote_store_options, 10)?;

        Ok(Self {
            remote_store,
            commit_duration_ms: config.commit_duration_seconds * 1000,
        })
    }

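    /// Uploads one checkpoint-contents file and one summary file covering the
    /// half-open range `start..end` of an epoch, then updates the `MANIFEST`
    /// to include the new files.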
    async fn upload(
        &self,
        epoch: EpochId,
        start: CheckpointSequenceNumber,
        end: CheckpointSequenceNumber,
        summary_buffer: Vec<u8>,
        buffer: Vec<u8>,
    ) -> anyhow::Result<()> {
        let checkpoint_file_path = format!("epoch_{}/{}.chk", epoch, start);
        let chk_bytes = self
            .upload_file(
                Path::from(checkpoint_file_path),
                CHECKPOINT_FILE_MAGIC,
                &buffer,
            )
            .await?;
        let summary_file_path = format!("epoch_{}/{}.sum", epoch, start);
        let sum_bytes = self
            .upload_file(
                Path::from(summary_file_path),
                SUMMARY_FILE_MAGIC,
                &summary_buffer,
            )
            .await?;
        // Register both uploads in the manifest and rewrite the MANIFEST
        // object so readers can discover the new files.
        let mut manifest = Self::read_manifest(&self.remote_store).await?;
        let checkpoint_file_metadata = create_file_metadata_from_bytes(
            chk_bytes,
            FileType::CheckpointContent,
            epoch,
            start..end,
        )?;
        let summary_file_metadata = create_file_metadata_from_bytes(
            sum_bytes,
            FileType::CheckpointSummary,
            epoch,
            start..end,
        )?;
        manifest.update(epoch, end, checkpoint_file_metadata, summary_file_metadata);

        let bytes = finalize_manifest(manifest)?;
        self.remote_store
            .put(&Path::from("MANIFEST"), bytes.into())
            .await?;
        Ok(())
    }

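    /// Frames `content` with the archive file header, compresses the result,
    /// and uploads it to `location`, returning the compressed bytes so the
    /// caller can compute manifest metadata.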
    async fn upload_file(
        &self,
        location: Path,
        magic: u32,
        content: &[u8],
    ) -> anyhow::Result<Bytes> {
        // Archive file header: a 4-byte big-endian magic followed by one byte
        // each for the storage format and compression scheme; header plus
        // payload are then compressed as a whole before upload.
        let mut buffer = vec![0; 4];
        BigEndian::write_u32(&mut buffer, magic);
        buffer.push(StorageFormat::Blob.into());
        buffer.push(FileCompression::Zstd.into());
        buffer.extend_from_slice(content);
        let mut compressed_buffer = vec![];
        let mut cursor = Cursor::new(buffer);
        compress(&mut cursor, &mut compressed_buffer)?;
        // `Bytes` clones are cheap (reference-counted), so clone for the
        // upload and return the same buffer to the caller.
        let compressed_bytes = Bytes::from(compressed_buffer);
        self.remote_store
            .put(&location, compressed_bytes.clone().into())
            .await?;
        Ok(compressed_bytes)
    }

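    /// Returns the next checkpoint sequence number expected by the archive,
    /// i.e. the resume point for ingestion.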
    pub async fn get_watermark(&self) -> anyhow::Result<CheckpointSequenceNumber> {
        let manifest = Self::read_manifest(&self.remote_store).await?;
        Ok(manifest.next_checkpoint_seq_num())
    }

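    /// Loads the `MANIFEST` object from the store, falling back to an empty
    /// manifest when none exists yet.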
    async fn read_manifest(remote_store: &dyn ObjectStore) -> anyhow::Result<Manifest> {
        Ok(match remote_store.get(&Path::from("MANIFEST")).await {
            Ok(resp) => read_manifest_from_bytes(resp.bytes().await?.to_vec())?,
            // A missing MANIFEST means the archive is empty: start a fresh
            // manifest at epoch 0, checkpoint 0.
            Err(object_store::Error::NotFound { .. }) => Manifest::new(0, 0),
            Err(err) => Err(err)?,
        })
    }
}

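// Batches of checkpoints produced by `RelayWorker` are committed here as
// archive files; `should_close_batch` controls where batch boundaries fall.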
#[async_trait]
impl Reducer<RelayWorker> for ArchivalReducer {
    async fn commit(&self, batch: &[Arc<CheckpointData>]) -> Result<(), anyhow::Error> {
        if batch.is_empty() {
            return Err(anyhow::anyhow!("commit batch can't be empty"));
        }
        let mut summary_buffer = vec![];
        let mut buffer = vec![];
        let first_checkpoint = &batch[0];
        let epoch = first_checkpoint.checkpoint_summary.epoch;
        let start_checkpoint = first_checkpoint.checkpoint_summary.sequence_number;
        // Batches contain contiguous checkpoints, so the exclusive end of the
        // archived range is `start_checkpoint + batch.len()`.
        let mut last_checkpoint = start_checkpoint;
        for checkpoint in batch {
            // Re-assemble the full checkpoint contents (transactions plus
            // effects) and append both blobs to the in-memory buffers.
            let full_checkpoint_contents = FullCheckpointContents::from_contents_and_execution_data(
                checkpoint.checkpoint_contents.clone(),
                checkpoint
                    .transactions
                    .iter()
                    .map(|t| ExecutionData::new(t.transaction.clone(), t.effects.clone())),
            );
            let contents_blob = Blob::encode(&full_checkpoint_contents, BlobEncoding::Bcs)?;
            let summary_blob = Blob::encode(&checkpoint.checkpoint_summary, BlobEncoding::Bcs)?;
            contents_blob.write(&mut buffer)?;
            summary_blob.write(&mut summary_buffer)?;
            last_checkpoint += 1;
        }
        // Propagate upload failures to the caller instead of panicking.
        self.upload(
            epoch,
            start_checkpoint,
            last_checkpoint,
            summary_buffer,
            buffer,
        )
        .await
    }

    fn should_close_batch(
        &self,
        batch: &[Arc<CheckpointData>],
        next_item: Option<&Arc<CheckpointData>>,
    ) -> bool {
        // Nothing to close for an empty batch or at the end of the stream.
        let (Some(first_checkpoint), Some(next_checkpoint)) = (batch.first(), next_item) else {
            return false;
        };
        // Close the batch when the next checkpoint crosses an epoch boundary
        // or falls outside the configured commit window.
        let first_summary = &first_checkpoint.checkpoint_summary;
        next_checkpoint.checkpoint_summary.epoch != first_summary.epoch
            || next_checkpoint.checkpoint_summary.timestamp_ms
                > first_summary.timestamp_ms + self.commit_duration_ms
    }
}