iota_storage/
http_key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{str::FromStr, sync::Arc, time::Duration};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_types::{
12    base_types::{ObjectID, SequenceNumber},
13    digests::{CheckpointDigest, TransactionDigest},
14    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
15    error::{IotaError, IotaResult},
16    messages_checkpoint::{
17        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18    },
19    object::Object,
20    storage::ObjectKey,
21    transaction::Transaction,
22};
23use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
24use reqwest::{
25    Client, Url,
26    header::{CONTENT_LENGTH, HeaderValue},
27};
28use serde::{Deserialize, Serialize};
29use tap::TapFallible;
30use tracing::{error, info, instrument, trace, warn};
31
32use crate::{
33    key_value_store::{
34        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
35    },
36    key_value_store_metrics::KeyValueStoreMetrics,
37};
38
39pub struct HttpKVStore {
40    base_url: Url,
41    client: Client,
42    cache: MokaCache<Url, Bytes>,
43    metrics: Arc<KeyValueStoreMetrics>,
44}
45
46pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
47    base64_url::encode(digest)
48}
49
50// for non-digest keys, we need a tag to make sure we don't have collisions
51#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
52pub enum TaggedKey {
53    CheckpointSequenceNumber(CheckpointSequenceNumber),
54}
55
56pub fn encoded_tagged_key(key: &TaggedKey) -> String {
57    let bytes = bcs::to_bytes(key).expect("failed to serialize key");
58    base64_url::encode(&bytes)
59}
60
61pub fn encode_object_key(object_key: &ObjectKey) -> String {
62    let bytes = bcs::to_bytes(object_key).expect("failed to serialize object key");
63    base64_url::encode(&bytes)
64}
65
66trait IntoIotaResult<T> {
67    fn into_iota_result(self) -> IotaResult<T>;
68}
69
70impl<T, E> IntoIotaResult<T> for Result<T, E>
71where
72    E: std::error::Error,
73{
74    fn into_iota_result(self) -> IotaResult<T> {
75        self.map_err(|e| IotaError::Storage(e.to_string()))
76    }
77}
78
79/// Represents the supported items the REST API accepts when fetching the data
80/// based on Digest or Sequence number.
81#[derive(
82    Clone,
83    Copy,
84    Debug,
85    PartialEq,
86    Eq,
87    PartialOrd,
88    Ord,
89    Hash,
90    Deserialize,
91    strum::EnumString,
92    strum::Display,
93)]
94pub enum ItemType {
95    #[strum(serialize = "tx")]
96    #[serde(rename = "tx")]
97    Transaction,
98    #[strum(serialize = "fx")]
99    #[serde(rename = "fx")]
100    TransactionEffects,
101    #[strum(serialize = "cc")]
102    #[serde(rename = "cc")]
103    CheckpointContents,
104    #[strum(serialize = "cs")]
105    #[serde(rename = "cs")]
106    CheckpointSummary,
107    #[strum(serialize = "tx2c")]
108    #[serde(rename = "tx2c")]
109    TransactionToCheckpoint,
110    #[strum(serialize = "ob")]
111    #[serde(rename = "ob")]
112    Object,
113    #[strum(serialize = "evtx")]
114    #[serde(rename = "evtx")]
115    EventTransactionDigest,
116}
117
118#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
119pub enum Key {
120    Transaction(TransactionDigest),
121    TransactionEffects(TransactionDigest),
122    CheckpointContents(CheckpointSequenceNumber),
123    CheckpointSummary(CheckpointSequenceNumber),
124    CheckpointSummaryByDigest(CheckpointDigest),
125    TransactionToCheckpoint(TransactionDigest),
126    ObjectKey(ObjectKey),
127    EventsByTransactionDigest(TransactionDigest),
128}
129
130impl Key {
131    // Create a [`Key`] instance based on the provided item type and
132    /// [`base64_url`] encoded string.
133    ///
134    /// # Example
135    ///
136    /// ```rust
137    /// use std::str::FromStr;
138    ///
139    /// use iota_storage::http_key_value_store::Key;
140    /// use iota_types::digests::TransactionDigest;
141    ///
142    /// let key = Key::new("tx", "7jb54RvJduLj9HdV9L41UJqZ5KWdzYY2rl1eL8AVl9o").unwrap();
143    /// assert_eq!(
144    ///     key,
145    ///     Key::Transaction(
146    ///         TransactionDigest::from_str("H2tetNL3CfroDF3iJNA7wFo6oRQiJedGTeykZi6HAGqP").unwrap()
147    ///     )
148    /// );
149    /// ```
150    pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
151        let item_type =
152            ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
153        let decoded_key = base64_url::decode(encoded_key)
154            .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
155
156        match item_type {
157            ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::try_from(
158                decoded_key.as_slice(),
159            )?)),
160            ItemType::TransactionEffects => Ok(Key::TransactionEffects(
161                TransactionDigest::try_from(decoded_key.as_slice())?,
162            )),
163            ItemType::CheckpointContents => {
164                let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
165                    anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
166                })?;
167                match tagged_key {
168                    TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
169                }
170            }
171            ItemType::CheckpointSummary => {
172                // first try to decode as digest, otherwise try to decode as tagged key
173                match CheckpointDigest::try_from(decoded_key.clone()) {
174                    Err(_) => {
175                        let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
176                            anyhow::anyhow!(
177                                "failed to deserialize checkpoint sequence number: {err}"
178                            )
179                        })?;
180                        match tagged_key {
181                            TaggedKey::CheckpointSequenceNumber(seq) => {
182                                Ok(Key::CheckpointSummary(seq))
183                            }
184                        }
185                    }
186                    Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
187                }
188            }
189            ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
190                TransactionDigest::try_from(decoded_key.as_slice())?,
191            )),
192            ItemType::Object => {
193                let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
194                    .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
195
196                Ok(Key::ObjectKey(ObjectKey(object_key.0, object_key.1)))
197            }
198            ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
199                TransactionDigest::try_from(decoded_key.as_slice())?,
200            )),
201        }
202    }
203
204    /// Get the REST API resource type.
205    ///
206    /// This method returns the corresponding resource type string
207    /// for a given `Key` variant.
208    ///
209    /// This is used to construct the REST API route,
210    /// typically in the format `/{item_type}/{digest}`.
211    ///
212    /// # Example
213    /// ```rust
214    /// use iota_storage::http_key_value_store::{ItemType, Key};
215    /// use iota_types::digests::TransactionDigest;
216    ///
217    /// let item_type = Key::CheckpointContents(1).item_type();
218    /// assert_eq!(item_type, ItemType::CheckpointContents);
219    /// let item_type = Key::Transaction(TransactionDigest::random()).item_type();
220    /// assert_eq!(item_type, ItemType::Transaction);
221    /// ```
222    pub fn item_type(&self) -> ItemType {
223        match self {
224            Key::Transaction(_) => ItemType::Transaction,
225            Key::TransactionEffects(_) => ItemType::TransactionEffects,
226            Key::CheckpointContents(_) => ItemType::CheckpointContents,
227            Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
228                ItemType::CheckpointSummary
229            }
230            Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
231            Key::ObjectKey(_) => ItemType::Object,
232            Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
233        }
234    }
235
236    /// Returns a tuple containing the resource type and the encoded key.
237    ///
238    /// This is used to construct the REST API route, typically in the format
239    /// `/:item_type/:digest`.
240    ///
241    /// # Examples
242    ///
243    /// ```rust
244    /// use iota_storage::http_key_value_store::{
245    ///     ItemType, Key, TaggedKey, encode_digest, encode_object_key, encoded_tagged_key,
246    /// };
247    /// use iota_types::digests::TransactionDigest;
248    ///
249    /// let tx_digest = TransactionDigest::random();
250    /// // encode the tx_digest as base64 url
251    /// let expected_encoded_digest = encode_digest(&tx_digest);
252    /// let key = Key::Transaction(tx_digest);
253    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
254    /// assert_eq!(resource_type, ItemType::Transaction);
255    /// assert_eq!(encoded_key_digest, expected_encoded_digest);
256    ///
257    /// let chk_seq_num = 123;
258    /// let key = Key::CheckpointSummary(chk_seq_num);
259    /// // encode the checkpoint sequence number as base64 url
260    /// let expected_encoded_seq_num =
261    ///     encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(chk_seq_num));
262    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
263    /// assert_eq!(resource_type, ItemType::CheckpointSummary);
264    /// assert_eq!(encoded_key_digest, expected_encoded_seq_num);
265    /// ```
266    pub fn to_path_elements(&self) -> (ItemType, String) {
267        let encoded_key_digest = match self {
268            Key::Transaction(digest) => encode_digest(digest),
269            Key::TransactionEffects(digest) => encode_digest(digest),
270            Key::CheckpointContents(seq) => {
271                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
272            }
273            Key::CheckpointSummary(seq) => {
274                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
275            }
276            Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
277            Key::TransactionToCheckpoint(digest) => encode_digest(digest),
278            Key::ObjectKey(object_key) => encode_object_key(object_key),
279            Key::EventsByTransactionDigest(digest) => encode_digest(digest),
280        };
281
282        (self.item_type(), encoded_key_digest)
283    }
284}
285
286#[derive(Clone, Debug)]
287enum Value {
288    Tx(Box<Transaction>),
289    Fx(Box<TransactionEffects>),
290    Events(Box<TransactionEvents>),
291    CheckpointContents(Box<CheckpointContents>),
292    CheckpointSummary(Box<CertifiedCheckpointSummary>),
293    TxToCheckpoint(CheckpointSequenceNumber),
294}
295
296impl HttpKVStore {
297    pub fn new_kv(
298        base_url: &str,
299        cache_size: u64,
300        metrics: Arc<KeyValueStoreMetrics>,
301    ) -> IotaResult<TransactionKeyValueStore> {
302        let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
303        Ok(TransactionKeyValueStore::new("http", metrics, inner))
304    }
305
306    pub fn new(
307        base_url: &str,
308        cache_size: u64,
309        metrics: Arc<KeyValueStoreMetrics>,
310    ) -> IotaResult<Self> {
311        info!("creating HttpKVStore with base_url: {}", base_url);
312
313        let client = Client::builder().http2_prior_knowledge().build().unwrap();
314
315        let base_url = if base_url.ends_with('/') {
316            base_url.to_string()
317        } else {
318            format!("{base_url}/")
319        };
320
321        let base_url = Url::parse(&base_url).into_iota_result()?;
322
323        let cache = MokaCacheBuilder::new(cache_size)
324            .time_to_idle(Duration::from_secs(600))
325            .build();
326
327        Ok(Self {
328            base_url,
329            client,
330            cache,
331            metrics,
332        })
333    }
334
335    fn get_url(&self, key: &Key) -> IotaResult<Url> {
336        let (item_type, digest) = key.to_path_elements();
337        let joined = self
338            .base_url
339            .join(&format!("{item_type}/{digest}"))
340            .into_iota_result()?;
341        Url::from_str(joined.as_str()).into_iota_result()
342    }
343
344    async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
345        let uris_vec = uris.to_vec();
346        let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
347        fetches.buffered(uris.len()).collect::<Vec<_>>().await
348    }
349
350    async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
351        let url = self.get_url(&key)?;
352
353        trace!("fetching url: {}", url);
354
355        if let Some(res) = self.cache.get(&url) {
356            trace!("found cached data for url: {}, len: {:?}", url, res.len());
357            self.metrics
358                .key_value_store_num_fetches_success
359                .with_label_values(&["http_cache", "url"])
360                .inc();
361            return Ok(Some(res));
362        }
363
364        self.metrics
365            .key_value_store_num_fetches_not_found
366            .with_label_values(&["http_cache", "url"])
367            .inc();
368
369        let resp = self
370            .client
371            .get(url.clone())
372            .send()
373            .await
374            .into_iota_result()?;
375        trace!(
376            "got response {} for url: {}, len: {:?}",
377            url,
378            resp.status(),
379            resp.headers()
380                .get(CONTENT_LENGTH)
381                .unwrap_or(&HeaderValue::from_static("0"))
382        );
383        // return None if 400
384        if resp.status().is_success() {
385            let bytes = resp.bytes().await.into_iota_result()?;
386            self.cache.insert(url, bytes.clone());
387
388            Ok(Some(bytes))
389        } else {
390            Ok(None)
391        }
392    }
393}
394
395fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
396where
397    K: std::fmt::Debug,
398    T: for<'de> Deserialize<'de>,
399{
400    bcs::from_bytes(bytes)
401        .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
402        .ok()
403}
404
405fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
406where
407    K: std::fmt::Debug,
408{
409    let (fetch, key) = fetch;
410    match fetch {
411        Ok(Some(bytes)) => Some((bytes, key)),
412        Ok(None) => None,
413        Err(err) => {
414            warn!("Error fetching key: {:?}, error: {:?}", key, err);
415            None
416        }
417    }
418}
419
/// Splits the front of `slice` into consecutive sub-slices with the given
/// `lengths`.
///
/// Any elements past the sum of `lengths` are ignored.
///
/// # Panics
///
/// Panics if the sum of `lengths` exceeds `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut parts = Vec::with_capacity(lengths.len());
    let mut remaining = slice;
    for &len in lengths {
        let (head, tail) = remaining.split_at(len);
        parts.push(head);
        remaining = tail;
    }
    parts
}
432
433fn deser_check_digest<T, D>(
434    digest: &D,
435    bytes: &Bytes,
436    get_expected_digest: impl FnOnce(&T) -> D,
437) -> Option<T>
438where
439    D: std::fmt::Debug + PartialEq,
440    T: for<'de> Deserialize<'de>,
441{
442    deser(digest, bytes).and_then(|o: T| {
443        let expected_digest = get_expected_digest(&o);
444        if expected_digest == *digest {
445            Some(o)
446        } else {
447            error!(
448                "Digest mismatch - expected: {:?}, got: {:?}",
449                digest, expected_digest,
450            );
451            None
452        }
453    })
454}
455
456#[async_trait]
457impl TransactionKeyValueStoreTrait for HttpKVStore {
458    #[instrument(level = "trace", skip_all)]
459    async fn multi_get(
460        &self,
461        transaction_keys: &[TransactionDigest],
462        effects_keys: &[TransactionDigest],
463    ) -> IotaResult<KVStoreTransactionData> {
464        let num_txns = transaction_keys.len();
465        let num_effects = effects_keys.len();
466
467        let keys = transaction_keys
468            .iter()
469            .map(|tx| Key::Transaction(*tx))
470            .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
471            .collect::<Vec<_>>();
472
473        let fetches = self.multi_fetch(keys).await;
474        let txn_slice = fetches[..num_txns].to_vec();
475        let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
476
477        let txn_results = txn_slice
478            .iter()
479            .take(num_txns)
480            .zip(transaction_keys.iter())
481            .map(map_fetch)
482            .map(|maybe_bytes| {
483                maybe_bytes.and_then(|(bytes, digest)| {
484                    deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
485                })
486            })
487            .collect::<Vec<_>>();
488
489        let fx_results = fx_slice
490            .iter()
491            .take(num_effects)
492            .zip(effects_keys.iter())
493            .map(map_fetch)
494            .map(|maybe_bytes| {
495                maybe_bytes.and_then(|(bytes, digest)| {
496                    deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
497                        *fx.transaction_digest()
498                    })
499                })
500            })
501            .collect::<Vec<_>>();
502
503        Ok((txn_results, fx_results))
504    }
505
506    #[instrument(level = "trace", skip_all)]
507    async fn multi_get_checkpoints(
508        &self,
509        checkpoint_summaries: &[CheckpointSequenceNumber],
510        checkpoint_contents: &[CheckpointSequenceNumber],
511        checkpoint_summaries_by_digest: &[CheckpointDigest],
512    ) -> IotaResult<(
513        Vec<Option<CertifiedCheckpointSummary>>,
514        Vec<Option<CheckpointContents>>,
515        Vec<Option<CertifiedCheckpointSummary>>,
516    )> {
517        let keys = checkpoint_summaries
518            .iter()
519            .map(|cp| Key::CheckpointSummary(*cp))
520            .chain(
521                checkpoint_contents
522                    .iter()
523                    .map(|cp| Key::CheckpointContents(*cp)),
524            )
525            .chain(
526                checkpoint_summaries_by_digest
527                    .iter()
528                    .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
529            )
530            .collect::<Vec<_>>();
531
532        let summaries_len = checkpoint_summaries.len();
533        let contents_len = checkpoint_contents.len();
534        let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
535
536        let fetches = self.multi_fetch(keys).await;
537
538        let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
539
540        let result_slices = multi_split_slice(&fetches, &input_slices);
541
542        let summaries_results = result_slices[0]
543            .iter()
544            .zip(checkpoint_summaries.iter())
545            .map(map_fetch)
546            .map(|maybe_bytes| {
547                maybe_bytes
548                    .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
549            })
550            .collect::<Vec<_>>();
551
552        let contents_results = result_slices[1]
553            .iter()
554            .zip(checkpoint_contents.iter())
555            .map(map_fetch)
556            .map(|maybe_bytes| {
557                maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
558            })
559            .collect::<Vec<_>>();
560
561        let summaries_by_digest_results = result_slices[2]
562            .iter()
563            .zip(checkpoint_summaries_by_digest.iter())
564            .map(map_fetch)
565            .map(|maybe_bytes| {
566                maybe_bytes.and_then(|(bytes, digest)| {
567                    deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
568                })
569            })
570            .collect::<Vec<_>>();
571
572        Ok((
573            summaries_results,
574            contents_results,
575            summaries_by_digest_results,
576        ))
577    }
578
579    #[instrument(level = "trace", skip_all)]
580    async fn get_transaction_perpetual_checkpoint(
581        &self,
582        digest: TransactionDigest,
583    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
584        let key = Key::TransactionToCheckpoint(digest);
585        self.fetch(key).await.map(|maybe| {
586            maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
587        })
588    }
589
590    #[instrument(level = "trace", skip_all)]
591    async fn get_object(
592        &self,
593        object_id: ObjectID,
594        version: SequenceNumber,
595    ) -> IotaResult<Option<Object>> {
596        let key = Key::ObjectKey(ObjectKey(object_id, version));
597        self.fetch(key)
598            .await
599            .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
600    }
601
602    #[instrument(level = "trace", skip_all)]
603    async fn multi_get_objects(
604        &self,
605        object_keys: &[ObjectKey],
606    ) -> IotaResult<Vec<Option<Object>>> {
607        let keys = object_keys
608            .iter()
609            .map(|key| Key::ObjectKey(*key))
610            .collect::<Vec<_>>();
611
612        let fetches = self.multi_fetch(keys).await;
613
614        let results = fetches
615            .iter()
616            .zip(object_keys.iter())
617            .map(map_fetch)
618            .map(|maybe_bytes| maybe_bytes.and_then(|(bytes, key)| deser::<_, Object>(&key, bytes)))
619            .collect::<Vec<_>>();
620
621        Ok(results)
622    }
623
624    #[instrument(level = "trace", skip_all)]
625    async fn multi_get_transactions_perpetual_checkpoints(
626        &self,
627        digests: &[TransactionDigest],
628    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
629        let keys = digests
630            .iter()
631            .map(|digest| Key::TransactionToCheckpoint(*digest))
632            .collect::<Vec<_>>();
633
634        let fetches = self.multi_fetch(keys).await;
635
636        let results = fetches
637            .iter()
638            .zip(digests.iter())
639            .map(map_fetch)
640            .map(|maybe_bytes| {
641                maybe_bytes
642                    .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
643            })
644            .collect::<Vec<_>>();
645
646        Ok(results)
647    }
648
649    #[instrument(level = "trace", skip_all)]
650    async fn multi_get_events_by_tx_digests(
651        &self,
652        digests: &[TransactionDigest],
653    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
654        let keys = digests
655            .iter()
656            .map(|digest| Key::EventsByTransactionDigest(*digest))
657            .collect::<Vec<_>>();
658        Ok(self
659            .multi_fetch(keys)
660            .await
661            .iter()
662            .zip(digests.iter())
663            .map(map_fetch)
664            .map(|maybe_bytes| {
665                maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
666            })
667            .collect::<Vec<_>>())
668    }
669}