// iota_data_ingestion/workers/kv_store.rs
use std::{
6 borrow::Borrow,
7 collections::{HashMap, HashSet, VecDeque},
8 iter::repeat,
9 sync::Arc,
10 time::Duration,
11};
12
13use anyhow::Result;
14use async_trait::async_trait;
15use aws_config::{BehaviorVersion, timeout::TimeoutConfig};
16use aws_sdk_dynamodb::{
17 Client,
18 config::{Credentials, Region},
19 primitives::Blob,
20 types::{AttributeValue, PutRequest, WriteRequest},
21};
22use backoff::{ExponentialBackoff, backoff::Backoff};
23use bytes::Bytes;
24use iota_config::object_storage_config::ObjectStoreConfig;
25use iota_data_ingestion_core::Worker;
26use iota_storage::http_key_value_store::{ItemType, TaggedKey};
27use iota_types::{full_checkpoint_content::CheckpointData, storage::ObjectKey};
28use object_store::{DynObjectStore, path::Path};
29use serde::{Deserialize, Serialize};
30use tracing::{error, info, warn};
31
/// Configuration for the KV-store ingestion task: the DynamoDB connection
/// settings plus an object store used by [`KVStoreWorker`] as a fallback
/// destination when a DynamoDB write fails.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct KVStoreTaskConfig {
    // Fallback object store (logs suggest S3) for checkpoint contents.
    pub object_store_config: ObjectStoreConfig,
    // Credentials, region, endpoint and table name for DynamoDB.
    pub dynamodb_config: DynamoDBConfig,
}
38
/// DynamoDB connection settings, deserialized from kebab-case config keys.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct DynamoDBConfig {
    pub aws_access_key_id: String,
    pub aws_secret_access_key: String,
    pub aws_region: String,
    // Optional custom endpoint URL (e.g. a local DynamoDB instance);
    // when `None`, the SDK's default regional endpoint is used.
    pub aws_endpoint: Option<String>,
    // Table that receives all item types written by this worker.
    pub table_name: String,
}
48
/// Worker that persists checkpoint-derived key/value entries into a single
/// DynamoDB table, falling back to `remote_store` for checkpoint contents
/// when the DynamoDB write fails.
#[derive(Clone)]
pub struct KVStoreWorker {
    dynamo_client: Client,
    // Fallback store for checkpoint contents (see `upload_checkpoint_contents`).
    remote_store: Arc<DynObjectStore>,
    table_name: String,
}
55
56impl KVStoreWorker {
57 pub async fn new(config: KVStoreTaskConfig) -> anyhow::Result<Self> {
58 let credentials = Credentials::new(
59 &config.dynamodb_config.aws_access_key_id,
60 &config.dynamodb_config.aws_secret_access_key,
61 None,
62 None,
63 "dynamodb",
64 );
65 let timeout_config = TimeoutConfig::builder()
66 .operation_timeout(Duration::from_secs(3))
67 .operation_attempt_timeout(Duration::from_secs(10))
68 .connect_timeout(Duration::from_secs(3))
69 .build();
70
71 let mut aws_config_loader = aws_config::defaults(BehaviorVersion::latest())
72 .credentials_provider(credentials)
73 .region(Region::new(config.dynamodb_config.aws_region))
74 .timeout_config(timeout_config);
75
76 if let Some(url) = config.dynamodb_config.aws_endpoint {
77 aws_config_loader = aws_config_loader.endpoint_url(url);
78 }
79 let aws_config = aws_config_loader.load().await;
80 Ok(Self {
81 dynamo_client: Client::new(&aws_config),
82 remote_store: config.object_store_config.make()?,
83 table_name: config.dynamodb_config.table_name,
84 })
85 }
86
87 async fn multi_set<V: Serialize>(
88 &self,
89 item_type: ItemType,
90 values: impl IntoIterator<Item = (Vec<u8>, V)> + std::marker::Send,
91 ) -> anyhow::Result<()> {
92 let mut items = vec![];
93 let mut seen = HashSet::new();
94 for (digest, value) in values {
95 if seen.contains(&digest) {
96 continue;
97 }
98 seen.insert(digest.clone());
99 let item = WriteRequest::builder()
100 .set_put_request(Some(
101 PutRequest::builder()
102 .item("digest", AttributeValue::B(Blob::new(digest)))
103 .item("type", AttributeValue::S(item_type.to_string()))
104 .item(
105 "bcs",
106 AttributeValue::B(Blob::new(bcs::to_bytes(value.borrow())?)),
107 )
108 .build()?,
109 ))
110 .build();
111 items.push(item);
112 }
113 if items.is_empty() {
114 return Ok(());
115 }
116 let mut backoff = ExponentialBackoff::default();
117 let mut queue: VecDeque<Vec<_>> = items.chunks(25).map(|ck| ck.to_vec()).collect();
118 while let Some(chunk) = queue.pop_front() {
119 let response = self
120 .dynamo_client
121 .batch_write_item()
122 .set_request_items(Some(HashMap::from([(
123 self.table_name.clone(),
124 chunk.to_vec(),
125 )])))
126 .send()
127 .await
128 .inspect_err(|sdk_err| {
129 error!(
130 "{:?}",
131 sdk_err.as_service_error().map(|e| e.meta().to_string())
132 )
133 })?;
134 if let Some(response) = response.unprocessed_items {
135 if let Some(unprocessed) = response.into_iter().next() {
136 if !unprocessed.1.is_empty() {
137 if queue.is_empty() {
138 if let Some(duration) = backoff.next_backoff() {
139 tokio::time::sleep(duration).await;
140 }
141 }
142 queue.push_back(unprocessed.1);
143 }
144 }
145 }
146 }
147 Ok(())
148 }
149
150 async fn upload_checkpoint_contents<V: Serialize + std::marker::Send>(
157 &self,
158 key: Vec<u8>,
159 value: V,
160 ) -> anyhow::Result<()> {
161 let bcs_bytes = bcs::to_bytes(value.borrow())?;
162
163 let attributes = HashMap::from([
164 (
165 "digest".to_string(),
166 AttributeValue::B(Blob::new(key.clone())),
167 ),
168 (
169 "type".to_string(),
170 AttributeValue::S(ItemType::CheckpointContents.to_string()),
171 ),
172 (
173 "bcs".to_string(),
174 AttributeValue::B(Blob::new(bcs_bytes.clone())),
175 ),
176 ]);
177
178 let res = self
179 .dynamo_client
180 .put_item()
181 .table_name(&self.table_name)
182 .set_item(Some(attributes))
183 .send()
184 .await
185 .inspect_err(|err| warn!("dynamodb error: {err}"));
186
187 if res.is_err() {
188 info!("attempt to store chekpoint contents on S3");
189 let location = Path::from(base64_url::encode(&key));
190 self.remote_store
191 .put(&location, Bytes::from(bcs_bytes).into())
192 .await?;
193 }
194
195 Ok(())
196 }
197}
198
199#[async_trait]
200impl Worker for KVStoreWorker {
201 type Message = ();
202 type Error = anyhow::Error;
203
204 async fn process_checkpoint(
205 &self,
206 checkpoint: Arc<CheckpointData>,
207 ) -> Result<Self::Message, Self::Error> {
208 let mut transactions = vec![];
209 let mut effects = vec![];
210 let mut events = vec![];
211 let mut objects = vec![];
212 let mut transactions_to_checkpoint = vec![];
213 let checkpoint_number = checkpoint.checkpoint_summary.sequence_number;
214
215 for transaction in &checkpoint.transactions {
216 let transaction_digest = transaction.transaction.digest().into_inner().to_vec();
217 effects.push((transaction_digest.clone(), transaction.effects.clone()));
218 transactions_to_checkpoint.push((transaction_digest.clone(), checkpoint_number));
219 transactions.push((transaction_digest.clone(), transaction.transaction.clone()));
220
221 if let Some(tx_events) = &transaction.events {
222 events.push((tx_events.digest().into_inner().to_vec(), tx_events));
223 }
224 for object in &transaction.output_objects {
225 let object_key = ObjectKey(object.id(), object.version());
226 objects.push((bcs::to_bytes(&object_key)?, object));
227 }
228 }
229 self.multi_set(ItemType::Transaction, transactions).await?;
230 self.multi_set(ItemType::TransactionEffects, effects)
231 .await?;
232 self.multi_set(ItemType::EventTransactionDigest, events)
233 .await?;
234 self.multi_set(ItemType::Object, objects).await?;
235 self.multi_set(
236 ItemType::TransactionToCheckpoint,
237 transactions_to_checkpoint,
238 )
239 .await?;
240
241 let serialized_checkpoint_number =
242 bcs::to_bytes(&TaggedKey::CheckpointSequenceNumber(checkpoint_number))?;
243 let checkpoint_summary = &checkpoint.checkpoint_summary;
244
245 self.upload_checkpoint_contents(
246 serialized_checkpoint_number.clone(),
247 checkpoint.checkpoint_contents.clone(),
248 )
249 .await?;
250
251 self.multi_set(
252 ItemType::CheckpointSummary,
253 [
254 serialized_checkpoint_number,
255 checkpoint_summary.digest().into_inner().to_vec(),
256 ]
257 .into_iter()
258 .zip(repeat(checkpoint_summary)),
259 )
260 .await?;
261 Ok(())
262 }
263}