iota_storage/
http_key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{str::FromStr, sync::Arc};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_types::{
12    base_types::{ObjectID, SequenceNumber, VersionNumber},
13    digests::{CheckpointDigest, TransactionDigest},
14    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
15    error::{IotaError, IotaResult},
16    messages_checkpoint::{
17        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18    },
19    object::Object,
20    storage::ObjectKey,
21    transaction::Transaction,
22};
23use reqwest::{
24    Client, Url,
25    header::{CONTENT_LENGTH, HeaderValue},
26};
27use serde::{Deserialize, Serialize};
28use tap::TapFallible;
29use tracing::{error, info, instrument, trace, warn};
30
31use crate::{
32    key_value_store::{
33        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
34    },
35    key_value_store_metrics::KeyValueStoreMetrics,
36};
37
38pub struct HttpKVStore {
39    base_url: Url,
40    client: Client,
41}
42
43pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
44    base64_url::encode(digest)
45}
46
47// for non-digest keys, we need a tag to make sure we don't have collisions
48#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
49pub enum TaggedKey {
50    CheckpointSequenceNumber(CheckpointSequenceNumber),
51}
52
53pub fn encoded_tagged_key(key: &TaggedKey) -> String {
54    let bytes = bcs::to_bytes(key).expect("failed to serialize key");
55    base64_url::encode(&bytes)
56}
57
58pub fn encode_object_key(object_id: &ObjectID, version: &VersionNumber) -> String {
59    let bytes =
60        bcs::to_bytes(&ObjectKey(*object_id, *version)).expect("failed to serialize object key");
61    base64_url::encode(&bytes)
62}
63
64trait IntoIotaResult<T> {
65    fn into_iota_result(self) -> IotaResult<T>;
66}
67
68impl<T, E> IntoIotaResult<T> for Result<T, E>
69where
70    E: std::error::Error,
71{
72    fn into_iota_result(self) -> IotaResult<T> {
73        self.map_err(|e| IotaError::Storage(e.to_string()))
74    }
75}
76
77/// Represents the supported items the REST API accepts when fetching the data
78/// based on Digest or Sequence number.
79#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, strum::EnumString, strum::Display)]
80pub enum ItemType {
81    #[strum(serialize = "tx")]
82    #[serde(rename = "tx")]
83    Transaction,
84    #[strum(serialize = "fx")]
85    #[serde(rename = "fx")]
86    TransactionEffects,
87    #[strum(serialize = "cc")]
88    #[serde(rename = "cc")]
89    CheckpointContents,
90    #[strum(serialize = "cs")]
91    #[serde(rename = "cs")]
92    CheckpointSummary,
93    #[strum(serialize = "tx2c")]
94    #[serde(rename = "tx2c")]
95    TransactionToCheckpoint,
96    #[strum(serialize = "ob")]
97    #[serde(rename = "ob")]
98    Object,
99    #[strum(serialize = "evtx")]
100    #[serde(rename = "evtx")]
101    EventTransactionDigest,
102}
103
104#[derive(Clone, Copy, Debug, PartialEq, Eq)]
105pub enum Key {
106    Transaction(TransactionDigest),
107    TransactionEffects(TransactionDigest),
108    CheckpointContents(CheckpointSequenceNumber),
109    CheckpointSummary(CheckpointSequenceNumber),
110    CheckpointSummaryByDigest(CheckpointDigest),
111    TransactionToCheckpoint(TransactionDigest),
112    ObjectKey(ObjectID, VersionNumber),
113    EventsByTransactionDigest(TransactionDigest),
114}
115
116impl Key {
117    // Create a [`Key`] instance based on the provided item type and
118    /// [`base64_url`] encoded string.
119    ///
120    /// # Example
121    ///
122    /// ```rust
123    /// use std::str::FromStr;
124    ///
125    /// use iota_storage::http_key_value_store::Key;
126    /// use iota_types::digests::TransactionDigest;
127    ///
128    /// let key = Key::new("tx", "7jb54RvJduLj9HdV9L41UJqZ5KWdzYY2rl1eL8AVl9o").unwrap();
129    /// assert_eq!(
130    ///     key,
131    ///     Key::Transaction(
132    ///         TransactionDigest::from_str("H2tetNL3CfroDF3iJNA7wFo6oRQiJedGTeykZi6HAGqP").unwrap()
133    ///     )
134    /// );
135    /// ```
136    pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
137        let item_type =
138            ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
139        let decoded_key = base64_url::decode(encoded_key)
140            .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
141
142        match item_type {
143            ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::try_from(
144                decoded_key.as_slice(),
145            )?)),
146            ItemType::TransactionEffects => Ok(Key::TransactionEffects(
147                TransactionDigest::try_from(decoded_key.as_slice())?,
148            )),
149            ItemType::CheckpointContents => {
150                let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
151                    anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
152                })?;
153                match tagged_key {
154                    TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
155                }
156            }
157            ItemType::CheckpointSummary => {
158                // first try to decode as digest, otherwise try to decode as tagged key
159                match CheckpointDigest::try_from(decoded_key.clone()) {
160                    Err(_) => {
161                        let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
162                            anyhow::anyhow!(
163                                "failed to deserialize checkpoint sequence number: {err}"
164                            )
165                        })?;
166                        match tagged_key {
167                            TaggedKey::CheckpointSequenceNumber(seq) => {
168                                Ok(Key::CheckpointSummary(seq))
169                            }
170                        }
171                    }
172                    Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
173                }
174            }
175            ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
176                TransactionDigest::try_from(decoded_key.as_slice())?,
177            )),
178            ItemType::Object => {
179                let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
180                    .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
181
182                Ok(Key::ObjectKey(object_key.0, object_key.1))
183            }
184            ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
185                TransactionDigest::try_from(decoded_key.as_slice())?,
186            )),
187        }
188    }
189
190    /// Get the REST API resource type.
191    ///
192    /// This method returns the corresponding resource type string
193    /// for a given `Key` variant.
194    ///
195    /// This is used to construct the REST API route,
196    /// typically in the format `/:item_type/:digest`.
197    ///
198    /// # Example
199    /// ```rust
200    /// use iota_storage::http_key_value_store::{ItemType, Key};
201    /// use iota_types::digests::TransactionDigest;
202    ///
203    /// let item_type = Key::CheckpointContents(1).item_type();
204    /// assert_eq!(item_type, ItemType::CheckpointContents);
205    /// let item_type = Key::Transaction(TransactionDigest::random()).item_type();
206    /// assert_eq!(item_type, ItemType::Transaction);
207    /// ```
208    pub fn item_type(&self) -> ItemType {
209        match self {
210            Key::Transaction(_) => ItemType::Transaction,
211            Key::TransactionEffects(_) => ItemType::TransactionEffects,
212            Key::CheckpointContents(_) => ItemType::CheckpointContents,
213            Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
214                ItemType::CheckpointSummary
215            }
216            Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
217            Key::ObjectKey(_, _) => ItemType::Object,
218            Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
219        }
220    }
221
222    /// Returns a tuple containing the resource type and the encoded key.
223    ///
224    /// This is used to construct the REST API route, typically in the format
225    /// `/:item_type/:digest`.
226    ///
227    /// # Examples
228    ///
229    /// ```rust
230    /// use iota_storage::http_key_value_store::{
231    ///     ItemType, Key, TaggedKey, encode_digest, encode_object_key, encoded_tagged_key,
232    /// };
233    /// use iota_types::digests::TransactionDigest;
234    ///
235    /// let tx_digest = TransactionDigest::random();
236    /// // encode the tx_digest as base64 url
237    /// let expected_encoded_digest = encode_digest(&tx_digest);
238    /// let key = Key::Transaction(tx_digest);
239    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
240    /// assert_eq!(resource_type, ItemType::Transaction);
241    /// assert_eq!(encoded_key_digest, expected_encoded_digest);
242    ///
243    /// let chk_seq_num = 123;
244    /// let key = Key::CheckpointSummary(chk_seq_num);
245    /// // encode the checkpoint sequence number as base64 url
246    /// let expected_encoded_seq_num =
247    ///     encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(chk_seq_num));
248    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
249    /// assert_eq!(resource_type, ItemType::CheckpointSummary);
250    /// assert_eq!(encoded_key_digest, expected_encoded_seq_num);
251    /// ```
252    pub fn to_path_elements(&self) -> (ItemType, String) {
253        let encoded_key_digest = match self {
254            Key::Transaction(digest) => encode_digest(digest),
255            Key::TransactionEffects(digest) => encode_digest(digest),
256            Key::CheckpointContents(seq) => {
257                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
258            }
259            Key::CheckpointSummary(seq) => {
260                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
261            }
262            Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
263            Key::TransactionToCheckpoint(digest) => encode_digest(digest),
264            Key::ObjectKey(object_id, version) => encode_object_key(object_id, version),
265            Key::EventsByTransactionDigest(digest) => encode_digest(digest),
266        };
267
268        (self.item_type(), encoded_key_digest)
269    }
270}
271
272#[derive(Clone, Debug)]
273enum Value {
274    Tx(Box<Transaction>),
275    Fx(Box<TransactionEffects>),
276    Events(Box<TransactionEvents>),
277    CheckpointContents(Box<CheckpointContents>),
278    CheckpointSummary(Box<CertifiedCheckpointSummary>),
279    TxToCheckpoint(CheckpointSequenceNumber),
280}
281
282impl HttpKVStore {
283    pub fn new_kv(
284        base_url: &str,
285        metrics: Arc<KeyValueStoreMetrics>,
286    ) -> IotaResult<TransactionKeyValueStore> {
287        let inner = Arc::new(Self::new(base_url)?);
288        Ok(TransactionKeyValueStore::new("http", metrics, inner))
289    }
290
291    pub fn new(base_url: &str) -> IotaResult<Self> {
292        info!("creating HttpKVStore with base_url: {}", base_url);
293
294        let client = Client::builder().http2_prior_knowledge().build().unwrap();
295
296        let base_url = if base_url.ends_with('/') {
297            base_url.to_string()
298        } else {
299            format!("{}/", base_url)
300        };
301
302        let base_url = Url::parse(&base_url).into_iota_result()?;
303
304        Ok(Self { base_url, client })
305    }
306
307    fn get_url(&self, key: &Key) -> IotaResult<Url> {
308        let (item_type, digest) = key.to_path_elements();
309        let joined = self
310            .base_url
311            .join(&format!("{item_type}/{digest}"))
312            .into_iota_result()?;
313        Url::from_str(joined.as_str()).into_iota_result()
314    }
315
316    async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
317        let uris_vec = uris.to_vec();
318        let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
319        fetches.buffered(uris.len()).collect::<Vec<_>>().await
320    }
321
322    async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
323        let url = self.get_url(&key)?;
324        trace!("fetching url: {}", url);
325        let resp = self
326            .client
327            .get(url.clone())
328            .send()
329            .await
330            .into_iota_result()?;
331        trace!(
332            "got response {} for url: {}, len: {:?}",
333            url,
334            resp.status(),
335            resp.headers()
336                .get(CONTENT_LENGTH)
337                .unwrap_or(&HeaderValue::from_static("0"))
338        );
339        // return None if 400
340        if resp.status().is_success() {
341            resp.bytes().await.map(Some).into_iota_result()
342        } else {
343            Ok(None)
344        }
345    }
346}
347
348fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
349where
350    K: std::fmt::Debug,
351    T: for<'de> Deserialize<'de>,
352{
353    bcs::from_bytes(bytes)
354        .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
355        .ok()
356}
357
358fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
359where
360    K: std::fmt::Debug,
361{
362    let (fetch, key) = fetch;
363    match fetch {
364        Ok(Some(bytes)) => Some((bytes, key)),
365        Ok(None) => None,
366        Err(err) => {
367            warn!("Error fetching key: {:?}, error: {:?}", key, err);
368            None
369        }
370    }
371}
372
/// Cuts `slice` into consecutive sub-slices whose sizes are given by
/// `lengths`.
///
/// The i-th returned slice starts where the previous one ended and holds
/// `lengths[i]` elements. Elements of `slice` beyond the sum of `lengths`
/// are not returned.
///
/// # Panics
///
/// Panics if the running total of `lengths` exceeds `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut parts = Vec::with_capacity(lengths.len());
    let mut offset = 0;
    for &len in lengths {
        let next = offset + len;
        parts.push(&slice[offset..next]);
        offset = next;
    }
    parts
}
385
386fn deser_check_digest<T, D>(
387    digest: &D,
388    bytes: &Bytes,
389    get_expected_digest: impl FnOnce(&T) -> D,
390) -> Option<T>
391where
392    D: std::fmt::Debug + PartialEq,
393    T: for<'de> Deserialize<'de>,
394{
395    deser(digest, bytes).and_then(|o: T| {
396        let expected_digest = get_expected_digest(&o);
397        if expected_digest == *digest {
398            Some(o)
399        } else {
400            error!(
401                "Digest mismatch - expected: {:?}, got: {:?}",
402                digest, expected_digest,
403            );
404            None
405        }
406    })
407}
408
409#[async_trait]
410impl TransactionKeyValueStoreTrait for HttpKVStore {
411    #[instrument(level = "trace", skip_all)]
412    async fn multi_get(
413        &self,
414        transaction_keys: &[TransactionDigest],
415        effects_keys: &[TransactionDigest],
416    ) -> IotaResult<KVStoreTransactionData> {
417        let num_txns = transaction_keys.len();
418        let num_effects = effects_keys.len();
419
420        let keys = transaction_keys
421            .iter()
422            .map(|tx| Key::Transaction(*tx))
423            .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
424            .collect::<Vec<_>>();
425
426        let fetches = self.multi_fetch(keys).await;
427        let txn_slice = fetches[..num_txns].to_vec();
428        let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
429
430        let txn_results = txn_slice
431            .iter()
432            .take(num_txns)
433            .zip(transaction_keys.iter())
434            .map(map_fetch)
435            .map(|maybe_bytes| {
436                maybe_bytes.and_then(|(bytes, digest)| {
437                    deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
438                })
439            })
440            .collect::<Vec<_>>();
441
442        let fx_results = fx_slice
443            .iter()
444            .take(num_effects)
445            .zip(effects_keys.iter())
446            .map(map_fetch)
447            .map(|maybe_bytes| {
448                maybe_bytes.and_then(|(bytes, digest)| {
449                    deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
450                        *fx.transaction_digest()
451                    })
452                })
453            })
454            .collect::<Vec<_>>();
455
456        Ok((txn_results, fx_results))
457    }
458
459    #[instrument(level = "trace", skip_all)]
460    async fn multi_get_checkpoints(
461        &self,
462        checkpoint_summaries: &[CheckpointSequenceNumber],
463        checkpoint_contents: &[CheckpointSequenceNumber],
464        checkpoint_summaries_by_digest: &[CheckpointDigest],
465    ) -> IotaResult<(
466        Vec<Option<CertifiedCheckpointSummary>>,
467        Vec<Option<CheckpointContents>>,
468        Vec<Option<CertifiedCheckpointSummary>>,
469    )> {
470        let keys = checkpoint_summaries
471            .iter()
472            .map(|cp| Key::CheckpointSummary(*cp))
473            .chain(
474                checkpoint_contents
475                    .iter()
476                    .map(|cp| Key::CheckpointContents(*cp)),
477            )
478            .chain(
479                checkpoint_summaries_by_digest
480                    .iter()
481                    .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
482            )
483            .collect::<Vec<_>>();
484
485        let summaries_len = checkpoint_summaries.len();
486        let contents_len = checkpoint_contents.len();
487        let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
488
489        let fetches = self.multi_fetch(keys).await;
490
491        let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
492
493        let result_slices = multi_split_slice(&fetches, &input_slices);
494
495        let summaries_results = result_slices[0]
496            .iter()
497            .zip(checkpoint_summaries.iter())
498            .map(map_fetch)
499            .map(|maybe_bytes| {
500                maybe_bytes
501                    .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
502            })
503            .collect::<Vec<_>>();
504
505        let contents_results = result_slices[1]
506            .iter()
507            .zip(checkpoint_contents.iter())
508            .map(map_fetch)
509            .map(|maybe_bytes| {
510                maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
511            })
512            .collect::<Vec<_>>();
513
514        let summaries_by_digest_results = result_slices[2]
515            .iter()
516            .zip(checkpoint_summaries_by_digest.iter())
517            .map(map_fetch)
518            .map(|maybe_bytes| {
519                maybe_bytes.and_then(|(bytes, digest)| {
520                    deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
521                })
522            })
523            .collect::<Vec<_>>();
524
525        Ok((
526            summaries_results,
527            contents_results,
528            summaries_by_digest_results,
529        ))
530    }
531
532    #[instrument(level = "trace", skip_all)]
533    async fn get_transaction_perpetual_checkpoint(
534        &self,
535        digest: TransactionDigest,
536    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
537        let key = Key::TransactionToCheckpoint(digest);
538        self.fetch(key).await.map(|maybe| {
539            maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
540        })
541    }
542
543    #[instrument(level = "trace", skip_all)]
544    async fn get_object(
545        &self,
546        object_id: ObjectID,
547        version: SequenceNumber,
548    ) -> IotaResult<Option<Object>> {
549        let key = Key::ObjectKey(object_id, version);
550        self.fetch(key)
551            .await
552            .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
553    }
554
555    #[instrument(level = "trace", skip_all)]
556    async fn multi_get_transactions_perpetual_checkpoints(
557        &self,
558        digests: &[TransactionDigest],
559    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
560        let keys = digests
561            .iter()
562            .map(|digest| Key::TransactionToCheckpoint(*digest))
563            .collect::<Vec<_>>();
564
565        let fetches = self.multi_fetch(keys).await;
566
567        let results = fetches
568            .iter()
569            .zip(digests.iter())
570            .map(map_fetch)
571            .map(|maybe_bytes| {
572                maybe_bytes
573                    .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
574            })
575            .collect::<Vec<_>>();
576
577        Ok(results)
578    }
579
580    #[instrument(level = "trace", skip_all)]
581    async fn multi_get_events_by_tx_digests(
582        &self,
583        digests: &[TransactionDigest],
584    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
585        let keys = digests
586            .iter()
587            .map(|digest| Key::EventsByTransactionDigest(*digest))
588            .collect::<Vec<_>>();
589        Ok(self
590            .multi_fetch(keys)
591            .await
592            .iter()
593            .zip(digests.iter())
594            .map(map_fetch)
595            .map(|maybe_bytes| {
596                maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
597            })
598            .collect::<Vec<_>>())
599    }
600}