1#![allow(dead_code)]
6
7use std::{
8 fs,
9 fs::{File, OpenOptions},
10 io::{BufWriter, Seek, SeekFrom, Write},
11 ops::Range,
12 path::{Path, PathBuf},
13 sync::Arc,
14 thread::sleep,
15 time::Duration,
16};
17
18use anyhow::{Context, Result, anyhow};
19use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
20use iota_config::object_storage_config::ObjectStoreConfig;
21use iota_storage::{
22 FileCompression, StorageFormat,
23 blob::{Blob, BlobEncoding},
24 compress,
25 object_store::util::{copy_file, path_to_filesystem},
26};
27use iota_types::{
28 messages_checkpoint::{
29 CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber,
30 FullCheckpointContents as CheckpointContents,
31 },
32 storage::WriteStore,
33};
34use object_store::DynObjectStore;
35use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
36use tokio::{
37 sync::{
38 mpsc,
39 mpsc::{Receiver, Sender},
40 },
41 time::Instant,
42};
43use tracing::{debug, info};
44
45use crate::{
46 CHECKPOINT_FILE_MAGIC, CHECKPOINT_FILE_SUFFIX, CheckpointUpdates, EPOCH_DIR_PREFIX,
47 FileMetadata, FileType, MAGIC_BYTES, Manifest, SUMMARY_FILE_MAGIC, SUMMARY_FILE_SUFFIX,
48 create_file_metadata, read_manifest, write_manifest,
49};
50
/// Prometheus metrics exported by the archive writer.
pub struct ArchiveMetrics {
    /// Sequence number of the most recent checkpoint whose files (and
    /// manifest update) have been uploaded to the remote store.
    pub latest_checkpoint_archived: IntGauge,
}
54
55impl ArchiveMetrics {
56 pub fn new(registry: &Registry) -> Arc<Self> {
57 let this = Self {
58 latest_checkpoint_archived: register_int_gauge_with_registry!(
59 "latest_checkpoint_archived",
60 "Latest checkpoint to have archived to the remote store",
61 registry
62 )
63 .unwrap(),
64 };
65 Arc::new(this)
66 }
67}
68
/// Stages checkpoint contents and summaries into per-epoch files on local
/// disk, "cutting" (finalizing) the current file pair when a size or time
/// threshold is reached and handing the finished files to the remote-sync
/// task via `sender`.
struct CheckpointWriter {
    // Root of the local staging area; epoch directories live under it.
    root_dir_path: PathBuf,
    // Epoch currently being written.
    epoch_num: u64,
    // Sequence numbers covered by the files currently open, as
    // `start..end`; `end` is the next sequence number expected.
    checkpoint_range: Range<u64>,
    // Buffered writer for the checkpoint-contents file.
    wbuf: BufWriter<File>,
    // Buffered writer for the checkpoint-summary file.
    summary_wbuf: BufWriter<File>,
    // Channel carrying finished-file notifications to the upload loop.
    sender: Sender<CheckpointUpdates>,
    // Approximate bytes written to the current contents file; drives the
    // size-based cut decision.
    checkpoint_buf_offset: usize,
    file_compression: FileCompression,
    storage_format: StorageFormat,
    manifest: Manifest,
    // When the last cut happened; drives the time-based cut decision.
    last_commit_instant: Instant,
    // Cut a new file once this much time has elapsed since the last cut.
    commit_duration: Duration,
    // Cut a new file once the contents file would exceed this many bytes.
    commit_file_size: usize,
}
86
87impl CheckpointWriter {
88 fn new(
89 root_dir_path: PathBuf,
90 file_compression: FileCompression,
91 storage_format: StorageFormat,
92 sender: Sender<CheckpointUpdates>,
93 manifest: Manifest,
94 commit_duration: Duration,
95 commit_file_size: usize,
96 ) -> Result<Self> {
97 let epoch_num = manifest.epoch_num();
98 let checkpoint_sequence_num = manifest.next_checkpoint_seq_num();
99 let epoch_dir = root_dir_path.join(format!("{}{epoch_num}", EPOCH_DIR_PREFIX));
100 if epoch_dir.exists() {
101 fs::remove_dir_all(&epoch_dir)?;
102 }
103 fs::create_dir_all(&epoch_dir)?;
104 let checkpoint_file = Self::next_file(
105 &epoch_dir,
106 checkpoint_sequence_num,
107 CHECKPOINT_FILE_SUFFIX,
108 CHECKPOINT_FILE_MAGIC,
109 storage_format,
110 file_compression,
111 )?;
112 let summary_file = Self::next_file(
113 &epoch_dir,
114 checkpoint_sequence_num,
115 SUMMARY_FILE_SUFFIX,
116 SUMMARY_FILE_MAGIC,
117 storage_format,
118 file_compression,
119 )?;
120 Ok(CheckpointWriter {
121 root_dir_path,
122 epoch_num,
123 checkpoint_range: checkpoint_sequence_num..checkpoint_sequence_num,
124 wbuf: BufWriter::new(checkpoint_file),
125 summary_wbuf: BufWriter::new(summary_file),
126 checkpoint_buf_offset: 0,
127 sender,
128 file_compression,
129 storage_format,
130 manifest,
131 last_commit_instant: Instant::now(),
132 commit_duration,
133 commit_file_size,
134 })
135 }
136
137 pub fn write(
138 &mut self,
139 checkpoint_contents: CheckpointContents,
140 checkpoint_summary: Checkpoint,
141 ) -> Result<()> {
142 match self.storage_format {
143 StorageFormat::Blob => self.write_as_blob(checkpoint_contents, checkpoint_summary),
144 }
145 }
146
147 pub fn write_as_blob(
148 &mut self,
149 checkpoint_contents: CheckpointContents,
150 checkpoint_summary: Checkpoint,
151 ) -> Result<()> {
152 assert_eq!(
153 checkpoint_summary.sequence_number,
154 self.checkpoint_range.end
155 );
156
157 if checkpoint_summary.epoch()
158 == self
159 .epoch_num
160 .checked_add(1)
161 .context("Epoch num overflow")?
162 {
163 self.cut()?;
164 self.update_to_next_epoch();
165 if self.epoch_dir().exists() {
166 fs::remove_dir_all(self.epoch_dir())?;
167 }
168 fs::create_dir_all(self.epoch_dir())?;
169 self.reset()?;
170 }
171
172 assert_eq!(checkpoint_summary.epoch, self.epoch_num);
173
174 assert_eq!(
175 checkpoint_summary.content_digest,
176 *checkpoint_contents.checkpoint_contents().digest()
177 );
178
179 let contents_blob = Blob::encode(&checkpoint_contents, BlobEncoding::Bcs)?;
180 let blob_size = contents_blob.size();
181 let cut_new_checkpoint_file = (self.checkpoint_buf_offset + blob_size)
185 > self.commit_file_size
186 || (self.last_commit_instant.elapsed() > self.commit_duration);
187 if cut_new_checkpoint_file {
188 self.cut()?;
189 self.reset()?;
190 }
191
192 self.checkpoint_buf_offset += contents_blob.write(&mut self.wbuf)?;
193
194 let summary_blob = Blob::encode(&checkpoint_summary, BlobEncoding::Bcs)?;
195 summary_blob.write(&mut self.summary_wbuf)?;
196
197 self.checkpoint_range.end = self
198 .checkpoint_range
199 .end
200 .checked_add(1)
201 .context("Checkpoint sequence num overflow")?;
202 Ok(())
203 }
204 fn finalize(&mut self) -> Result<FileMetadata> {
205 self.wbuf.flush()?;
206 self.wbuf.get_ref().sync_data()?;
207 let off = self.wbuf.get_ref().stream_position()?;
208 self.wbuf.get_ref().set_len(off)?;
209 let file_path = self.epoch_dir().join(format!(
210 "{}.{CHECKPOINT_FILE_SUFFIX}",
211 self.checkpoint_range.start
212 ));
213 self.compress(&file_path)?;
214 let file_metadata = create_file_metadata(
215 &file_path,
216 FileType::CheckpointContent,
217 self.epoch_num,
218 self.checkpoint_range.clone(),
219 )?;
220 Ok(file_metadata)
221 }
222 fn finalize_summary(&mut self) -> Result<FileMetadata> {
223 self.summary_wbuf.flush()?;
224 self.summary_wbuf.get_ref().sync_data()?;
225 let off = self.summary_wbuf.get_ref().stream_position()?;
226 self.summary_wbuf.get_ref().set_len(off)?;
227 let file_path = self.epoch_dir().join(format!(
228 "{}.{SUMMARY_FILE_SUFFIX}",
229 self.checkpoint_range.start
230 ));
231 self.compress(&file_path)?;
232 let file_metadata = create_file_metadata(
233 &file_path,
234 FileType::CheckpointSummary,
235 self.epoch_num,
236 self.checkpoint_range.clone(),
237 )?;
238 Ok(file_metadata)
239 }
240
241 fn cut(&mut self) -> Result<()> {
244 if !self.checkpoint_range.is_empty() {
245 let checkpoint_file_metadata = self.finalize()?;
246 let summary_file_metadata = self.finalize_summary()?;
247 let checkpoint_updates = CheckpointUpdates::new(
248 self.epoch_num,
249 self.checkpoint_range.end,
250 checkpoint_file_metadata,
251 summary_file_metadata,
252 &mut self.manifest,
253 );
254 info!("Checkpoint file cut for: {:?}", checkpoint_updates);
255 self.sender.blocking_send(checkpoint_updates)?;
256 }
257 Ok(())
258 }
259 fn compress(&self, source: &Path) -> Result<()> {
260 if self.file_compression == FileCompression::None {
261 return Ok(());
262 }
263 let mut input = File::open(source)?;
264 let tmp_file_name = source.with_extension("tmp");
265 let mut output = File::create(&tmp_file_name)?;
266 compress(&mut input, &mut output)?;
267 fs::rename(tmp_file_name, source)?;
268 Ok(())
269 }
270 fn next_file(
271 dir_path: &Path,
272 checkpoint_sequence_num: u64,
273 suffix: &str,
274 magic_bytes: u32,
275 storage_format: StorageFormat,
276 file_compression: FileCompression,
277 ) -> Result<File> {
278 let next_file_path = dir_path.join(format!("{checkpoint_sequence_num}.{suffix}"));
279 let mut f = File::create(next_file_path.clone())?;
280 let mut metab = [0u8; MAGIC_BYTES];
281 BigEndian::write_u32(&mut metab, magic_bytes);
282 let n = f.write(&metab)?;
283 drop(f);
284 f = OpenOptions::new().append(true).open(next_file_path)?;
285 f.seek(SeekFrom::Start(n as u64))?;
286 f.write_u8(storage_format.into())?;
287 f.write_u8(file_compression.into())?;
288 Ok(f)
289 }
290 fn create_new_files(&mut self) -> Result<()> {
291 let f = Self::next_file(
292 &self.epoch_dir(),
293 self.checkpoint_range.start,
294 CHECKPOINT_FILE_SUFFIX,
295 CHECKPOINT_FILE_MAGIC,
296 self.storage_format,
297 self.file_compression,
298 )?;
299 self.checkpoint_buf_offset = MAGIC_BYTES;
300 self.wbuf = BufWriter::new(f);
301 let f = Self::next_file(
302 &self.epoch_dir(),
303 self.checkpoint_range.start,
304 SUMMARY_FILE_SUFFIX,
305 SUMMARY_FILE_MAGIC,
306 self.storage_format,
307 self.file_compression,
308 )?;
309 self.summary_wbuf = BufWriter::new(f);
310 Ok(())
311 }
312 fn reset(&mut self) -> Result<()> {
313 self.reset_checkpoint_range();
314 self.create_new_files()?;
315 self.reset_last_commit_ts();
316 Ok(())
317 }
318 fn reset_last_commit_ts(&mut self) {
319 self.last_commit_instant = Instant::now();
320 }
321 fn reset_checkpoint_range(&mut self) {
322 self.checkpoint_range = self.checkpoint_range.end..self.checkpoint_range.end
323 }
324 fn epoch_dir(&self) -> PathBuf {
325 self.root_dir_path
326 .join(format!("{}{}", EPOCH_DIR_PREFIX, self.epoch_num))
327 }
328 fn update_to_next_epoch(&mut self) {
329 self.epoch_num = self.epoch_num.checked_add(1).unwrap();
330 }
331}
332
/// Orchestrates checkpoint archival: tails checkpoints from a store into
/// local staging files (via `CheckpointWriter`) and uploads finished files
/// plus the updated manifest to the remote object store.
pub struct ArchiveWriter {
    file_compression: FileCompression,
    storage_format: StorageFormat,
    // Local directory backing `local_object_store`; staged files are
    // written here and deleted after upload.
    local_staging_dir_root: PathBuf,
    local_object_store: Arc<DynObjectStore>,
    remote_object_store: Arc<DynObjectStore>,
    // Thresholds forwarded to `CheckpointWriter` for file cutting.
    commit_duration: Duration,
    commit_file_size: usize,
    archive_metrics: Arc<ArchiveMetrics>,
}
345
impl ArchiveWriter {
    /// Builds an `ArchiveWriter` from the local/remote store configs.
    ///
    /// Fails if either object store cannot be constructed or the local
    /// config has no `directory` (required for filesystem staging).
    pub async fn new(
        local_store_config: ObjectStoreConfig,
        remote_store_config: ObjectStoreConfig,
        file_compression: FileCompression,
        storage_format: StorageFormat,
        commit_duration: Duration,
        commit_file_size: usize,
        registry: &Registry,
    ) -> Result<Self> {
        Ok(ArchiveWriter {
            file_compression,
            storage_format,
            remote_object_store: remote_store_config.make()?,
            local_object_store: local_store_config.make()?,
            local_staging_dir_root: local_store_config.directory.context("Missing local dir")?,
            commit_duration,
            commit_file_size,
            archive_metrics: ArchiveMetrics::new(registry),
        })
    }

