iota_data_ingestion/workers/
kv_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    borrow::Borrow,
7    collections::{HashMap, HashSet, VecDeque},
8    iter::repeat,
9    sync::Arc,
10    time::Duration,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use aws_config::{BehaviorVersion, timeout::TimeoutConfig};
16use aws_sdk_dynamodb::{
17    Client,
18    config::{Credentials, Region},
19    primitives::Blob,
20    types::{AttributeValue, PutRequest, WriteRequest},
21};
22use backoff::{ExponentialBackoff, backoff::Backoff};
23use bytes::Bytes;
24use iota_config::object_storage_config::ObjectStoreConfig;
25use iota_data_ingestion_core::Worker;
26use iota_storage::http_key_value_store::{ItemType, TaggedKey};
27use iota_types::{full_checkpoint_content::CheckpointData, storage::ObjectKey};
28use object_store::{DynObjectStore, path::Path};
29use serde::{Deserialize, Serialize};
30use tracing::{error, info, warn};
31
/// Top-level configuration for a KV-store ingestion task: the fallback
/// object store plus the DynamoDB connection settings.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct KVStoreTaskConfig {
    /// Remote object store used as a fallback when a DynamoDB write fails
    /// (the worker logs it as an S3 upload).
    pub object_store_config: ObjectStoreConfig,
    /// DynamoDB client and table settings.
    pub dynamodb_config: DynamoDBConfig,
}
38
/// Connection settings for the DynamoDB table backing the KV store.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct DynamoDBConfig {
    /// AWS access key id; used to build static credentials for the client.
    pub aws_access_key_id: String,
    /// AWS secret access key paired with [`Self::aws_access_key_id`].
    pub aws_secret_access_key: String,
    /// AWS region the client is configured with.
    pub aws_region: String,
    /// Optional custom endpoint URL (presumably for local/testing setups —
    /// TODO confirm); when `None` the default AWS endpoint is used.
    pub aws_endpoint: Option<String>,
    /// Name of the DynamoDB table all items are written to.
    pub table_name: String,
}
48
/// Worker that persists checkpoint data as key/value items in DynamoDB,
/// with a remote object store as fallback for checkpoint contents that
/// fail to store in DynamoDB.
#[derive(Clone)]
pub struct KVStoreWorker {
    /// Client used for all `batch_write_item`/`put_item` calls.
    dynamo_client: Client,
    /// Fallback store for checkpoint contents (see
    /// `upload_checkpoint_contents`).
    remote_store: Arc<DynObjectStore>,
    /// Target DynamoDB table name.
    table_name: String,
}
55
56impl KVStoreWorker {
57    pub async fn new(config: KVStoreTaskConfig) -> anyhow::Result<Self> {
58        let credentials = Credentials::new(
59            &config.dynamodb_config.aws_access_key_id,
60            &config.dynamodb_config.aws_secret_access_key,
61            None,
62            None,
63            "dynamodb",
64        );
65        let timeout_config = TimeoutConfig::builder()
66            .operation_timeout(Duration::from_secs(3))
67            .operation_attempt_timeout(Duration::from_secs(10))
68            .connect_timeout(Duration::from_secs(3))
69            .build();
70
71        let mut aws_config_loader = aws_config::defaults(BehaviorVersion::latest())
72            .credentials_provider(credentials)
73            .region(Region::new(config.dynamodb_config.aws_region))
74            .timeout_config(timeout_config);
75
76        if let Some(url) = config.dynamodb_config.aws_endpoint {
77            aws_config_loader = aws_config_loader.endpoint_url(url);
78        }
79        let aws_config = aws_config_loader.load().await;
80        Ok(Self {
81            dynamo_client: Client::new(&aws_config),
82            remote_store: config.object_store_config.make()?,
83            table_name: config.dynamodb_config.table_name,
84        })
85    }
86
87    async fn multi_set<V: Serialize>(
88        &self,
89        item_type: ItemType,
90        values: impl IntoIterator<Item = (Vec<u8>, V)> + std::marker::Send,
91    ) -> anyhow::Result<()> {
92        let mut items = vec![];
93        let mut seen = HashSet::new();
94        for (digest, value) in values {
95            if seen.contains(&digest) {
96                continue;
97            }
98            seen.insert(digest.clone());
99            let item = WriteRequest::builder()
100                .set_put_request(Some(
101                    PutRequest::builder()
102                        .item("digest", AttributeValue::B(Blob::new(digest)))
103                        .item("type", AttributeValue::S(item_type.to_string()))
104                        .item(
105                            "bcs",
106                            AttributeValue::B(Blob::new(bcs::to_bytes(value.borrow())?)),
107                        )
108                        .build()?,
109                ))
110                .build();
111            items.push(item);
112        }
113        if items.is_empty() {
114            return Ok(());
115        }
116        let mut backoff = ExponentialBackoff::default();
117        let mut queue: VecDeque<Vec<_>> = items.chunks(25).map(|ck| ck.to_vec()).collect();
118        while let Some(chunk) = queue.pop_front() {
119            let response = self
120                .dynamo_client
121                .batch_write_item()
122                .set_request_items(Some(HashMap::from([(
123                    self.table_name.clone(),
124                    chunk.to_vec(),
125                )])))
126                .send()
127                .await
128                .inspect_err(|sdk_err| {
129                    error!(
130                        "{:?}",
131                        sdk_err.as_service_error().map(|e| e.meta().to_string())
132                    )
133                })?;
134            if let Some(response) = response.unprocessed_items {
135                if let Some(unprocessed) = response.into_iter().next() {
136                    if !unprocessed.1.is_empty() {
137                        if queue.is_empty() {
138                            if let Some(duration) = backoff.next_backoff() {
139                                tokio::time::sleep(duration).await;
140                            }
141                        }
142                        queue.push_back(unprocessed.1);
143                    }
144                }
145            }
146        }
147        Ok(())
148    }
149
150    /// Uploads checkpoint contents to storage, with automatic fallback from
151    /// DynamoDB to S3.
152    ///
153    /// This function attempts to store checkpoint contents in DynamoDB first.
154    /// If that fails (typically due to size constraints), it automatically
155    /// falls back to uploading the contents to S3 instead.
156    async fn upload_checkpoint_contents<V: Serialize + std::marker::Send>(
157        &self,
158        key: Vec<u8>,
159        value: V,
160    ) -> anyhow::Result<()> {
161        let bcs_bytes = bcs::to_bytes(value.borrow())?;
162
163        let attributes = HashMap::from([
164            (
165                "digest".to_string(),
166                AttributeValue::B(Blob::new(key.clone())),
167            ),
168            (
169                "type".to_string(),
170                AttributeValue::S(ItemType::CheckpointContents.to_string()),
171            ),
172            (
173                "bcs".to_string(),
174                AttributeValue::B(Blob::new(bcs_bytes.clone())),
175            ),
176        ]);
177
178        let res = self
179            .dynamo_client
180            .put_item()
181            .table_name(&self.table_name)
182            .set_item(Some(attributes))
183            .send()
184            .await
185            .inspect_err(|err| warn!("dynamodb error: {err}"));
186
187        if res.is_err() {
188            info!("attempt to store chekpoint contents on S3");
189            let location = Path::from(base64_url::encode(&key));
190            self.remote_store
191                .put(&location, Bytes::from(bcs_bytes).into())
192                .await?;
193        }
194
195        Ok(())
196    }
197}
198
199#[async_trait]
200impl Worker for KVStoreWorker {
201    type Message = ();
202    type Error = anyhow::Error;
203
204    async fn process_checkpoint(
205        &self,
206        checkpoint: Arc<CheckpointData>,
207    ) -> Result<Self::Message, Self::Error> {
208        let mut transactions = vec![];
209        let mut effects = vec![];
210        let mut events = vec![];
211        let mut objects = vec![];
212        let mut transactions_to_checkpoint = vec![];
213        let checkpoint_number = checkpoint.checkpoint_summary.sequence_number;
214
215        for transaction in &checkpoint.transactions {
216            let transaction_digest = transaction.transaction.digest().into_inner().to_vec();
217            effects.push((transaction_digest.clone(), transaction.effects.clone()));
218            transactions_to_checkpoint.push((transaction_digest.clone(), checkpoint_number));
219            transactions.push((transaction_digest.clone(), transaction.transaction.clone()));
220
221            if let Some(tx_events) = &transaction.events {
222                events.push((tx_events.digest().into_inner().to_vec(), tx_events));
223            }
224            for object in &transaction.output_objects {
225                let object_key = ObjectKey(object.id(), object.version());
226                objects.push((bcs::to_bytes(&object_key)?, object));
227            }
228        }
229        self.multi_set(ItemType::Transaction, transactions).await?;
230        self.multi_set(ItemType::TransactionEffects, effects)
231            .await?;
232        self.multi_set(ItemType::EventTransactionDigest, events)
233            .await?;
234        self.multi_set(ItemType::Object, objects).await?;
235        self.multi_set(
236            ItemType::TransactionToCheckpoint,
237            transactions_to_checkpoint,
238        )
239        .await?;
240
241        let serialized_checkpoint_number =
242            bcs::to_bytes(&TaggedKey::CheckpointSequenceNumber(checkpoint_number))?;
243        let checkpoint_summary = &checkpoint.checkpoint_summary;
244
245        self.upload_checkpoint_contents(
246            serialized_checkpoint_number.clone(),
247            checkpoint.checkpoint_contents.clone(),
248        )
249        .await?;
250
251        self.multi_set(
252            ItemType::CheckpointSummary,
253            [
254                serialized_checkpoint_number,
255                checkpoint_summary.digest().into_inner().to_vec(),
256            ]
257            .into_iter()
258            .zip(repeat(checkpoint_summary)),
259        )
260        .await?;
261        Ok(())
262    }
263}