iota_rest_kv/
kv_store_client.rs1use std::{
7 sync::Arc,
8 time::{Duration, Instant},
9};
10
11use anyhow::Result;
12use aws_config::{BehaviorVersion, Region, timeout::TimeoutConfig};
13use aws_sdk_dynamodb::{Client, config::Credentials, primitives::Blob, types::AttributeValue};
14use bytes::Bytes;
15use iota_config::object_storage_config::ObjectStoreConfig;
16use iota_storage::http_key_value_store::{Key, TaggedKey};
17use iota_types::storage::ObjectKey;
18use object_store::{DynObjectStore, path::Path};
19use serde::{Deserialize, Serialize};
20use tokio::sync::RwLock;
21
22const OPERATION_TIMEOUT_SECS: Duration = Duration::from_secs(3);
23const OPERATION_ATTEMPT_TIMEOUT_SECS: Duration = Duration::from_secs(10);
24const CONNECT_TIMEOUT_SECS: Duration = Duration::from_secs(3);
25const AWS_STATUS_CACHE_TTL: Duration = Duration::from_secs(5);
26
27#[derive(Serialize, Deserialize, Clone, Debug)]
44#[serde(rename_all = "kebab-case")]
45pub struct KvStoreConfig {
46 pub object_store_config: ObjectStoreConfig,
47 pub dynamo_db_config: DynamoDbConfig,
48}
49
50#[derive(Serialize, Deserialize, Clone, Debug)]
56#[serde(rename_all = "kebab-case")]
57pub struct DynamoDbConfig {
58 pub aws_access_key_id: String,
59 pub aws_secret_access_key: String,
60 pub aws_endpoint: Option<String>,
62 pub aws_region: String,
63 pub table_name: String,
64}
65
66#[derive(Debug, Serialize, Clone)]
68pub struct AwsStatus {
69 pub dynamodb: ServiceStatus,
70 pub s3: ServiceStatus,
71}
72
73#[derive(Debug, Serialize, Clone)]
79pub struct ServiceStatus {
80 healthy: bool,
83 latency_ms: u64,
86}
87
88#[derive(Debug)]
95pub struct CachedAwsStatus {
96 status: AwsStatus,
98 cached_at: Instant,
101}
102
103#[derive(Debug, Clone)]
120pub struct KvStoreClient {
121 dynamo_db_client: Client,
123 remote_store: Arc<DynObjectStore>,
125 table_name: String,
127 start_time: Instant,
129 cached_status: Arc<RwLock<Option<CachedAwsStatus>>>,
131 cache_duration: Duration,
133}
134
135impl KvStoreClient {
136 pub async fn new(config: KvStoreConfig) -> Result<Self> {
141 let dynamodb_config = config.dynamo_db_config;
142
143 let credentials = Credentials::new(
144 &dynamodb_config.aws_access_key_id,
145 &dynamodb_config.aws_secret_access_key,
146 None,
147 None,
148 "dynamodb",
149 );
150 let timeout_config = TimeoutConfig::builder()
151 .operation_timeout(OPERATION_TIMEOUT_SECS)
152 .operation_attempt_timeout(OPERATION_ATTEMPT_TIMEOUT_SECS)
153 .connect_timeout(CONNECT_TIMEOUT_SECS)
154 .build();
155 let mut aws_config_loader = aws_config::defaults(BehaviorVersion::latest())
156 .credentials_provider(credentials)
157 .region(Region::new(dynamodb_config.aws_region))
158 .timeout_config(timeout_config);
159
160 if let Some(url) = dynamodb_config.aws_endpoint {
161 aws_config_loader = aws_config_loader.endpoint_url(url);
162 }
163 let aws_config = aws_config_loader.load().await;
164 let dynamo_db_client = Client::new(&aws_config);
165 let remote_store = config.object_store_config.make()?;
166
167 Ok(Self {
168 dynamo_db_client,
169 remote_store,
170 table_name: dynamodb_config.table_name,
171 start_time: Instant::now(),
172 cache_duration: AWS_STATUS_CACHE_TTL,
173 cached_status: Arc::new(RwLock::new(None)),
174 })
175 }
176
177 pub fn get_uptime(&self) -> Duration {
179 self.start_time.elapsed()
180 }
181
182 async fn check_dynamodb_health(&self) -> ServiceStatus {
183 let start = Instant::now();
184
185 let healthy = self
186 .dynamo_db_client
187 .describe_table()
188 .table_name(&self.table_name)
189 .send()
190 .await
191 .inspect_err(|err| tracing::error!("failed describing dynamodb table: {err}"))
192 .is_ok();
193
194 ServiceStatus {
195 healthy,
196 latency_ms: start.elapsed().as_millis() as u64,
197 }
198 }
199
200 async fn check_s3_health(&self) -> ServiceStatus {
201 let start = Instant::now();
202
203 let test_path = Path::from("health-check-test");
205
206 let healthy = match self.remote_store.head(&test_path).await {
207 Ok(_) => true,
208 Err(object_store::Error::NotFound { .. }) => true, Err(err) => {
210 tracing::error!("failed checking file metadata on S3: {err}");
211 false
212 }
213 };
214
215 ServiceStatus {
216 healthy,
217 latency_ms: start.elapsed().as_millis() as u64,
218 }
219 }
220
221 async fn check_aws_health(&self) -> AwsStatus {
222 AwsStatus {
223 dynamodb: self.check_dynamodb_health().await,
224 s3: self.check_s3_health().await,
225 }
226 }
227
228 pub async fn get_aws_health(&self) -> AwsStatus {
230 let should_refresh = {
232 let cached = self.cached_status.read().await;
233 cached.is_none()
234 || cached
235 .as_ref()
236 .map(|a| a.cached_at.elapsed() > self.cache_duration)
237 .unwrap_or(true)
238 };
239
240 if should_refresh {
241 let new_status = self.check_aws_health().await;
242 let mut cached = self.cached_status.write().await;
244 *cached = Some(CachedAwsStatus {
245 status: new_status.clone(),
246 cached_at: Instant::now(),
247 });
248
249 return new_status;
250 }
251
252 if let Some(cached) = self.cached_status.read().await.as_ref() {
254 cached.status.clone()
255 } else {
256 self.check_aws_health().await
258 }
259 }
260
261 async fn get_from_dynamodb<T: AsRef<[u8]>>(
263 &self,
264 digest: T,
265 item_type: String,
266 ) -> Result<Option<Bytes>> {
267 let result = self
268 .dynamo_db_client
269 .get_item()
270 .table_name(&self.table_name)
271 .key("digest", AttributeValue::B(Blob::new(digest.as_ref())))
272 .key("type", AttributeValue::S(item_type))
273 .send()
274 .await?;
275
276 if let Some(item) = result.item {
277 if let Some(AttributeValue::B(blob)) = item.get("bcs") {
278 return Ok(Some(Bytes::copy_from_slice(blob.as_ref())));
279 }
280 }
281
282 Ok(None)
283 }
284
285 async fn get_from_remote_store<T: AsRef<[u8]>>(&self, digest: &T) -> Result<Option<Bytes>> {
287 let path = Path::from(base64_url::encode(digest));
288
289 match self.remote_store.get(&path).await {
291 Ok(response) => {
292 let data = response.bytes().await.map_err(|err| {
294 anyhow::anyhow!("Failed to read data from remote store: {err}")
295 })?;
296
297 Ok(Some(data))
298 }
299 Err(err) => {
300 match err {
301 object_store::Error::NotFound { .. } => Ok(None),
303 _ => Err(anyhow::anyhow!("remote store error: {err}")),
304 }
305 }
306 }
307 }
308
309 pub async fn get(&self, key: Key) -> Result<Option<Bytes>> {
314 let item_type = key.item_type().to_string();
315
316 match key {
317 Key::Transaction(transaction_digest) => {
318 self.get_from_dynamodb(transaction_digest, item_type).await
319 }
320 Key::TransactionEffects(transaction_digest) => {
321 self.get_from_dynamodb(transaction_digest, item_type).await
322 }
323 Key::CheckpointContents(chk_seq_num) => {
324 let serialized_checkpoint_number =
325 bcs::to_bytes(&TaggedKey::CheckpointSequenceNumber(chk_seq_num))?;
326 let data = self
327 .get_from_dynamodb(&serialized_checkpoint_number, item_type)
328 .await?;
329 if data.is_none() {
330 tracing::info!(
331 "checkpoint contents with sequence number {chk_seq_num} not found in DynamoDB, attempting fetch from remote store",
332 );
333 return self
334 .get_from_remote_store(&serialized_checkpoint_number)
335 .await;
336 }
337 Ok(data)
338 }
339 Key::CheckpointSummary(chk_seq_num) => {
340 let serialized_checkpoint_number =
341 bcs::to_bytes(&TaggedKey::CheckpointSequenceNumber(chk_seq_num))?;
342
343 self.get_from_dynamodb(serialized_checkpoint_number, item_type)
344 .await
345 }
346 Key::CheckpointSummaryByDigest(checkpoint_digest) => {
347 self.get_from_dynamodb(checkpoint_digest, item_type).await
348 }
349 Key::TransactionToCheckpoint(transaction_digest) => {
350 self.get_from_dynamodb(transaction_digest, item_type).await
351 }
352 Key::ObjectKey(object_id, sequence_number) => {
353 let object_key = ObjectKey(object_id, sequence_number);
354 let serialized_object_key = bcs::to_bytes(&object_key)?;
355 self.get_from_dynamodb(serialized_object_key, item_type)
356 .await
357 }
358 Key::EventsByTransactionDigest(transaction_digest) => {
359 self.get_from_dynamodb(transaction_digest, item_type).await
360 }
361 }
362 }
363}