    /// Starts the archival pipeline: an async task that uploads finished
    /// files to the remote store, and a blocking task that tails new
    /// checkpoints out of `store` into local files.
    ///
    /// Returns a broadcast sender; sending `()` on it shuts both tasks
    /// down. Panics (via `expect`) on startup failures such as an
    /// unreadable remote archive or manifest.
    pub async fn start<S>(&self, store: S) -> Result<tokio::sync::broadcast::Sender<()>>
    where
        S: WriteStore + Send + Sync + 'static,
    {
        // An empty remote archive means this is a fresh archive: start a
        // new manifest instead of reading one.
        let remote_archive_is_empty = self
            .remote_object_store
            .list_with_delimiter(None)
            .await
            .expect("Failed to read remote archive dir")
            .common_prefixes
            .is_empty();
        let manifest = if remote_archive_is_empty {
            Manifest::new(0, 0)
        } else {
            read_manifest(self.remote_object_store.clone())
                .await
                .expect("Failed to read manifest")
        };
        let start_checkpoint_sequence_number = manifest.next_checkpoint_seq_num();
        let (sender, receiver) = mpsc::channel::<CheckpointUpdates>(100);
        let checkpoint_writer = CheckpointWriter::new(
            self.local_staging_dir_root.clone(),
            self.file_compression,
            self.storage_format,
            sender,
            manifest,
            self.commit_duration,
            self.commit_file_size,
        )
        .expect("Failed to create checkpoint writer");
        let (kill_sender, kill_receiver) = tokio::sync::broadcast::channel::<()>(1);

        // Async upload loop: receives cut-file notifications and syncs
        // them (plus the manifest) to the remote store.
        tokio::spawn(Self::start_syncing_with_remote(
            self.remote_object_store.clone(),
            self.local_object_store.clone(),
            self.local_staging_dir_root.clone(),
            receiver,
            kill_sender.subscribe(),
            self.archive_metrics.clone(),
        ));

        // The tail loop does blocking file I/O and thread sleeps, so it
        // runs on the blocking thread pool, not the async runtime.
        tokio::task::spawn_blocking(move || {
            Self::start_tailing_checkpoints(
                start_checkpoint_sequence_number,
                checkpoint_writer,
                store,
                kill_receiver,
            )
        });
        Ok(kill_sender)
    }

