iota_storage/
http_key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{str::FromStr, sync::Arc, time::Duration};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_types::{
12    base_types::{ObjectID, SequenceNumber, VersionNumber},
13    digests::{CheckpointDigest, TransactionDigest},
14    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
15    error::{IotaError, IotaResult},
16    messages_checkpoint::{
17        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18    },
19    object::Object,
20    storage::ObjectKey,
21    transaction::Transaction,
22};
23use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
24use reqwest::{
25    Client, Url,
26    header::{CONTENT_LENGTH, HeaderValue},
27};
28use serde::{Deserialize, Serialize};
29use tap::TapFallible;
30use tracing::{error, info, instrument, trace, warn};
31
32use crate::{
33    key_value_store::{
34        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
35    },
36    key_value_store_metrics::KeyValueStoreMetrics,
37};
38
39pub struct HttpKVStore {
40    base_url: Url,
41    client: Client,
42    cache: MokaCache<Url, Bytes>,
43    metrics: Arc<KeyValueStoreMetrics>,
44}
45
46pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
47    base64_url::encode(digest)
48}
49
50// for non-digest keys, we need a tag to make sure we don't have collisions
51#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
52pub enum TaggedKey {
53    CheckpointSequenceNumber(CheckpointSequenceNumber),
54}
55
56pub fn encoded_tagged_key(key: &TaggedKey) -> String {
57    let bytes = bcs::to_bytes(key).expect("failed to serialize key");
58    base64_url::encode(&bytes)
59}
60
61pub fn encode_object_key(object_id: &ObjectID, version: &VersionNumber) -> String {
62    let bytes =
63        bcs::to_bytes(&ObjectKey(*object_id, *version)).expect("failed to serialize object key");
64    base64_url::encode(&bytes)
65}
66
67trait IntoIotaResult<T> {
68    fn into_iota_result(self) -> IotaResult<T>;
69}
70
71impl<T, E> IntoIotaResult<T> for Result<T, E>
72where
73    E: std::error::Error,
74{
75    fn into_iota_result(self) -> IotaResult<T> {
76        self.map_err(|e| IotaError::Storage(e.to_string()))
77    }
78}
79
80/// Represents the supported items the REST API accepts when fetching the data
81/// based on Digest or Sequence number.
82#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, strum::EnumString, strum::Display)]
83pub enum ItemType {
84    #[strum(serialize = "tx")]
85    #[serde(rename = "tx")]
86    Transaction,
87    #[strum(serialize = "fx")]
88    #[serde(rename = "fx")]
89    TransactionEffects,
90    #[strum(serialize = "cc")]
91    #[serde(rename = "cc")]
92    CheckpointContents,
93    #[strum(serialize = "cs")]
94    #[serde(rename = "cs")]
95    CheckpointSummary,
96    #[strum(serialize = "tx2c")]
97    #[serde(rename = "tx2c")]
98    TransactionToCheckpoint,
99    #[strum(serialize = "ob")]
100    #[serde(rename = "ob")]
101    Object,
102    #[strum(serialize = "evtx")]
103    #[serde(rename = "evtx")]
104    EventTransactionDigest,
105}
106
107#[derive(Clone, Copy, Debug, PartialEq, Eq)]
108pub enum Key {
109    Transaction(TransactionDigest),
110    TransactionEffects(TransactionDigest),
111    CheckpointContents(CheckpointSequenceNumber),
112    CheckpointSummary(CheckpointSequenceNumber),
113    CheckpointSummaryByDigest(CheckpointDigest),
114    TransactionToCheckpoint(TransactionDigest),
115    ObjectKey(ObjectID, VersionNumber),
116    EventsByTransactionDigest(TransactionDigest),
117}
118
119impl Key {
120    // Create a [`Key`] instance based on the provided item type and
121    /// [`base64_url`] encoded string.
122    ///
123    /// # Example
124    ///
125    /// ```rust
126    /// use std::str::FromStr;
127    ///
128    /// use iota_storage::http_key_value_store::Key;
129    /// use iota_types::digests::TransactionDigest;
130    ///
131    /// let key = Key::new("tx", "7jb54RvJduLj9HdV9L41UJqZ5KWdzYY2rl1eL8AVl9o").unwrap();
132    /// assert_eq!(
133    ///     key,
134    ///     Key::Transaction(
135    ///         TransactionDigest::from_str("H2tetNL3CfroDF3iJNA7wFo6oRQiJedGTeykZi6HAGqP").unwrap()
136    ///     )
137    /// );
138    /// ```
139    pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
140        let item_type =
141            ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
142        let decoded_key = base64_url::decode(encoded_key)
143            .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
144
145        match item_type {
146            ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::try_from(
147                decoded_key.as_slice(),
148            )?)),
149            ItemType::TransactionEffects => Ok(Key::TransactionEffects(
150                TransactionDigest::try_from(decoded_key.as_slice())?,
151            )),
152            ItemType::CheckpointContents => {
153                let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
154                    anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
155                })?;
156                match tagged_key {
157                    TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
158                }
159            }
160            ItemType::CheckpointSummary => {
161                // first try to decode as digest, otherwise try to decode as tagged key
162                match CheckpointDigest::try_from(decoded_key.clone()) {
163                    Err(_) => {
164                        let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
165                            anyhow::anyhow!(
166                                "failed to deserialize checkpoint sequence number: {err}"
167                            )
168                        })?;
169                        match tagged_key {
170                            TaggedKey::CheckpointSequenceNumber(seq) => {
171                                Ok(Key::CheckpointSummary(seq))
172                            }
173                        }
174                    }
175                    Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
176                }
177            }
178            ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
179                TransactionDigest::try_from(decoded_key.as_slice())?,
180            )),
181            ItemType::Object => {
182                let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
183                    .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
184
185                Ok(Key::ObjectKey(object_key.0, object_key.1))
186            }
187            ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
188                TransactionDigest::try_from(decoded_key.as_slice())?,
189            )),
190        }
191    }
192
193    /// Get the REST API resource type.
194    ///
195    /// This method returns the corresponding resource type string
196    /// for a given `Key` variant.
197    ///
198    /// This is used to construct the REST API route,
199    /// typically in the format `/{item_type}/{digest}`.
200    ///
201    /// # Example
202    /// ```rust
203    /// use iota_storage::http_key_value_store::{ItemType, Key};
204    /// use iota_types::digests::TransactionDigest;
205    ///
206    /// let item_type = Key::CheckpointContents(1).item_type();
207    /// assert_eq!(item_type, ItemType::CheckpointContents);
208    /// let item_type = Key::Transaction(TransactionDigest::random()).item_type();
209    /// assert_eq!(item_type, ItemType::Transaction);
210    /// ```
211    pub fn item_type(&self) -> ItemType {
212        match self {
213            Key::Transaction(_) => ItemType::Transaction,
214            Key::TransactionEffects(_) => ItemType::TransactionEffects,
215            Key::CheckpointContents(_) => ItemType::CheckpointContents,
216            Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
217                ItemType::CheckpointSummary
218            }
219            Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
220            Key::ObjectKey(_, _) => ItemType::Object,
221            Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
222        }
223    }
224
225    /// Returns a tuple containing the resource type and the encoded key.
226    ///
227    /// This is used to construct the REST API route, typically in the format
228    /// `/:item_type/:digest`.
229    ///
230    /// # Examples
231    ///
232    /// ```rust
233    /// use iota_storage::http_key_value_store::{
234    ///     ItemType, Key, TaggedKey, encode_digest, encode_object_key, encoded_tagged_key,
235    /// };
236    /// use iota_types::digests::TransactionDigest;
237    ///
238    /// let tx_digest = TransactionDigest::random();
239    /// // encode the tx_digest as base64 url
240    /// let expected_encoded_digest = encode_digest(&tx_digest);
241    /// let key = Key::Transaction(tx_digest);
242    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
243    /// assert_eq!(resource_type, ItemType::Transaction);
244    /// assert_eq!(encoded_key_digest, expected_encoded_digest);
245    ///
246    /// let chk_seq_num = 123;
247    /// let key = Key::CheckpointSummary(chk_seq_num);
248    /// // encode the checkpoint sequence number as base64 url
249    /// let expected_encoded_seq_num =
250    ///     encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(chk_seq_num));
251    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
252    /// assert_eq!(resource_type, ItemType::CheckpointSummary);
253    /// assert_eq!(encoded_key_digest, expected_encoded_seq_num);
254    /// ```
255    pub fn to_path_elements(&self) -> (ItemType, String) {
256        let encoded_key_digest = match self {
257            Key::Transaction(digest) => encode_digest(digest),
258            Key::TransactionEffects(digest) => encode_digest(digest),
259            Key::CheckpointContents(seq) => {
260                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
261            }
262            Key::CheckpointSummary(seq) => {
263                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
264            }
265            Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
266            Key::TransactionToCheckpoint(digest) => encode_digest(digest),
267            Key::ObjectKey(object_id, version) => encode_object_key(object_id, version),
268            Key::EventsByTransactionDigest(digest) => encode_digest(digest),
269        };
270
271        (self.item_type(), encoded_key_digest)
272    }
273}
274
275#[derive(Clone, Debug)]
276enum Value {
277    Tx(Box<Transaction>),
278    Fx(Box<TransactionEffects>),
279    Events(Box<TransactionEvents>),
280    CheckpointContents(Box<CheckpointContents>),
281    CheckpointSummary(Box<CertifiedCheckpointSummary>),
282    TxToCheckpoint(CheckpointSequenceNumber),
283}
284
285impl HttpKVStore {
286    pub fn new_kv(
287        base_url: &str,
288        cache_size: u64,
289        metrics: Arc<KeyValueStoreMetrics>,
290    ) -> IotaResult<TransactionKeyValueStore> {
291        let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
292        Ok(TransactionKeyValueStore::new("http", metrics, inner))
293    }
294
295    pub fn new(
296        base_url: &str,
297        cache_size: u64,
298        metrics: Arc<KeyValueStoreMetrics>,
299    ) -> IotaResult<Self> {
300        info!("creating HttpKVStore with base_url: {}", base_url);
301
302        let client = Client::builder().http2_prior_knowledge().build().unwrap();
303
304        let base_url = if base_url.ends_with('/') {
305            base_url.to_string()
306        } else {
307            format!("{}/", base_url)
308        };
309
310        let base_url = Url::parse(&base_url).into_iota_result()?;
311
312        let cache = MokaCacheBuilder::new(cache_size)
313            .time_to_idle(Duration::from_secs(600))
314            .build();
315
316        Ok(Self {
317            base_url,
318            client,
319            cache,
320            metrics,
321        })
322    }
323
324    fn get_url(&self, key: &Key) -> IotaResult<Url> {
325        let (item_type, digest) = key.to_path_elements();
326        let joined = self
327            .base_url
328            .join(&format!("{item_type}/{digest}"))
329            .into_iota_result()?;
330        Url::from_str(joined.as_str()).into_iota_result()
331    }
332
333    async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
334        let uris_vec = uris.to_vec();
335        let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
336        fetches.buffered(uris.len()).collect::<Vec<_>>().await
337    }
338
339    async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
340        let url = self.get_url(&key)?;
341
342        trace!("fetching url: {}", url);
343
344        if let Some(res) = self.cache.get(&url) {
345            trace!("found cached data for url: {}, len: {:?}", url, res.len());
346            self.metrics
347                .key_value_store_num_fetches_success
348                .with_label_values(&["http_cache", "url"])
349                .inc();
350            return Ok(Some(res));
351        }
352
353        self.metrics
354            .key_value_store_num_fetches_not_found
355            .with_label_values(&["http_cache", "url"])
356            .inc();
357
358        let resp = self
359            .client
360            .get(url.clone())
361            .send()
362            .await
363            .into_iota_result()?;
364        trace!(
365            "got response {} for url: {}, len: {:?}",
366            url,
367            resp.status(),
368            resp.headers()
369                .get(CONTENT_LENGTH)
370                .unwrap_or(&HeaderValue::from_static("0"))
371        );
372        // return None if 400
373        if resp.status().is_success() {
374            let bytes = resp.bytes().await.into_iota_result()?;
375            self.cache.insert(url, bytes.clone());
376
377            Ok(Some(bytes))
378        } else {
379            Ok(None)
380        }
381    }
382}
383
384fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
385where
386    K: std::fmt::Debug,
387    T: for<'de> Deserialize<'de>,
388{
389    bcs::from_bytes(bytes)
390        .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
391        .ok()
392}
393
394fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
395where
396    K: std::fmt::Debug,
397{
398    let (fetch, key) = fetch;
399    match fetch {
400        Ok(Some(bytes)) => Some((bytes, key)),
401        Ok(None) => None,
402        Err(err) => {
403            warn!("Error fetching key: {:?}, error: {:?}", key, err);
404            None
405        }
406    }
407}
408
/// Splits `slice` into consecutive sub-slices whose lengths are taken, in
/// order, from `lengths`.
///
/// Elements beyond the sum of `lengths` are ignored.
///
/// # Panics
///
/// Panics if the lengths add up to more than `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut parts = Vec::with_capacity(lengths.len());
    let mut rest = slice;
    for &len in lengths {
        let (head, tail) = rest.split_at(len);
        parts.push(head);
        rest = tail;
    }
    parts
}
421
422fn deser_check_digest<T, D>(
423    digest: &D,
424    bytes: &Bytes,
425    get_expected_digest: impl FnOnce(&T) -> D,
426) -> Option<T>
427where
428    D: std::fmt::Debug + PartialEq,
429    T: for<'de> Deserialize<'de>,
430{
431    deser(digest, bytes).and_then(|o: T| {
432        let expected_digest = get_expected_digest(&o);
433        if expected_digest == *digest {
434            Some(o)
435        } else {
436            error!(
437                "Digest mismatch - expected: {:?}, got: {:?}",
438                digest, expected_digest,
439            );
440            None
441        }
442    })
443}
444
445#[async_trait]
446impl TransactionKeyValueStoreTrait for HttpKVStore {
447    #[instrument(level = "trace", skip_all)]
448    async fn multi_get(
449        &self,
450        transaction_keys: &[TransactionDigest],
451        effects_keys: &[TransactionDigest],
452    ) -> IotaResult<KVStoreTransactionData> {
453        let num_txns = transaction_keys.len();
454        let num_effects = effects_keys.len();
455
456        let keys = transaction_keys
457            .iter()
458            .map(|tx| Key::Transaction(*tx))
459            .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
460            .collect::<Vec<_>>();
461
462        let fetches = self.multi_fetch(keys).await;
463        let txn_slice = fetches[..num_txns].to_vec();
464        let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
465
466        let txn_results = txn_slice
467            .iter()
468            .take(num_txns)
469            .zip(transaction_keys.iter())
470            .map(map_fetch)
471            .map(|maybe_bytes| {
472                maybe_bytes.and_then(|(bytes, digest)| {
473                    deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
474                })
475            })
476            .collect::<Vec<_>>();
477
478        let fx_results = fx_slice
479            .iter()
480            .take(num_effects)
481            .zip(effects_keys.iter())
482            .map(map_fetch)
483            .map(|maybe_bytes| {
484                maybe_bytes.and_then(|(bytes, digest)| {
485                    deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
486                        *fx.transaction_digest()
487                    })
488                })
489            })
490            .collect::<Vec<_>>();
491
492        Ok((txn_results, fx_results))
493    }
494
495    #[instrument(level = "trace", skip_all)]
496    async fn multi_get_checkpoints(
497        &self,
498        checkpoint_summaries: &[CheckpointSequenceNumber],
499        checkpoint_contents: &[CheckpointSequenceNumber],
500        checkpoint_summaries_by_digest: &[CheckpointDigest],
501    ) -> IotaResult<(
502        Vec<Option<CertifiedCheckpointSummary>>,
503        Vec<Option<CheckpointContents>>,
504        Vec<Option<CertifiedCheckpointSummary>>,
505    )> {
506        let keys = checkpoint_summaries
507            .iter()
508            .map(|cp| Key::CheckpointSummary(*cp))
509            .chain(
510                checkpoint_contents
511                    .iter()
512                    .map(|cp| Key::CheckpointContents(*cp)),
513            )
514            .chain(
515                checkpoint_summaries_by_digest
516                    .iter()
517                    .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
518            )
519            .collect::<Vec<_>>();
520
521        let summaries_len = checkpoint_summaries.len();
522        let contents_len = checkpoint_contents.len();
523        let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
524
525        let fetches = self.multi_fetch(keys).await;
526
527        let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
528
529        let result_slices = multi_split_slice(&fetches, &input_slices);
530
531        let summaries_results = result_slices[0]
532            .iter()
533            .zip(checkpoint_summaries.iter())
534            .map(map_fetch)
535            .map(|maybe_bytes| {
536                maybe_bytes
537                    .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
538            })
539            .collect::<Vec<_>>();
540
541        let contents_results = result_slices[1]
542            .iter()
543            .zip(checkpoint_contents.iter())
544            .map(map_fetch)
545            .map(|maybe_bytes| {
546                maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
547            })
548            .collect::<Vec<_>>();
549
550        let summaries_by_digest_results = result_slices[2]
551            .iter()
552            .zip(checkpoint_summaries_by_digest.iter())
553            .map(map_fetch)
554            .map(|maybe_bytes| {
555                maybe_bytes.and_then(|(bytes, digest)| {
556                    deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
557                })
558            })
559            .collect::<Vec<_>>();
560
561        Ok((
562            summaries_results,
563            contents_results,
564            summaries_by_digest_results,
565        ))
566    }
567
568    #[instrument(level = "trace", skip_all)]
569    async fn get_transaction_perpetual_checkpoint(
570        &self,
571        digest: TransactionDigest,
572    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
573        let key = Key::TransactionToCheckpoint(digest);
574        self.fetch(key).await.map(|maybe| {
575            maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
576        })
577    }
578
579    #[instrument(level = "trace", skip_all)]
580    async fn get_object(
581        &self,
582        object_id: ObjectID,
583        version: SequenceNumber,
584    ) -> IotaResult<Option<Object>> {
585        let key = Key::ObjectKey(object_id, version);
586        self.fetch(key)
587            .await
588            .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
589    }
590
591    #[instrument(level = "trace", skip_all)]
592    async fn multi_get_transactions_perpetual_checkpoints(
593        &self,
594        digests: &[TransactionDigest],
595    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
596        let keys = digests
597            .iter()
598            .map(|digest| Key::TransactionToCheckpoint(*digest))
599            .collect::<Vec<_>>();
600
601        let fetches = self.multi_fetch(keys).await;
602
603        let results = fetches
604            .iter()
605            .zip(digests.iter())
606            .map(map_fetch)
607            .map(|maybe_bytes| {
608                maybe_bytes
609                    .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
610            })
611            .collect::<Vec<_>>();
612
613        Ok(results)
614    }
615
616    #[instrument(level = "trace", skip_all)]
617    async fn multi_get_events_by_tx_digests(
618        &self,
619        digests: &[TransactionDigest],
620    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
621        let keys = digests
622            .iter()
623            .map(|digest| Key::EventsByTransactionDigest(*digest))
624            .collect::<Vec<_>>();
625        Ok(self
626            .multi_fetch(keys)
627            .await
628            .iter()
629            .zip(digests.iter())
630            .map(map_fetch)
631            .map(|maybe_bytes| {
632                maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
633            })
634            .collect::<Vec<_>>())
635    }
636}