iota_rest_kv/
kv_store_client.rs

1// Copyright (c) 2025 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! This module provides a client for interacting with the key-value store.
5
6use std::{
7    sync::Arc,
8    time::{Duration, Instant},
9};
10
11use anyhow::Result;
12use aws_config::{BehaviorVersion, Region, timeout::TimeoutConfig};
13use aws_sdk_dynamodb::{Client, config::Credentials, primitives::Blob, types::AttributeValue};
14use bytes::Bytes;
15use iota_config::object_storage_config::ObjectStoreConfig;
16use iota_storage::http_key_value_store::{Key, TaggedKey};
17use iota_types::storage::ObjectKey;
18use object_store::{DynObjectStore, path::Path};
19use serde::{Deserialize, Serialize};
20use tokio::sync::RwLock;
21
// Timeouts applied to the DynamoDB client built in `KvStoreClient::new`, plus
// the TTL of the cached AWS health status.
//
// NOTE(review): the overall operation timeout (3s) is shorter than the
// per-attempt timeout (10s). If the operation timeout bounds the whole call
// including retries, the per-attempt value can never trigger — confirm the
// two values are not swapped.

/// Value passed to `TimeoutConfig::operation_timeout` for the DynamoDB client.
const OPERATION_TIMEOUT_SECS: Duration = Duration::from_secs(3);
/// Value passed to `TimeoutConfig::operation_attempt_timeout` for the DynamoDB
/// client.
const OPERATION_ATTEMPT_TIMEOUT_SECS: Duration = Duration::from_secs(10);
/// Value passed to `TimeoutConfig::connect_timeout` for the DynamoDB client.
const CONNECT_TIMEOUT_SECS: Duration = Duration::from_secs(3);
/// How long a cached [`AwsStatus`] is served before the health checks are run
/// again.
const AWS_STATUS_CACHE_TTL: Duration = Duration::from_secs(5);
26
/// Configuration for the [`KvStoreClient`] used to access data from S3 and
/// DynamoDB.
///
/// This configuration combines settings for both object storage (S3) and
/// DynamoDB, matching the storage locations used by the `KVStoreWorker`
/// in the `iota-data-ingestion` crate.
///
/// The client retrieves data from:
///
/// - **S3:** Checkpoint contents (only consulted when the entry is not found
///   in DynamoDB).
/// - **DynamoDB:**
///   - Transactions
///   - Effects
///   - Events
///   - Objects
///   - Checkpoint summaries
///   - Checkpoint contents (tried first)
///
/// Field names are (de)serialized in `kebab-case`.
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "kebab-case")]
pub struct KvStoreConfig {
    /// Settings used to build the S3 compatible object store client.
    pub object_store_config: ObjectStoreConfig,
    /// Settings used to build the DynamoDB client.
    pub dynamo_db_config: DynamoDbConfig,
}
49
50/// Configuration for DynamoDB connection.
51///
52/// This configuration matches the AWS resources used by the `KVStoreWorker` in
53/// the `iota-data-ingestion` crate, allowing the [`KvStoreClient`] to read the
54/// stored data.
55#[derive(Serialize, Deserialize, Clone, Debug)]
56#[serde(rename_all = "kebab-case")]
57pub struct DynamoDbConfig {
58    pub aws_access_key_id: String,
59    pub aws_secret_access_key: String,
60    /// Useful for local testing eg. (localstack).
61    pub aws_endpoint: Option<String>,
62    pub aws_region: String,
63    pub table_name: String,
64}
65
/// Status of the AWS components used by the [`KvStoreClient`].
///
/// Produced by the client's health checks and serializable so it can be
/// returned to callers as-is.
#[derive(Debug, Serialize, Clone)]
pub struct AwsStatus {
    /// Result of the DynamoDB health check.
    pub dynamodb: ServiceStatus,
    /// Result of the S3 compatible object store health check.
    pub s3: ServiceStatus,
}
72
/// Represents the health status of a service.
///
/// It captures the current health status (healthy or unhealthy) and
/// the latency of the service. It's typically used to monitor the status of
/// external components.
///
/// Both fields are private to this module; consumers observe them through
/// the serialized form or the `Debug` output.
#[derive(Debug, Serialize, Clone)]
pub struct ServiceStatus {
    /// Indicates whether the service is healthy (`true`) or unhealthy
    /// (`false`).
    healthy: bool,
    /// The latency of the service, measured in milliseconds. This represents
    /// the time it took to check the service's health.
    latency_ms: u64,
}
87
/// Represents a cached status of AWS components.
///
/// This struct stores the status of AWS components along with the time it was
/// cached. It's used to avoid overwhelming the AWS services with frequent
/// health check requests. The cached status is considered valid for a limited
/// time (TTL); the TTL itself is stored on the [`KvStoreClient`], not here.
#[derive(Debug)]
pub struct CachedAwsStatus {
    /// The status of the AWS components.
    status: AwsStatus,
    /// The time at which the status was cached. This is used to determine
    /// if the cached status is still valid (within the TTL).
    cached_at: Instant,
}
102
/// Provides read access to data ingested by the `iota-data-ingestion`
/// crate's `KVStoreWorker`.
///
/// It retrieves data from two storage backends:
///
/// - **S3:** Used for checkpoint contents, as a fallback when the entry is
///   not found in DynamoDB.
/// - **DynamoDB:** Used for:
///   - Transactions
///   - Effects
///   - Events
///   - Objects
///   - Checkpoint summaries
///   - Checkpoint contents (tried first)
///
/// The client implements a read-only interface and supports the HTTP fallback
/// mechanism used by
/// [`HttpKVStore`](iota_storage::http_key_value_store::HttpKVStore).
///
/// Cloning the client shares the object store handle and the cached health
/// status, since both are held behind an [`Arc`].
#[derive(Debug, Clone)]
pub struct KvStoreClient {
    /// DynamoDb client.
    dynamo_db_client: Client,
    /// S3 compatible bucket client.
    remote_store: Arc<DynObjectStore>,
    /// DynamoDb table name.
    table_name: String,
    /// Instant at which the client was created; used to report the uptime of
    /// the service.
    start_time: Instant,
    /// Cached AWS components status. `None` until the first health check has
    /// completed.
    cached_status: Arc<RwLock<Option<CachedAwsStatus>>>,
    /// The TTL of the [`CachedAwsStatus`].
    cache_duration: Duration,
}
134
135impl KvStoreClient {
136    /// Create a new instance of the client.
137    ///
138    /// Internally it instantiates a DynamoDb Client and an S3 compatible bucket
139    /// Client.
140    pub async fn new(config: KvStoreConfig) -> Result<Self> {
141        let dynamodb_config = config.dynamo_db_config;
142
143        let credentials = Credentials::new(
144            &dynamodb_config.aws_access_key_id,
145            &dynamodb_config.aws_secret_access_key,
146            None,
147            None,
148            "dynamodb",
149        );
150        let timeout_config = TimeoutConfig::builder()
151            .operation_timeout(OPERATION_TIMEOUT_SECS)
152            .operation_attempt_timeout(OPERATION_ATTEMPT_TIMEOUT_SECS)
153            .connect_timeout(CONNECT_TIMEOUT_SECS)
154            .build();
155        let mut aws_config_loader = aws_config::defaults(BehaviorVersion::latest())
156            .credentials_provider(credentials)
157            .region(Region::new(dynamodb_config.aws_region))
158            .timeout_config(timeout_config);
159
160        if let Some(url) = dynamodb_config.aws_endpoint {
161            aws_config_loader = aws_config_loader.endpoint_url(url);
162        }
163        let aws_config = aws_config_loader.load().await;
164        let dynamo_db_client = Client::new(&aws_config);
165        let remote_store = config.object_store_config.make()?;
166
167        Ok(Self {
168            dynamo_db_client,
169            remote_store,
170            table_name: dynamodb_config.table_name,
171            start_time: Instant::now(),
172            cache_duration: AWS_STATUS_CACHE_TTL,
173            cached_status: Arc::new(RwLock::new(None)),
174        })
175    }
176
177    /// Get the elapsed time from which the service was instantiated.
178    pub fn get_uptime(&self) -> Duration {
179        self.start_time.elapsed()
180    }
181
182    async fn check_dynamodb_health(&self) -> ServiceStatus {
183        let start = Instant::now();
184
185        let healthy = self
186            .dynamo_db_client
187            .describe_table()
188            .table_name(&self.table_name)
189            .send()
190            .await
191            .inspect_err(|err| tracing::error!("failed describing dynamodb table: {err}"))
192            .is_ok();
193
194        ServiceStatus {
195            healthy,
196            latency_ms: start.elapsed().as_millis() as u64,
197        }
198    }
199
200    async fn check_s3_health(&self) -> ServiceStatus {
201        let start = Instant::now();
202
203        // Just check if we can access the bucket by trying to get a non-existent key
204        let test_path = Path::from("health-check-test");
205
206        let healthy = match self.remote_store.head(&test_path).await {
207            Ok(_) => true,
208            Err(object_store::Error::NotFound { .. }) => true, // Not found is OK
209            Err(err) => {
210                tracing::error!("failed checking file metadata on S3: {err}");
211                false
212            }
213        };
214
215        ServiceStatus {
216            healthy,
217            latency_ms: start.elapsed().as_millis() as u64,
218        }
219    }
220
221    async fn check_aws_health(&self) -> AwsStatus {
222        AwsStatus {
223            dynamodb: self.check_dynamodb_health().await,
224            s3: self.check_s3_health().await,
225        }
226    }
227
228    /// Get AWS service status.
229    pub async fn get_aws_health(&self) -> AwsStatus {
230        // Read lock for checking cache status
231        let should_refresh = {
232            let cached = self.cached_status.read().await;
233            cached.is_none()
234                || cached
235                    .as_ref()
236                    .map(|a| a.cached_at.elapsed() > self.cache_duration)
237                    .unwrap_or(true)
238        };
239
240        if should_refresh {
241            let new_status = self.check_aws_health().await;
242            // Write lock only when updating
243            let mut cached = self.cached_status.write().await;
244            *cached = Some(CachedAwsStatus {
245                status: new_status.clone(),
246                cached_at: Instant::now(),
247            });
248
249            return new_status;
250        }
251
252        // Read lock for getting cached value
253        if let Some(cached) = self.cached_status.read().await.as_ref() {
254            cached.status.clone()
255        } else {
256            // Cache was cleared between our check and here, get fresh status
257            self.check_aws_health().await
258        }
259    }
260
261    /// Get value as [`Bytes`] from DynamoDb.
262    async fn get_from_dynamodb<T: AsRef<[u8]>>(
263        &self,
264        digest: T,
265        item_type: String,
266    ) -> Result<Option<Bytes>> {
267        let result = self
268            .dynamo_db_client
269            .get_item()
270            .table_name(&self.table_name)
271            .key("digest", AttributeValue::B(Blob::new(digest.as_ref())))
272            .key("type", AttributeValue::S(item_type))
273            .send()
274            .await?;
275
276        if let Some(item) = result.item {
277            if let Some(AttributeValue::B(blob)) = item.get("bcs") {
278                return Ok(Some(Bytes::copy_from_slice(blob.as_ref())));
279            }
280        }
281
282        Ok(None)
283    }
284
285    /// Get value as [`Bytes`] from the S3 compatible bucket.
286    async fn get_from_remote_store<T: AsRef<[u8]>>(&self, digest: &T) -> Result<Option<Bytes>> {
287        let path = Path::from(base64_url::encode(digest));
288
289        // Get the object
290        match self.remote_store.get(&path).await {
291            Ok(response) => {
292                // Get bytes from the response
293                let data = response.bytes().await.map_err(|err| {
294                    anyhow::anyhow!("Failed to read data from remote store: {err}")
295                })?;
296
297                Ok(Some(data))
298            }
299            Err(err) => {
300                match err {
301                    // Handle specific object_store errors
302                    object_store::Error::NotFound { .. } => Ok(None),
303                    _ => Err(anyhow::anyhow!("remote store error: {err}")),
304                }
305            }
306        }
307    }
308
309    /// Get value as [`Bytes`] from the kv store.
310    ///
311    /// Based on the provided [`Key`] fetch the data from DynamoDb or S3
312    /// compatible buckets.
313    pub async fn get(&self, key: Key) -> Result<Option<Bytes>> {
314        let item_type = key.item_type().to_string();
315
316        match key {
317            Key::Transaction(transaction_digest) => {
318                self.get_from_dynamodb(transaction_digest, item_type).await
319            }
320            Key::TransactionEffects(transaction_digest) => {
321                self.get_from_dynamodb(transaction_digest, item_type).await
322            }
323            Key::CheckpointContents(chk_seq_num) => {
324                let serialized_checkpoint_number =
325                    bcs::to_bytes(&TaggedKey::CheckpointSequenceNumber(chk_seq_num))?;
326                let data = self
327                    .get_from_dynamodb(&serialized_checkpoint_number, item_type)
328                    .await?;
329                if data.is_none() {
330                    tracing::info!(
331                        "checkpoint contents with sequence number {chk_seq_num} not found in DynamoDB, attempting fetch from remote store",
332                    );
333                    return self
334                        .get_from_remote_store(&serialized_checkpoint_number)
335                        .await;
336                }
337                Ok(data)
338            }
339            Key::CheckpointSummary(chk_seq_num) => {
340                let serialized_checkpoint_number =
341                    bcs::to_bytes(&TaggedKey::CheckpointSequenceNumber(chk_seq_num))?;
342
343                self.get_from_dynamodb(serialized_checkpoint_number, item_type)
344                    .await
345            }
346            Key::CheckpointSummaryByDigest(checkpoint_digest) => {
347                self.get_from_dynamodb(checkpoint_digest, item_type).await
348            }
349            Key::TransactionToCheckpoint(transaction_digest) => {
350                self.get_from_dynamodb(transaction_digest, item_type).await
351            }
352            Key::ObjectKey(object_id, sequence_number) => {
353                let object_key = ObjectKey(object_id, sequence_number);
354                let serialized_object_key = bcs::to_bytes(&object_key)?;
355                self.get_from_dynamodb(serialized_object_key, item_type)
356                    .await
357            }
358            Key::EventsByTransactionDigest(transaction_digest) => {
359                self.get_from_dynamodb(transaction_digest, item_type).await
360            }
361        }
362    }
363}