    /// Blocking loop: reads checkpoints from `store` in sequence order and
    /// feeds them to `checkpoint_writer`. When the next checkpoint (or its
    /// contents) is not yet available, sleeps 3s and retries. Exits when a
    /// kill signal is observed.
    fn start_tailing_checkpoints<S>(
        start_checkpoint_sequence_number: CheckpointSequenceNumber,
        mut checkpoint_writer: CheckpointWriter,
        store: S,
        mut kill: tokio::sync::broadcast::Receiver<()>,
    ) -> Result<()>
    where
        S: WriteStore + Send + Sync + 'static,
    {
        let mut checkpoint_sequence_number = start_checkpoint_sequence_number;
        info!("Starting checkpoint tailing from sequence number: {checkpoint_sequence_number}");

        // try_recv: poll the kill channel without blocking, once per pass.
        while kill.try_recv().is_err() {
            if let Some(checkpoint_summary) = store
                .get_checkpoint_by_sequence_number(checkpoint_sequence_number)
                .map_err(|_| anyhow!("Failed to read checkpoint summary from store"))?
            {
                if let Some(checkpoint_contents) = store
                    .get_full_checkpoint_contents(&checkpoint_summary.content_digest)
                    .map_err(|_| anyhow!("Failed to read checkpoint content from store"))?
                {
                    checkpoint_writer
                        .write(checkpoint_contents, checkpoint_summary.into_inner())?;
                    checkpoint_sequence_number = checkpoint_sequence_number
                        .checked_add(1)
                        .context("checkpoint seq number overflow")?;
                    // Data was available: loop immediately, no sleep.
                    continue;
                }
            }
            // Nothing new yet; std thread sleep is fine on this blocking
            // thread (must not be used in async context).
            sleep(Duration::from_secs(3));
        }
        Ok(())
    }

