iota_data_ingestion/workers/
historical.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{io::Cursor, ops::Range, sync::Arc};

use async_trait::async_trait;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use iota_config::object_storage_config::ObjectStoreConfig;
use iota_data_ingestion_core::{
    Reducer,
    history::{
        CHECKPOINT_FILE_MAGIC, MAGIC_BYTES,
        manifest::{
            Manifest, create_file_metadata_from_bytes, finalize_manifest, read_manifest_from_bytes,
        },
    },
};
use iota_storage::{
    FileCompression, StorageFormat,
    blob::{Blob, BlobEncoding},
    compress,
};
use iota_types::{
    full_checkpoint_content::CheckpointData, messages_checkpoint::CheckpointSequenceNumber,
};
use object_store::{DynObjectStore, Error as ObjectStoreError, ObjectStore};
use serde::{Deserialize, Serialize};

use crate::RelayWorker;

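/// Configuration for the historical archive writer, deserialized from
/// kebab-case keys (e.g. `object-store-config`, `commit-duration-seconds`).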
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct HistoricalWriterConfig {
    /// Remote object store the archive files and manifest are written to.
    pub object_store_config: ObjectStoreConfig,
    /// Maximum time span, in seconds, covered by a single batch before it is
    /// committed.
    pub commit_duration_seconds: u64,
}

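/// Reducer that archives batches of checkpoints to a remote object store.
///
/// Each committed batch is serialized as a sequence of BCS-encoded blobs,
/// prefixed with the checkpoint file header (magic, storage format,
/// compression scheme), compressed, and uploaded; the upload is then
/// recorded in the store's [`Manifest`].
///
/// A minimal wiring sketch (assuming a populated `HistoricalWriterConfig`):
///
/// ```ignore
/// let reducer = HistoricalReducer::new(config).await?;
/// // Resume ingestion from the first checkpoint the archive is missing.
/// let next_checkpoint = reducer.get_watermark().await?;
/// ```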
pub struct HistoricalReducer {
    remote_store: Arc<DynObjectStore>,
    /// Commit duration from the config, converted to milliseconds to match
    /// checkpoint timestamps.
    commit_duration_ms: u64,
}

impl HistoricalReducer {
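    /// Creates a reducer from the given config, instantiating the remote
    /// object store it will write to.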
    pub async fn new(config: HistoricalWriterConfig) -> anyhow::Result<Self> {
        let remote_store = config.object_store_config.make()?;

        Ok(Self {
            remote_store,
            commit_duration_ms: config.commit_duration_seconds * 1000,
        })
    }

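    /// Uploads a finished archive file and records it in the manifest.
    ///
    /// The file is written before the manifest is updated, so the manifest
    /// never references data that has not been uploaded yet.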
    async fn upload(
        &self,
        checkpoint_range: Range<CheckpointSequenceNumber>,
        data: Bytes,
    ) -> anyhow::Result<()> {
        let file_metadata =
            create_file_metadata_from_bytes(data.clone(), checkpoint_range.clone())?;
        self.remote_store
            .put(&file_metadata.file_path(), data.into())
            .await?;
        let mut manifest = Self::read_manifest(&self.remote_store).await?;
        manifest.update(checkpoint_range.end, file_metadata);

        let bytes = finalize_manifest(manifest)?;
        self.remote_store
            .put(&Manifest::file_path(), bytes.into())
            .await?;
        Ok(())
    }

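    /// Wraps the concatenated checkpoint blobs in the archive file header
    /// and compresses the result.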
    fn prepare_data_to_upload(&self, mut checkpoint_data: Vec<u8>) -> anyhow::Result<Bytes> {
        // File header: magic number (big-endian u32), followed by one byte
        // each for the storage format and the compression scheme.
        let mut buffer = vec![0; MAGIC_BYTES];
        BigEndian::write_u32(&mut buffer, CHECKPOINT_FILE_MAGIC);
        buffer.push(StorageFormat::Blob.into());
        buffer.push(FileCompression::Zstd.into());
        buffer.append(&mut checkpoint_data);
        // Compress header and payload together for upload.
        let mut compressed_buffer = vec![];
        let mut cursor = Cursor::new(buffer);
        compress(&mut cursor, &mut compressed_buffer)?;
        Ok(Bytes::from(compressed_buffer))
    }

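    /// Returns the sequence number of the next checkpoint expected by the
    /// archive, as recorded in the manifest.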
    pub async fn get_watermark(&self) -> anyhow::Result<CheckpointSequenceNumber> {
        let manifest = Self::read_manifest(&self.remote_store).await?;
        Ok(manifest.next_checkpoint_seq_num())
    }

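    /// Fetches the manifest from the remote store, falling back to an empty
    /// manifest starting at checkpoint 0 if none exists yet.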
    async fn read_manifest(remote_store: &dyn ObjectStore) -> anyhow::Result<Manifest> {
        Ok(match remote_store.get(&Manifest::file_path()).await {
            Ok(resp) => read_manifest_from_bytes(resp.bytes().await?.to_vec())?,
            Err(ObjectStoreError::NotFound { .. }) => Manifest::new(0),
            Err(err) => Err(err)?,
        })
    }
}

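/// Batches checkpoints and writes each closed batch to the historical
/// archive as a single file.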
#[async_trait]
impl Reducer<RelayWorker> for HistoricalReducer {
    async fn commit(&self, batch: &[Arc<CheckpointData>]) -> Result<(), anyhow::Error> {
        if batch.is_empty() {
            anyhow::bail!("commit batch can't be empty");
        }
        let mut buffer = vec![];
        let first_checkpoint = &batch[0];
        let start_checkpoint = first_checkpoint.checkpoint_summary.sequence_number;
        // Batches hold contiguous checkpoints, so the covered range follows
        // from the first sequence number and the batch length.
        let uploaded_range = start_checkpoint..(start_checkpoint + batch.len() as u64);
        for checkpoint in batch {
            let data = Blob::encode(&checkpoint, BlobEncoding::Bcs)?;
            data.write(&mut buffer)?;
        }
        self.upload(uploaded_range, self.prepare_data_to_upload(buffer)?)
            .await
    }

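    /// Closes the current batch right after genesis, on an epoch change, or
    /// once the configured commit duration has elapsed between the first and
    /// the next checkpoint's timestamps.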
    fn should_close_batch(
        &self,
        batch: &[Arc<CheckpointData>],
        next_item: Option<&Arc<CheckpointData>>,
    ) -> bool {
        // An empty batch or a missing successor can never trigger a close.
        if batch.is_empty() || next_item.is_none() {
            return false;
        }
        let first_checkpoint = &batch[0].checkpoint_summary;
        let next_checkpoint = next_item.expect("next_item is checked above");
        // Close the batch right after genesis so checkpoint 0 gets its own
        // file.
        if next_checkpoint.checkpoint_summary.sequence_number == 1 {
            return true;
        }
        // Otherwise close on an epoch boundary or once the commit duration
        // has elapsed.
        next_checkpoint.checkpoint_summary.epoch != first_checkpoint.epoch
            || next_checkpoint.checkpoint_summary.timestamp_ms
                > (self.commit_duration_ms + first_checkpoint.timestamp_ms)
    }
}