Skip to main content

iota_storage/
http_key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{str::FromStr, sync::Arc, time::Duration};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_sdk_types::ObjectId;
12use iota_types::{
13    base_types::{IotaAddress, SequenceNumber},
14    digests::{CheckpointDigest, TransactionDigest},
15    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
16    error::{IotaError, IotaResult},
17    messages_checkpoint::{
18        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
19    },
20    object::Object,
21    storage::ObjectKey,
22    transaction::Transaction,
23};
24use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
25use reqwest::{
26    Client, Url,
27    header::{CONTENT_LENGTH, HeaderValue},
28};
29use serde::{Deserialize, Serialize};
30use tap::TapFallible;
31use tracing::{error, info, instrument, trace, warn};
32
33use crate::{
34    key_value_store::{
35        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
36    },
37    key_value_store_metrics::KeyValueStoreMetrics,
38};
39
40pub struct HttpKVStore {
41    base_url: Url,
42    client: Client,
43    cache: MokaCache<Url, Bytes>,
44    metrics: Arc<KeyValueStoreMetrics>,
45}
46
47pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
48    base64_url::encode(digest)
49}
50
51// for non-digest keys, we need a tag to make sure we don't have collisions
52#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
53pub enum TaggedKey {
54    CheckpointSequenceNumber(CheckpointSequenceNumber),
55}
56
57pub fn encoded_tagged_key(key: &TaggedKey) -> String {
58    let bytes = bcs::to_bytes(key).expect("failed to serialize key");
59    base64_url::encode(&bytes)
60}
61
62pub fn encode_object_key(object_key: &ObjectKey) -> String {
63    let bytes = bcs::to_bytes(object_key).expect("failed to serialize object key");
64    base64_url::encode(&bytes)
65}
66
67trait IntoIotaResult<T> {
68    fn into_iota_result(self) -> IotaResult<T>;
69}
70
71impl<T, E> IntoIotaResult<T> for Result<T, E>
72where
73    E: std::error::Error,
74{
75    fn into_iota_result(self) -> IotaResult<T> {
76        self.map_err(|e| IotaError::Storage(e.to_string()))
77    }
78}
79
80/// Represents the supported items the REST API accepts when fetching the data
81/// based on Digest or Sequence number.
82#[derive(
83    Clone,
84    Copy,
85    Debug,
86    PartialEq,
87    Eq,
88    PartialOrd,
89    Ord,
90    Hash,
91    Deserialize,
92    strum::EnumString,
93    strum::Display,
94)]
95pub enum ItemType {
96    #[strum(serialize = "tx")]
97    #[serde(rename = "tx")]
98    Transaction,
99    #[strum(serialize = "fx")]
100    #[serde(rename = "fx")]
101    TransactionEffects,
102    #[strum(serialize = "cc")]
103    #[serde(rename = "cc")]
104    CheckpointContents,
105    #[strum(serialize = "cs")]
106    #[serde(rename = "cs")]
107    CheckpointSummary,
108    #[strum(serialize = "tx2c")]
109    #[serde(rename = "tx2c")]
110    TransactionToCheckpoint,
111    #[strum(serialize = "ob")]
112    #[serde(rename = "ob")]
113    Object,
114    #[strum(serialize = "evtx")]
115    #[serde(rename = "evtx")]
116    EventTransactionDigest,
117    #[strum(serialize = "txa")]
118    #[serde(rename = "txa")]
119    TransactionDigestsByAddress,
120}
121
122#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
123pub enum Key {
124    Transaction(TransactionDigest),
125    TransactionEffects(TransactionDigest),
126    CheckpointContents(CheckpointSequenceNumber),
127    CheckpointSummary(CheckpointSequenceNumber),
128    CheckpointSummaryByDigest(CheckpointDigest),
129    TransactionToCheckpoint(TransactionDigest),
130    ObjectKey(ObjectKey),
131    EventsByTransactionDigest(TransactionDigest),
132    TransactionDigestsByAddress(IotaAddress),
133}
134
135impl Key {
136    // Create a [`Key`] instance based on the provided item type and
137    /// [`base64_url`] encoded string.
138    ///
139    /// # Example
140    ///
141    /// ```rust
142    /// use std::str::FromStr;
143    ///
144    /// use iota_storage::http_key_value_store::Key;
145    /// use iota_types::digests::TransactionDigest;
146    ///
147    /// let key = Key::new("tx", "7jb54RvJduLj9HdV9L41UJqZ5KWdzYY2rl1eL8AVl9o").unwrap();
148    /// assert_eq!(
149    ///     key,
150    ///     Key::Transaction(
151    ///         TransactionDigest::from_str("H2tetNL3CfroDF3iJNA7wFo6oRQiJedGTeykZi6HAGqP").unwrap()
152    ///     )
153    /// );
154    /// ```
155    pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
156        let item_type =
157            ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
158        let decoded_key = base64_url::decode(encoded_key)
159            .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
160
161        match item_type {
162            ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::from_bytes(
163                decoded_key.as_slice(),
164            )?)),
165            ItemType::TransactionEffects => Ok(Key::TransactionEffects(
166                TransactionDigest::from_bytes(decoded_key.as_slice())?,
167            )),
168            ItemType::CheckpointContents => {
169                let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
170                    anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
171                })?;
172                match tagged_key {
173                    TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
174                }
175            }
176            ItemType::CheckpointSummary => {
177                // first try to decode as digest, otherwise try to decode as tagged key
178                match CheckpointDigest::from_bytes(decoded_key.clone()) {
179                    Err(_) => {
180                        let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
181                            anyhow::anyhow!(
182                                "failed to deserialize checkpoint sequence number: {err}"
183                            )
184                        })?;
185                        match tagged_key {
186                            TaggedKey::CheckpointSequenceNumber(seq) => {
187                                Ok(Key::CheckpointSummary(seq))
188                            }
189                        }
190                    }
191                    Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
192                }
193            }
194            ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
195                TransactionDigest::from_bytes(decoded_key.as_slice())?,
196            )),
197            ItemType::Object => {
198                let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
199                    .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
200
201                Ok(Key::ObjectKey(ObjectKey(object_key.0, object_key.1)))
202            }
203            ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
204                TransactionDigest::from_bytes(decoded_key.as_slice())?,
205            )),
206            ItemType::TransactionDigestsByAddress => Ok(Key::TransactionDigestsByAddress(
207                IotaAddress::from_bytes(decoded_key.as_slice())?,
208            )),
209        }
210    }
211
212    /// Get the REST API resource type.
213    ///
214    /// This method returns the corresponding resource type string
215    /// for a given `Key` variant.
216    ///
217    /// This is used to construct the REST API route,
218    /// typically in the format `/{item_type}/{digest}`.
219    ///
220    /// # Example
221    /// ```rust
222    /// use iota_storage::http_key_value_store::{ItemType, Key};
223    /// use iota_types::digests::TransactionDigest;
224    ///
225    /// let item_type = Key::CheckpointContents(1).item_type();
226    /// assert_eq!(item_type, ItemType::CheckpointContents);
227    /// let item_type = Key::Transaction(TransactionDigest::random()).item_type();
228    /// assert_eq!(item_type, ItemType::Transaction);
229    /// ```
230    pub fn item_type(&self) -> ItemType {
231        match self {
232            Key::Transaction(_) => ItemType::Transaction,
233            Key::TransactionEffects(_) => ItemType::TransactionEffects,
234            Key::CheckpointContents(_) => ItemType::CheckpointContents,
235            Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
236                ItemType::CheckpointSummary
237            }
238            Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
239            Key::ObjectKey(_) => ItemType::Object,
240            Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
241            Key::TransactionDigestsByAddress(_) => ItemType::TransactionDigestsByAddress,
242        }
243    }
244
245    /// Returns a tuple containing the resource type and the encoded key.
246    ///
247    /// This is used to construct the REST API route, typically in the format
248    /// `/:item_type/:digest`.
249    ///
250    /// # Examples
251    ///
252    /// ```rust
253    /// use iota_storage::http_key_value_store::{
254    ///     ItemType, Key, TaggedKey, encode_digest, encode_object_key, encoded_tagged_key,
255    /// };
256    /// use iota_types::digests::TransactionDigest;
257    ///
258    /// let tx_digest = TransactionDigest::random();
259    /// // encode the tx_digest as base64 url
260    /// let expected_encoded_digest = encode_digest(&tx_digest);
261    /// let key = Key::Transaction(tx_digest);
262    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
263    /// assert_eq!(resource_type, ItemType::Transaction);
264    /// assert_eq!(encoded_key_digest, expected_encoded_digest);
265    ///
266    /// let chk_seq_num = 123;
267    /// let key = Key::CheckpointSummary(chk_seq_num);
268    /// // encode the checkpoint sequence number as base64 url
269    /// let expected_encoded_seq_num =
270    ///     encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(chk_seq_num));
271    /// let (resource_type, encoded_key_digest) = key.to_path_elements();
272    /// assert_eq!(resource_type, ItemType::CheckpointSummary);
273    /// assert_eq!(encoded_key_digest, expected_encoded_seq_num);
274    /// ```
275    pub fn to_path_elements(&self) -> (ItemType, String) {
276        let encoded_key_digest = match self {
277            Key::Transaction(digest) => encode_digest(digest),
278            Key::TransactionEffects(digest) => encode_digest(digest),
279            Key::CheckpointContents(seq) => {
280                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
281            }
282            Key::CheckpointSummary(seq) => {
283                encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
284            }
285            Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
286            Key::TransactionToCheckpoint(digest) => encode_digest(digest),
287            Key::ObjectKey(object_key) => encode_object_key(object_key),
288            Key::EventsByTransactionDigest(digest) => encode_digest(digest),
289            // TODO: `encode_digest` could be renamed to `encode` to fit more use cases.
290            // tracking issue: https://github.com/iotaledger/iota/issues/11754
291            Key::TransactionDigestsByAddress(address) => encode_digest(address),
292        };
293
294        (self.item_type(), encoded_key_digest)
295    }
296}
297
298#[derive(Clone, Debug)]
299enum Value {
300    Tx(Box<Transaction>),
301    Fx(Box<TransactionEffects>),
302    Events(Box<TransactionEvents>),
303    CheckpointContents(Box<CheckpointContents>),
304    CheckpointSummary(Box<CertifiedCheckpointSummary>),
305    TxToCheckpoint(CheckpointSequenceNumber),
306}
307
308impl HttpKVStore {
309    pub fn new_kv(
310        base_url: &str,
311        cache_size: u64,
312        metrics: Arc<KeyValueStoreMetrics>,
313    ) -> IotaResult<TransactionKeyValueStore> {
314        let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
315        Ok(TransactionKeyValueStore::new("http", metrics, inner))
316    }
317
318    pub fn new(
319        base_url: &str,
320        cache_size: u64,
321        metrics: Arc<KeyValueStoreMetrics>,
322    ) -> IotaResult<Self> {
323        info!("creating HttpKVStore with base_url: {}", base_url);
324
325        let client = Client::builder().http2_prior_knowledge().build().unwrap();
326
327        let base_url = if base_url.ends_with('/') {
328            base_url.to_string()
329        } else {
330            format!("{base_url}/")
331        };
332
333        let base_url = Url::parse(&base_url).into_iota_result()?;
334
335        let cache = MokaCacheBuilder::new(cache_size)
336            .time_to_idle(Duration::from_secs(600))
337            .build();
338
339        Ok(Self {
340            base_url,
341            client,
342            cache,
343            metrics,
344        })
345    }
346
347    fn get_url(&self, key: &Key) -> IotaResult<Url> {
348        let (item_type, digest) = key.to_path_elements();
349        let joined = self
350            .base_url
351            .join(&format!("{item_type}/{digest}"))
352            .into_iota_result()?;
353        Url::from_str(joined.as_str()).into_iota_result()
354    }
355
356    async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
357        let uris_vec = uris.to_vec();
358        let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
359        fetches.buffered(uris.len()).collect::<Vec<_>>().await
360    }
361
362    async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
363        let url = self.get_url(&key)?;
364
365        trace!("fetching url: {}", url);
366
367        if let Some(res) = self.cache.get(&url) {
368            trace!("found cached data for url: {}, len: {:?}", url, res.len());
369            self.metrics
370                .key_value_store_num_fetches_success
371                .with_label_values(&["http_cache", "url"])
372                .inc();
373            return Ok(Some(res));
374        }
375
376        self.metrics
377            .key_value_store_num_fetches_not_found
378            .with_label_values(&["http_cache", "url"])
379            .inc();
380
381        let resp = self
382            .client
383            .get(url.clone())
384            .send()
385            .await
386            .into_iota_result()?;
387        trace!(
388            "got response {} for url: {}, len: {:?}",
389            url,
390            resp.status(),
391            resp.headers()
392                .get(CONTENT_LENGTH)
393                .unwrap_or(&HeaderValue::from_static("0"))
394        );
395        // return None if 400
396        if resp.status().is_success() {
397            let bytes = resp.bytes().await.into_iota_result()?;
398            self.cache.insert(url, bytes.clone());
399
400            Ok(Some(bytes))
401        } else {
402            Ok(None)
403        }
404    }
405}
406
407fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
408where
409    K: std::fmt::Debug,
410    T: for<'de> Deserialize<'de>,
411{
412    bcs::from_bytes(bytes)
413        .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
414        .ok()
415}
416
417fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
418where
419    K: std::fmt::Debug,
420{
421    let (fetch, key) = fetch;
422    match fetch {
423        Ok(Some(bytes)) => Some((bytes, key)),
424        Ok(None) => None,
425        Err(err) => {
426            warn!("Error fetching key: {:?}, error: {:?}", key, err);
427            None
428        }
429    }
430}
431
/// Cuts `slice` into consecutive sub-slices whose sizes are given by `lengths`.
///
/// The n-th returned sub-slice starts where the previous one ended and holds
/// `lengths[n]` elements. Elements beyond the sum of `lengths` are not part of
/// any returned sub-slice.
///
/// # Panics
///
/// Panics if the sum of `lengths` exceeds `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut parts = Vec::with_capacity(lengths.len());
    let mut offset = 0;
    for &len in lengths {
        let next = offset + len;
        parts.push(&slice[offset..next]);
        offset = next;
    }
    parts
}
444
445fn deser_check_digest<T, D>(
446    digest: &D,
447    bytes: &Bytes,
448    get_expected_digest: impl FnOnce(&T) -> D,
449) -> Option<T>
450where
451    D: std::fmt::Debug + PartialEq,
452    T: for<'de> Deserialize<'de>,
453{
454    deser(digest, bytes).and_then(|o: T| {
455        let expected_digest = get_expected_digest(&o);
456        if expected_digest == *digest {
457            Some(o)
458        } else {
459            error!(
460                "Digest mismatch - expected: {:?}, got: {:?}",
461                digest, expected_digest,
462            );
463            None
464        }
465    })
466}
467
468#[async_trait]
469impl TransactionKeyValueStoreTrait for HttpKVStore {
470    #[instrument(level = "trace", skip_all)]
471    async fn multi_get(
472        &self,
473        transaction_keys: &[TransactionDigest],
474        effects_keys: &[TransactionDigest],
475    ) -> IotaResult<KVStoreTransactionData> {
476        let num_txns = transaction_keys.len();
477        let num_effects = effects_keys.len();
478
479        let keys = transaction_keys
480            .iter()
481            .map(|tx| Key::Transaction(*tx))
482            .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
483            .collect::<Vec<_>>();
484
485        let fetches = self.multi_fetch(keys).await;
486        let txn_slice = fetches[..num_txns].to_vec();
487        let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
488
489        let txn_results = txn_slice
490            .iter()
491            .take(num_txns)
492            .zip(transaction_keys.iter())
493            .map(map_fetch)
494            .map(|maybe_bytes| {
495                maybe_bytes.and_then(|(bytes, digest)| {
496                    deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
497                })
498            })
499            .collect::<Vec<_>>();
500
501        let fx_results = fx_slice
502            .iter()
503            .take(num_effects)
504            .zip(effects_keys.iter())
505            .map(map_fetch)
506            .map(|maybe_bytes| {
507                maybe_bytes.and_then(|(bytes, digest)| {
508                    deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
509                        *fx.transaction_digest()
510                    })
511                })
512            })
513            .collect::<Vec<_>>();
514
515        Ok((txn_results, fx_results))
516    }
517
518    #[instrument(level = "trace", skip_all)]
519    async fn multi_get_checkpoints(
520        &self,
521        checkpoint_summaries: &[CheckpointSequenceNumber],
522        checkpoint_contents: &[CheckpointSequenceNumber],
523        checkpoint_summaries_by_digest: &[CheckpointDigest],
524    ) -> IotaResult<(
525        Vec<Option<CertifiedCheckpointSummary>>,
526        Vec<Option<CheckpointContents>>,
527        Vec<Option<CertifiedCheckpointSummary>>,
528    )> {
529        let keys = checkpoint_summaries
530            .iter()
531            .map(|cp| Key::CheckpointSummary(*cp))
532            .chain(
533                checkpoint_contents
534                    .iter()
535                    .map(|cp| Key::CheckpointContents(*cp)),
536            )
537            .chain(
538                checkpoint_summaries_by_digest
539                    .iter()
540                    .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
541            )
542            .collect::<Vec<_>>();
543
544        let summaries_len = checkpoint_summaries.len();
545        let contents_len = checkpoint_contents.len();
546        let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
547
548        let fetches = self.multi_fetch(keys).await;
549
550        let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
551
552        let result_slices = multi_split_slice(&fetches, &input_slices);
553
554        let summaries_results = result_slices[0]
555            .iter()
556            .zip(checkpoint_summaries.iter())
557            .map(map_fetch)
558            .map(|maybe_bytes| {
559                maybe_bytes
560                    .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
561            })
562            .collect::<Vec<_>>();
563
564        let contents_results = result_slices[1]
565            .iter()
566            .zip(checkpoint_contents.iter())
567            .map(map_fetch)
568            .map(|maybe_bytes| {
569                maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
570            })
571            .collect::<Vec<_>>();
572
573        let summaries_by_digest_results = result_slices[2]
574            .iter()
575            .zip(checkpoint_summaries_by_digest.iter())
576            .map(map_fetch)
577            .map(|maybe_bytes| {
578                maybe_bytes.and_then(|(bytes, digest)| {
579                    deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
580                })
581            })
582            .collect::<Vec<_>>();
583
584        Ok((
585            summaries_results,
586            contents_results,
587            summaries_by_digest_results,
588        ))
589    }
590
591    #[instrument(level = "trace", skip_all)]
592    async fn get_transaction_perpetual_checkpoint(
593        &self,
594        digest: TransactionDigest,
595    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
596        let key = Key::TransactionToCheckpoint(digest);
597        self.fetch(key).await.map(|maybe| {
598            maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
599        })
600    }
601
602    #[instrument(level = "trace", skip_all)]
603    async fn get_object(
604        &self,
605        object_id: ObjectId,
606        version: SequenceNumber,
607    ) -> IotaResult<Option<Object>> {
608        let key = Key::ObjectKey(ObjectKey(object_id, version));
609        self.fetch(key)
610            .await
611            .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
612    }
613
614    #[instrument(level = "trace", skip_all)]
615    async fn multi_get_objects(
616        &self,
617        object_keys: &[ObjectKey],
618    ) -> IotaResult<Vec<Option<Object>>> {
619        let keys = object_keys
620            .iter()
621            .map(|key| Key::ObjectKey(*key))
622            .collect::<Vec<_>>();
623
624        let fetches = self.multi_fetch(keys).await;
625
626        let results = fetches
627            .iter()
628            .zip(object_keys.iter())
629            .map(map_fetch)
630            .map(|maybe_bytes| maybe_bytes.and_then(|(bytes, key)| deser::<_, Object>(&key, bytes)))
631            .collect::<Vec<_>>();
632
633        Ok(results)
634    }
635
636    #[instrument(level = "trace", skip_all)]
637    async fn multi_get_transactions_perpetual_checkpoints(
638        &self,
639        digests: &[TransactionDigest],
640    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
641        let keys = digests
642            .iter()
643            .map(|digest| Key::TransactionToCheckpoint(*digest))
644            .collect::<Vec<_>>();
645
646        let fetches = self.multi_fetch(keys).await;
647
648        let results = fetches
649            .iter()
650            .zip(digests.iter())
651            .map(map_fetch)
652            .map(|maybe_bytes| {
653                maybe_bytes
654                    .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
655            })
656            .collect::<Vec<_>>();
657
658        Ok(results)
659    }
660
661    #[instrument(level = "trace", skip_all)]
662    async fn multi_get_events_by_tx_digests(
663        &self,
664        digests: &[TransactionDigest],
665    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
666        let keys = digests
667            .iter()
668            .map(|digest| Key::EventsByTransactionDigest(*digest))
669            .collect::<Vec<_>>();
670        Ok(self
671            .multi_fetch(keys)
672            .await
673            .iter()
674            .zip(digests.iter())
675            .map(map_fetch)
676            .map(|maybe_bytes| {
677                maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
678            })
679            .collect::<Vec<_>>())
680    }
681}