    /// Async loop: for each `CheckpointUpdates`, uploads the summary file,
    /// then the contents file, then writes the updated manifest — in that
    /// order — and bumps the archived-checkpoint gauge. Exits on a kill
    /// signal or when the update channel closes. Upload failures panic
    /// (via `expect`) rather than silently dropping archive data.
    async fn start_syncing_with_remote(
        remote_object_store: Arc<DynObjectStore>,
        local_object_store: Arc<DynObjectStore>,
        local_staging_root_dir: PathBuf,
        mut update_receiver: Receiver<CheckpointUpdates>,
        mut kill: tokio::sync::broadcast::Receiver<()>,
        metrics: Arc<ArchiveMetrics>,
    ) -> Result<()> {
        loop {
            tokio::select! {
                _ = kill.recv() => break,
                updates = update_receiver.recv() => {
                    if let Some(checkpoint_updates) = updates {
                        info!("Received checkpoint update: {:?}", checkpoint_updates);
                        let latest_checkpoint_seq_num = checkpoint_updates.manifest.next_checkpoint_seq_num();
                        let summary_file_path = checkpoint_updates.summary_file_path();
                        Self::sync_file_to_remote(
                            local_staging_root_dir.clone(),
                            summary_file_path,
                            local_object_store.clone(),
                            remote_object_store.clone()
                        )
                        .await
                        .expect("Syncing checkpoint summary should not fail");

                        let content_file_path = checkpoint_updates.content_file_path();
                        Self::sync_file_to_remote(
                            local_staging_root_dir.clone(),
                            content_file_path,
                            local_object_store.clone(),
                            remote_object_store.clone()
                        )
                        .await
                        .expect("Syncing checkpoint content should not fail");

                        // Manifest is written last so it never references
                        // files that are not yet uploaded.
                        write_manifest(
                            checkpoint_updates.manifest,
                            remote_object_store.clone()
                        )
                        .await
                        .expect("Updating manifest should not fail");
                        metrics.latest_checkpoint_archived.set(latest_checkpoint_seq_num as i64)
                    } else {
                        info!("Terminating archive sync loop");
                        break;
                    }
                },
            }
        }
        Ok(())
    }

    /// Copies `path` from the local store to the remote store, then deletes
    /// the local copy to keep the staging directory from growing.
    async fn sync_file_to_remote(
        dir: PathBuf,
        path: object_store::path::Path,
        from: Arc<DynObjectStore>,
        to: Arc<DynObjectStore>,
    ) -> Result<()> {
        debug!("Syncing archive file to remote: {:?}", path);
        copy_file(&path, &path, &from, &to).await?;
        fs::remove_file(path_to_filesystem(dir, &path)?)?;
        Ok(())
    }
}
533}