iota_storage/
http_key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{str::FromStr, sync::Arc, time::Duration};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_types::{
12    base_types::{ObjectID, SequenceNumber, VersionNumber},
13    digests::{CheckpointDigest, TransactionDigest},
14    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
15    error::{IotaError, IotaResult},
16    messages_checkpoint::{
17        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18    },
19    object::Object,
20    storage::ObjectKey,
21    transaction::Transaction,
22};
23use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
24use reqwest::{
25    Client, Url,
26    header::{CONTENT_LENGTH, HeaderValue},
27};
28use serde::{Deserialize, Serialize};
29use tap::TapFallible;
30use tracing::{error, info, instrument, trace, warn};
31
32use crate::{
33    key_value_store::{
34        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
35    },
36    key_value_store_metrics::KeyValueStoreMetrics,
37};
38
39pub struct HttpKVStore {
40    base_url: Url,
41    client: Client,
42    cache: MokaCache<Url, Bytes>,
43    metrics: Arc<KeyValueStoreMetrics>,
44}
45
46pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
47    base64_url::encode(digest)
48}
49
50// for non-digest keys, we need a tag to make sure we don't have collisions
51#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
52pub enum TaggedKey {
53    CheckpointSequenceNumber(CheckpointSequenceNumber),
54}
55
56pub fn encoded_tagged_key(key: &TaggedKey) -> String {
57    let bytes = bcs::to_bytes(key).expect("failed to serialize key");
58    base64_url::encode(&bytes)
59}
60
61pub fn encode_object_key(object_id: &ObjectID, version: &VersionNumber) -> String {
62    let bytes =
63        bcs::to_bytes(&ObjectKey(*object_id, *version)).expect("failed to serialize object key");
64    base64_url::encode(&bytes)
65}
66
67trait IntoIotaResult<T> {
68    fn into_iota_result(self) -> IotaResult<T>;
69}
70
71impl<T, E> IntoIotaResult<T> for Result<T, E>
72where
73    E: std::error::Error,
74{
75    fn into_iota_result(self) -> IotaResult<T> {
76        self.map_err(|e| IotaError::Storage(e.to_string()))
77    }
78}
79
80/// Represents the supported items the REST API accepts when fetching the data
81/// based on Digest or Sequence number.
82#[derive(
83    Clone,
84    Copy,
85    Debug,
86    PartialEq,
87    Eq,
88    PartialOrd,
89    Ord,
90    Hash,
91    Deserialize,
92    strum::EnumString,
93    strum::Display,
94)]
95pub enum ItemType {
96    #[strum(serialize = "tx")]
97    #[serde(rename = "tx")]
98    Transaction,
99    #[strum(serialize = "fx")]
100    #[serde(rename = "fx")]
101    TransactionEffects,
102    #[strum(serialize = "cc")]
103    #[serde(rename = "cc")]
104    CheckpointContents,
105    #[strum(serialize = "cs")]
106    #[serde(rename = "cs")]
107    CheckpointSummary,
108    #[strum(serialize = "tx2c")]
109    #[serde(rename = "tx2c")]
110    TransactionToCheckpoint,
111    #[strum(serialize = "ob")]
112    #[serde(rename = "ob")]
113    Object,
114    #[strum(serialize = "evtx")]
115    #[serde(rename = "evtx")]
116    EventTransactionDigest,
117}
118
119#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
120pub enum Key {
121    Transaction(TransactionDigest),
122    TransactionEffects(TransactionDigest),
123    CheckpointContents(CheckpointSequenceNumber),
124    CheckpointSummary(CheckpointSequenceNumber),
125    CheckpointSummaryByDigest(CheckpointDigest),
126    TransactionToCheckpoint(TransactionDigest),
127    ObjectKey(ObjectID, VersionNumber),
128    EventsByTransactionDigest(TransactionDigest),
129}
130
131impl Key {
132    // Create a [`Key`] instance based on the provided item type and
133    /// [`base64_url`] encoded string.
134    ///
135    /// # Example
136    ///
137    /// ```rust
138    /// use std::str::FromStr;
139    ///
140    /// use iota_storage::http_key_value_store::Key;
141    /// use iota_types::digests::TransactionDigest;
142    ///
143    /// let key = Key::new("tx", "7jb54RvJduLj9HdV9L41UJqZ5KWdzYY2rl1eL8AVl9o").unwrap();
144    /// assert_eq!(
145    ///     key,
146    ///     Key::Transaction(
147    ///         TransactionDigest::from_str("H2tetNL3CfroDF3iJNA7wFo6oRQiJedGTeykZi6HAGqP").unwrap()
148    ///     )
149    /// );
150    /// ```
151    pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
152        let item_type =
153            ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
154        let decoded_key = base64_url::decode(encoded_key)
155            .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
156
157        match item_type {
158            ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::try_from(
159                decoded_key.as_slice(),
160            )?)),
161            ItemType::TransactionEffects => Ok(Key::TransactionEffects(
162                TransactionDigest::try_from(decoded_key.as_slice())?,
163            )),
164            ItemType::CheckpointContents => {
165                let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
166                    anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
167                })?;
168                match tagged_key {
169                    TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
170                }
171            }
172            ItemType::CheckpointSummary => {
173                // first try to decode as digest, otherwise try to decode as tagged key
174                match CheckpointDigest::try_from(decoded_key.clone()) {
175                    Err(_) => {
176                        let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
177                            anyhow::anyhow!(
178                                "failed to deserialize checkpoint sequence number: {err}"
179                            )
180                        })?;
181                        match tagged_key {
182                            TaggedKey::CheckpointSequenceNumber(seq) => {
183                                Ok(Key::CheckpointSummary(seq))
184                            }
185                        }
186                    }
187                    Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
188                }
189            }
190            ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
191                TransactionDigest::try_from(decoded_key.as_slice())?,
192            )),
193            ItemType::Object => {
194                let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
195                    .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
196
197                Ok(Key::ObjectKey(object_key.0, object_key.1))
198            }
199            ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
200                TransactionDigest::try_from(decoded_key.as_slice())?,
201            )),
202        }
203    }
204
205    /// Get the REST API resource type.
206    ///
207    /// This method returns the corresponding resource type string
208    /// for a given `Key` variant.
209    ///
210    /// This is used to construct the REST API route,
211    /// typically in the format `/{item_type}/{digest}`.
212    ///
213    /// # Example
214    /// ```rust
215    /// use iota_storage::http_key_value_store::{ItemType, Key};
216    /// use iota_types::digests::TransactionDigest;
217    ///
218    /// let item_type = Key::CheckpointContents(1).item_type();
219    /// assert_eq!(item_type, ItemType::CheckpointContents);
220    /// let item_type = Key::Transaction(TransactionDigest::random()).item_type();
221    /// assert_eq!(item_type, ItemType::Transaction);
222    /// ```
223    pub fn item_type(&self) -> ItemType {
224        match self {
225            Key::Transaction(_) => ItemType::Transaction,
226            Key::TransactionEffects(_) => ItemType::TransactionEffects,
227            Key::CheckpointContents(_) => ItemType::CheckpointContents,
228            Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
229                ItemType::CheckpointSummary
230            }
231            Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
232            Key::ObjectKey(_, _) => ItemType::Object,
233            Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
234        }
235    }
236
237    /// Returns a tuple containing the resource type and the encoded key.
238    ///
239    /// This is used to construct the REST API route, typically in the format
240    /// `/:item_type/:digest`.
241    ///
242    /// # Examples
243    ///
244    /// ```rust
245    /// use iota_storage::http_key_value_store::{
246    ///     ItemType, Key, TaggedKey, encode_digest, encode_object_key, encoded_tagged_key,
247    /// };
248    /// use iota_types::digests::TransactionDigest;
249    ///
250    /// let tx_digest = TransactionDigest::random();
251    /// // encode the tx_digest as base64 url
252    /// let expected_encoded_digest = encode_digest(&tx_digest);
253    /// let key = Key::Transaction(tx_digest);
254    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
255    /// assert_eq!(resource_type, ItemType::Transaction);
256    /// assert_eq!(encoded_key_digest, expected_encoded_digest);
257    ///
258    /// let chk_seq_num = 123;
259    /// let key = Key::CheckpointSummary(chk_seq_num);
260    /// // encode the checkpoint sequence number as base64 url
261    /// let expected_encoded_seq_num =
262    ///     encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(chk_seq_num));
263    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
264    /// assert_eq!(resource_type, ItemType::CheckpointSummary);
265    /// assert_eq!(encoded_key_digest, expected_encoded_seq_num);
266    /// ```
267    pub fn to_path_elements(&self) -> (ItemType, String) {
268        let encoded_key_digest = match self {
269            Key::Transaction(digest) => encode_digest(digest),
270            Key::TransactionEffects(digest) => encode_digest(digest),
271            Key::CheckpointContents(seq) => {
272                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
273            }
274            Key::CheckpointSummary(seq) => {
275                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
276            }
277            Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
278            Key::TransactionToCheckpoint(digest) => encode_digest(digest),
279            Key::ObjectKey(object_id, version) => encode_object_key(object_id, version),
280            Key::EventsByTransactionDigest(digest) => encode_digest(digest),
281        };
282
283        (self.item_type(), encoded_key_digest)
284    }
285}
286
287#[derive(Clone, Debug)]
288enum Value {
289    Tx(Box<Transaction>),
290    Fx(Box<TransactionEffects>),
291    Events(Box<TransactionEvents>),
292    CheckpointContents(Box<CheckpointContents>),
293    CheckpointSummary(Box<CertifiedCheckpointSummary>),
294    TxToCheckpoint(CheckpointSequenceNumber),
295}
296
297impl HttpKVStore {
298    pub fn new_kv(
299        base_url: &str,
300        cache_size: u64,
301        metrics: Arc<KeyValueStoreMetrics>,
302    ) -> IotaResult<TransactionKeyValueStore> {
303        let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
304        Ok(TransactionKeyValueStore::new("http", metrics, inner))
305    }
306
307    pub fn new(
308        base_url: &str,
309        cache_size: u64,
310        metrics: Arc<KeyValueStoreMetrics>,
311    ) -> IotaResult<Self> {
312        info!("creating HttpKVStore with base_url: {}", base_url);
313
314        let client = Client::builder().http2_prior_knowledge().build().unwrap();
315
316        let base_url = if base_url.ends_with('/') {
317            base_url.to_string()
318        } else {
319            format!("{base_url}/")
320        };
321
322        let base_url = Url::parse(&base_url).into_iota_result()?;
323
324        let cache = MokaCacheBuilder::new(cache_size)
325            .time_to_idle(Duration::from_secs(600))
326            .build();
327
328        Ok(Self {
329            base_url,
330            client,
331            cache,
332            metrics,
333        })
334    }
335
336    fn get_url(&self, key: &Key) -> IotaResult<Url> {
337        let (item_type, digest) = key.to_path_elements();
338        let joined = self
339            .base_url
340            .join(&format!("{item_type}/{digest}"))
341            .into_iota_result()?;
342        Url::from_str(joined.as_str()).into_iota_result()
343    }
344
345    async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
346        let uris_vec = uris.to_vec();
347        let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
348        fetches.buffered(uris.len()).collect::<Vec<_>>().await
349    }
350
351    async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
352        let url = self.get_url(&key)?;
353
354        trace!("fetching url: {}", url);
355
356        if let Some(res) = self.cache.get(&url) {
357            trace!("found cached data for url: {}, len: {:?}", url, res.len());
358            self.metrics
359                .key_value_store_num_fetches_success
360                .with_label_values(&["http_cache", "url"])
361                .inc();
362            return Ok(Some(res));
363        }
364
365        self.metrics
366            .key_value_store_num_fetches_not_found
367            .with_label_values(&["http_cache", "url"])
368            .inc();
369
370        let resp = self
371            .client
372            .get(url.clone())
373            .send()
374            .await
375            .into_iota_result()?;
376        trace!(
377            "got response {} for url: {}, len: {:?}",
378            url,
379            resp.status(),
380            resp.headers()
381                .get(CONTENT_LENGTH)
382                .unwrap_or(&HeaderValue::from_static("0"))
383        );
384        // return None if 400
385        if resp.status().is_success() {
386            let bytes = resp.bytes().await.into_iota_result()?;
387            self.cache.insert(url, bytes.clone());
388
389            Ok(Some(bytes))
390        } else {
391            Ok(None)
392        }
393    }
394}
395
396fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
397where
398    K: std::fmt::Debug,
399    T: for<'de> Deserialize<'de>,
400{
401    bcs::from_bytes(bytes)
402        .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
403        .ok()
404}
405
406fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
407where
408    K: std::fmt::Debug,
409{
410    let (fetch, key) = fetch;
411    match fetch {
412        Ok(Some(bytes)) => Some((bytes, key)),
413        Ok(None) => None,
414        Err(err) => {
415            warn!("Error fetching key: {:?}, error: {:?}", key, err);
416            None
417        }
418    }
419}
420
/// Splits `slice` into consecutive sub-slices whose sizes are given by
/// `lengths`, in order.
///
/// The returned sub-slices borrow only from `slice`; `lengths` may be dropped
/// as soon as the call returns. Elements of `slice` beyond the sum of
/// `lengths` are not part of any returned sub-slice.
///
/// # Panics
///
/// Panics if the sum of `lengths` exceeds `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &[usize]) -> Vec<&'a [T]> {
    let mut rest = slice;
    lengths
        .iter()
        .map(|&length| {
            // Peel `length` elements off the front of what is left.
            let (head, tail) = rest.split_at(length);
            rest = tail;
            head
        })
        .collect()
}
433
434fn deser_check_digest<T, D>(
435    digest: &D,
436    bytes: &Bytes,
437    get_expected_digest: impl FnOnce(&T) -> D,
438) -> Option<T>
439where
440    D: std::fmt::Debug + PartialEq,
441    T: for<'de> Deserialize<'de>,
442{
443    deser(digest, bytes).and_then(|o: T| {
444        let expected_digest = get_expected_digest(&o);
445        if expected_digest == *digest {
446            Some(o)
447        } else {
448            error!(
449                "Digest mismatch - expected: {:?}, got: {:?}",
450                digest, expected_digest,
451            );
452            None
453        }
454    })
455}
456
457#[async_trait]
458impl TransactionKeyValueStoreTrait for HttpKVStore {
459    #[instrument(level = "trace", skip_all)]
460    async fn multi_get(
461        &self,
462        transaction_keys: &[TransactionDigest],
463        effects_keys: &[TransactionDigest],
464    ) -> IotaResult<KVStoreTransactionData> {
465        let num_txns = transaction_keys.len();
466        let num_effects = effects_keys.len();
467
468        let keys = transaction_keys
469            .iter()
470            .map(|tx| Key::Transaction(*tx))
471            .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
472            .collect::<Vec<_>>();
473
474        let fetches = self.multi_fetch(keys).await;
475        let txn_slice = fetches[..num_txns].to_vec();
476        let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
477
478        let txn_results = txn_slice
479            .iter()
480            .take(num_txns)
481            .zip(transaction_keys.iter())
482            .map(map_fetch)
483            .map(|maybe_bytes| {
484                maybe_bytes.and_then(|(bytes, digest)| {
485                    deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
486                })
487            })
488            .collect::<Vec<_>>();
489
490        let fx_results = fx_slice
491            .iter()
492            .take(num_effects)
493            .zip(effects_keys.iter())
494            .map(map_fetch)
495            .map(|maybe_bytes| {
496                maybe_bytes.and_then(|(bytes, digest)| {
497                    deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
498                        *fx.transaction_digest()
499                    })
500                })
501            })
502            .collect::<Vec<_>>();
503
504        Ok((txn_results, fx_results))
505    }
506
507    #[instrument(level = "trace", skip_all)]
508    async fn multi_get_checkpoints(
509        &self,
510        checkpoint_summaries: &[CheckpointSequenceNumber],
511        checkpoint_contents: &[CheckpointSequenceNumber],
512        checkpoint_summaries_by_digest: &[CheckpointDigest],
513    ) -> IotaResult<(
514        Vec<Option<CertifiedCheckpointSummary>>,
515        Vec<Option<CheckpointContents>>,
516        Vec<Option<CertifiedCheckpointSummary>>,
517    )> {
518        let keys = checkpoint_summaries
519            .iter()
520            .map(|cp| Key::CheckpointSummary(*cp))
521            .chain(
522                checkpoint_contents
523                    .iter()
524                    .map(|cp| Key::CheckpointContents(*cp)),
525            )
526            .chain(
527                checkpoint_summaries_by_digest
528                    .iter()
529                    .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
530            )
531            .collect::<Vec<_>>();
532
533        let summaries_len = checkpoint_summaries.len();
534        let contents_len = checkpoint_contents.len();
535        let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
536
537        let fetches = self.multi_fetch(keys).await;
538
539        let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
540
541        let result_slices = multi_split_slice(&fetches, &input_slices);
542
543        let summaries_results = result_slices[0]
544            .iter()
545            .zip(checkpoint_summaries.iter())
546            .map(map_fetch)
547            .map(|maybe_bytes| {
548                maybe_bytes
549                    .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
550            })
551            .collect::<Vec<_>>();
552
553        let contents_results = result_slices[1]
554            .iter()
555            .zip(checkpoint_contents.iter())
556            .map(map_fetch)
557            .map(|maybe_bytes| {
558                maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
559            })
560            .collect::<Vec<_>>();
561
562        let summaries_by_digest_results = result_slices[2]
563            .iter()
564            .zip(checkpoint_summaries_by_digest.iter())
565            .map(map_fetch)
566            .map(|maybe_bytes| {
567                maybe_bytes.and_then(|(bytes, digest)| {
568                    deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
569                })
570            })
571            .collect::<Vec<_>>();
572
573        Ok((
574            summaries_results,
575            contents_results,
576            summaries_by_digest_results,
577        ))
578    }
579
580    #[instrument(level = "trace", skip_all)]
581    async fn get_transaction_perpetual_checkpoint(
582        &self,
583        digest: TransactionDigest,
584    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
585        let key = Key::TransactionToCheckpoint(digest);
586        self.fetch(key).await.map(|maybe| {
587            maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
588        })
589    }
590
591    #[instrument(level = "trace", skip_all)]
592    async fn get_object(
593        &self,
594        object_id: ObjectID,
595        version: SequenceNumber,
596    ) -> IotaResult<Option<Object>> {
597        let key = Key::ObjectKey(object_id, version);
598        self.fetch(key)
599            .await
600            .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
601    }
602
603    #[instrument(level = "trace", skip_all)]
604    async fn multi_get_transactions_perpetual_checkpoints(
605        &self,
606        digests: &[TransactionDigest],
607    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
608        let keys = digests
609            .iter()
610            .map(|digest| Key::TransactionToCheckpoint(*digest))
611            .collect::<Vec<_>>();
612
613        let fetches = self.multi_fetch(keys).await;
614
615        let results = fetches
616            .iter()
617            .zip(digests.iter())
618            .map(map_fetch)
619            .map(|maybe_bytes| {
620                maybe_bytes
621                    .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
622            })
623            .collect::<Vec<_>>();
624
625        Ok(results)
626    }
627
628    #[instrument(level = "trace", skip_all)]
629    async fn multi_get_events_by_tx_digests(
630        &self,
631        digests: &[TransactionDigest],
632    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
633        let keys = digests
634            .iter()
635            .map(|digest| Key::EventsByTransactionDigest(*digest))
636            .collect::<Vec<_>>();
637        Ok(self
638            .multi_fetch(keys)
639            .await
640            .iter()
641            .zip(digests.iter())
642            .map(map_fetch)
643            .map(|maybe_bytes| {
644                maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
645            })
646            .collect::<Vec<_>>())
647    }
648}