1use std::{str::FromStr, sync::Arc, time::Duration};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_sdk_types::ObjectId;
12use iota_types::{
13 base_types::{IotaAddress, SequenceNumber},
14 digests::{CheckpointDigest, TransactionDigest},
15 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
16 error::{IotaError, IotaResult},
17 messages_checkpoint::{
18 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
19 },
20 object::Object,
21 storage::ObjectKey,
22 transaction::Transaction,
23};
24use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
25use reqwest::{
26 Client, Url,
27 header::{CONTENT_LENGTH, HeaderValue},
28};
29use serde::{Deserialize, Serialize};
30use tap::TapFallible;
31use tracing::{error, info, instrument, trace, warn};
32
33use crate::{
34 key_value_store::{
35 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
36 },
37 key_value_store_metrics::KeyValueStoreMetrics,
38};
39
/// Key-value store client that reads entries over HTTP.
///
/// Bodies of successful responses are kept in an in-memory cache keyed by
/// the full request URL.
pub struct HttpKVStore {
    /// Root URL that per-item paths are joined onto; `new` guarantees a
    /// trailing `/`.
    base_url: Url,
    /// HTTP client used for every request.
    client: Client,
    /// Bodies of previously successful fetches, keyed by request URL.
    cache: MokaCache<Url, Bytes>,
    /// Counters bumped on cache hits and cache misses.
    metrics: Arc<KeyValueStoreMetrics>,
}
46
47pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
48 base64_url::encode(digest)
49}
50
/// Serializable wrapper for keys that are not digests.
///
/// The value is BCS-serialized and then base64url-encoded by
/// `encoded_tagged_key`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaggedKey {
    /// A checkpoint addressed by its sequence number.
    CheckpointSequenceNumber(CheckpointSequenceNumber),
}
56
57pub fn encoded_tagged_key(key: &TaggedKey) -> String {
58 let bytes = bcs::to_bytes(key).expect("failed to serialize key");
59 base64_url::encode(&bytes)
60}
61
62pub fn encode_object_key(object_key: &ObjectKey) -> String {
63 let bytes = bcs::to_bytes(object_key).expect("failed to serialize object key");
64 base64_url::encode(&bytes)
65}
66
/// Conversion of a foreign result type into an [`IotaResult`].
trait IntoIotaResult<T> {
    /// Consumes `self` and returns the equivalent [`IotaResult`].
    fn into_iota_result(self) -> IotaResult<T>;
}
70
71impl<T, E> IntoIotaResult<T> for Result<T, E>
72where
73 E: std::error::Error,
74{
75 fn into_iota_result(self) -> IotaResult<T> {
76 self.map_err(|e| IotaError::Storage(e.to_string()))
77 }
78}
79
/// Kind of item stored in the key-value store.
///
/// The short `strum` name of each variant is the first path element of a
/// request URL (it is what `Display` prints) and is what `Key::new` parses
/// back with `FromStr`. The `serde` names mirror the `strum` names.
#[derive(
    Clone,
    Copy,
    Debug,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Deserialize,
    strum::EnumString,
    strum::Display,
)]
pub enum ItemType {
    /// Path element `tx`.
    #[strum(serialize = "tx")]
    #[serde(rename = "tx")]
    Transaction,
    /// Path element `fx`.
    #[strum(serialize = "fx")]
    #[serde(rename = "fx")]
    TransactionEffects,
    /// Path element `cc`.
    #[strum(serialize = "cc")]
    #[serde(rename = "cc")]
    CheckpointContents,
    /// Path element `cs`; used for summaries keyed by sequence number and by
    /// digest alike.
    #[strum(serialize = "cs")]
    #[serde(rename = "cs")]
    CheckpointSummary,
    /// Path element `tx2c`.
    #[strum(serialize = "tx2c")]
    #[serde(rename = "tx2c")]
    TransactionToCheckpoint,
    /// Path element `ob`.
    #[strum(serialize = "ob")]
    #[serde(rename = "ob")]
    Object,
    /// Path element `evtx`.
    #[strum(serialize = "evtx")]
    #[serde(rename = "evtx")]
    EventTransactionDigest,
    /// Path element `txa`.
    #[strum(serialize = "txa")]
    #[serde(rename = "txa")]
    TransactionDigestsByAddress,
}
121
/// A typed lookup key for the key-value store.
///
/// Each variant maps to an [`ItemType`] (see `Key::item_type`) and to an
/// encoded path element (see `Key::to_path_elements`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Key {
    /// Keyed by transaction digest; item type `Transaction`.
    Transaction(TransactionDigest),
    /// Keyed by transaction digest; item type `TransactionEffects`.
    TransactionEffects(TransactionDigest),
    /// Keyed by checkpoint sequence number; item type `CheckpointContents`.
    CheckpointContents(CheckpointSequenceNumber),
    /// Keyed by checkpoint sequence number; item type `CheckpointSummary`.
    CheckpointSummary(CheckpointSequenceNumber),
    /// Keyed by checkpoint digest; shares item type `CheckpointSummary`.
    CheckpointSummaryByDigest(CheckpointDigest),
    /// Keyed by transaction digest; item type `TransactionToCheckpoint`.
    TransactionToCheckpoint(TransactionDigest),
    /// Keyed by a BCS-serialized [`ObjectKey`]; item type `Object`.
    ObjectKey(ObjectKey),
    /// Keyed by transaction digest; item type `EventTransactionDigest`.
    EventsByTransactionDigest(TransactionDigest),
    /// Keyed by address bytes; item type `TransactionDigestsByAddress`.
    TransactionDigestsByAddress(IotaAddress),
}
134
135impl Key {
136 pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
156 let item_type =
157 ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
158 let decoded_key = base64_url::decode(encoded_key)
159 .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
160
161 match item_type {
162 ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::from_bytes(
163 decoded_key.as_slice(),
164 )?)),
165 ItemType::TransactionEffects => Ok(Key::TransactionEffects(
166 TransactionDigest::from_bytes(decoded_key.as_slice())?,
167 )),
168 ItemType::CheckpointContents => {
169 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
170 anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
171 })?;
172 match tagged_key {
173 TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
174 }
175 }
176 ItemType::CheckpointSummary => {
177 match CheckpointDigest::from_bytes(decoded_key.clone()) {
179 Err(_) => {
180 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
181 anyhow::anyhow!(
182 "failed to deserialize checkpoint sequence number: {err}"
183 )
184 })?;
185 match tagged_key {
186 TaggedKey::CheckpointSequenceNumber(seq) => {
187 Ok(Key::CheckpointSummary(seq))
188 }
189 }
190 }
191 Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
192 }
193 }
194 ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
195 TransactionDigest::from_bytes(decoded_key.as_slice())?,
196 )),
197 ItemType::Object => {
198 let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
199 .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
200
201 Ok(Key::ObjectKey(ObjectKey(object_key.0, object_key.1)))
202 }
203 ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
204 TransactionDigest::from_bytes(decoded_key.as_slice())?,
205 )),
206 ItemType::TransactionDigestsByAddress => Ok(Key::TransactionDigestsByAddress(
207 IotaAddress::from_bytes(decoded_key.as_slice())?,
208 )),
209 }
210 }
211
212 pub fn item_type(&self) -> ItemType {
231 match self {
232 Key::Transaction(_) => ItemType::Transaction,
233 Key::TransactionEffects(_) => ItemType::TransactionEffects,
234 Key::CheckpointContents(_) => ItemType::CheckpointContents,
235 Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
236 ItemType::CheckpointSummary
237 }
238 Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
239 Key::ObjectKey(_) => ItemType::Object,
240 Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
241 Key::TransactionDigestsByAddress(_) => ItemType::TransactionDigestsByAddress,
242 }
243 }
244
245 pub fn to_path_elements(&self) -> (ItemType, String) {
276 let encoded_key_digest = match self {
277 Key::Transaction(digest) => encode_digest(digest),
278 Key::TransactionEffects(digest) => encode_digest(digest),
279 Key::CheckpointContents(seq) => {
280 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
281 }
282 Key::CheckpointSummary(seq) => {
283 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
284 }
285 Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
286 Key::TransactionToCheckpoint(digest) => encode_digest(digest),
287 Key::ObjectKey(object_key) => encode_object_key(object_key),
288 Key::EventsByTransactionDigest(digest) => encode_digest(digest),
289 Key::TransactionDigestsByAddress(address) => encode_digest(address),
292 };
293
294 (self.item_type(), encoded_key_digest)
295 }
296}
297
/// A decoded value of one of the stored item kinds.
///
/// Larger payloads are boxed. NOTE(review): this enum is not referenced in
/// the visible part of the file — confirm it is still needed.
#[derive(Clone, Debug)]
enum Value {
    /// A transaction.
    Tx(Box<Transaction>),
    /// The effects of a transaction.
    Fx(Box<TransactionEffects>),
    /// The events of a transaction.
    Events(Box<TransactionEvents>),
    /// The contents of a checkpoint.
    CheckpointContents(Box<CheckpointContents>),
    /// A certified checkpoint summary.
    CheckpointSummary(Box<CertifiedCheckpointSummary>),
    /// The sequence number of the checkpoint a transaction belongs to.
    TxToCheckpoint(CheckpointSequenceNumber),
}
307
308impl HttpKVStore {
309 pub fn new_kv(
310 base_url: &str,
311 cache_size: u64,
312 metrics: Arc<KeyValueStoreMetrics>,
313 ) -> IotaResult<TransactionKeyValueStore> {
314 let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
315 Ok(TransactionKeyValueStore::new("http", metrics, inner))
316 }
317
318 pub fn new(
319 base_url: &str,
320 cache_size: u64,
321 metrics: Arc<KeyValueStoreMetrics>,
322 ) -> IotaResult<Self> {
323 info!("creating HttpKVStore with base_url: {}", base_url);
324
325 let client = Client::builder().http2_prior_knowledge().build().unwrap();
326
327 let base_url = if base_url.ends_with('/') {
328 base_url.to_string()
329 } else {
330 format!("{base_url}/")
331 };
332
333 let base_url = Url::parse(&base_url).into_iota_result()?;
334
335 let cache = MokaCacheBuilder::new(cache_size)
336 .time_to_idle(Duration::from_secs(600))
337 .build();
338
339 Ok(Self {
340 base_url,
341 client,
342 cache,
343 metrics,
344 })
345 }
346
347 fn get_url(&self, key: &Key) -> IotaResult<Url> {
348 let (item_type, digest) = key.to_path_elements();
349 let joined = self
350 .base_url
351 .join(&format!("{item_type}/{digest}"))
352 .into_iota_result()?;
353 Url::from_str(joined.as_str()).into_iota_result()
354 }
355
356 async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
357 let uris_vec = uris.to_vec();
358 let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
359 fetches.buffered(uris.len()).collect::<Vec<_>>().await
360 }
361
362 async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
363 let url = self.get_url(&key)?;
364
365 trace!("fetching url: {}", url);
366
367 if let Some(res) = self.cache.get(&url) {
368 trace!("found cached data for url: {}, len: {:?}", url, res.len());
369 self.metrics
370 .key_value_store_num_fetches_success
371 .with_label_values(&["http_cache", "url"])
372 .inc();
373 return Ok(Some(res));
374 }
375
376 self.metrics
377 .key_value_store_num_fetches_not_found
378 .with_label_values(&["http_cache", "url"])
379 .inc();
380
381 let resp = self
382 .client
383 .get(url.clone())
384 .send()
385 .await
386 .into_iota_result()?;
387 trace!(
388 "got response {} for url: {}, len: {:?}",
389 url,
390 resp.status(),
391 resp.headers()
392 .get(CONTENT_LENGTH)
393 .unwrap_or(&HeaderValue::from_static("0"))
394 );
395 if resp.status().is_success() {
397 let bytes = resp.bytes().await.into_iota_result()?;
398 self.cache.insert(url, bytes.clone());
399
400 Ok(Some(bytes))
401 } else {
402 Ok(None)
403 }
404 }
405}
406
407fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
408where
409 K: std::fmt::Debug,
410 T: for<'de> Deserialize<'de>,
411{
412 bcs::from_bytes(bytes)
413 .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
414 .ok()
415}
416
417fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
418where
419 K: std::fmt::Debug,
420{
421 let (fetch, key) = fetch;
422 match fetch {
423 Ok(Some(bytes)) => Some((bytes, key)),
424 Ok(None) => None,
425 Err(err) => {
426 warn!("Error fetching key: {:?}, error: {:?}", key, err);
427 None
428 }
429 }
430}
431
/// Cuts `slice` into consecutive sub-slices whose sizes are given by
/// `lengths`, starting at the front.
///
/// Elements beyond the sum of `lengths` are not returned.
///
/// # Panics
///
/// Panics if the running total of `lengths` exceeds `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut parts = Vec::with_capacity(lengths.len());
    let mut offset = 0;
    for &len in lengths {
        let next = offset + len;
        parts.push(&slice[offset..next]);
        offset = next;
    }
    parts
}
444
445fn deser_check_digest<T, D>(
446 digest: &D,
447 bytes: &Bytes,
448 get_expected_digest: impl FnOnce(&T) -> D,
449) -> Option<T>
450where
451 D: std::fmt::Debug + PartialEq,
452 T: for<'de> Deserialize<'de>,
453{
454 deser(digest, bytes).and_then(|o: T| {
455 let expected_digest = get_expected_digest(&o);
456 if expected_digest == *digest {
457 Some(o)
458 } else {
459 error!(
460 "Digest mismatch - expected: {:?}, got: {:?}",
461 digest, expected_digest,
462 );
463 None
464 }
465 })
466}
467
468#[async_trait]
469impl TransactionKeyValueStoreTrait for HttpKVStore {
470 #[instrument(level = "trace", skip_all)]
471 async fn multi_get(
472 &self,
473 transaction_keys: &[TransactionDigest],
474 effects_keys: &[TransactionDigest],
475 ) -> IotaResult<KVStoreTransactionData> {
476 let num_txns = transaction_keys.len();
477 let num_effects = effects_keys.len();
478
479 let keys = transaction_keys
480 .iter()
481 .map(|tx| Key::Transaction(*tx))
482 .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
483 .collect::<Vec<_>>();
484
485 let fetches = self.multi_fetch(keys).await;
486 let txn_slice = fetches[..num_txns].to_vec();
487 let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
488
489 let txn_results = txn_slice
490 .iter()
491 .take(num_txns)
492 .zip(transaction_keys.iter())
493 .map(map_fetch)
494 .map(|maybe_bytes| {
495 maybe_bytes.and_then(|(bytes, digest)| {
496 deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
497 })
498 })
499 .collect::<Vec<_>>();
500
501 let fx_results = fx_slice
502 .iter()
503 .take(num_effects)
504 .zip(effects_keys.iter())
505 .map(map_fetch)
506 .map(|maybe_bytes| {
507 maybe_bytes.and_then(|(bytes, digest)| {
508 deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
509 *fx.transaction_digest()
510 })
511 })
512 })
513 .collect::<Vec<_>>();
514
515 Ok((txn_results, fx_results))
516 }
517
518 #[instrument(level = "trace", skip_all)]
519 async fn multi_get_checkpoints(
520 &self,
521 checkpoint_summaries: &[CheckpointSequenceNumber],
522 checkpoint_contents: &[CheckpointSequenceNumber],
523 checkpoint_summaries_by_digest: &[CheckpointDigest],
524 ) -> IotaResult<(
525 Vec<Option<CertifiedCheckpointSummary>>,
526 Vec<Option<CheckpointContents>>,
527 Vec<Option<CertifiedCheckpointSummary>>,
528 )> {
529 let keys = checkpoint_summaries
530 .iter()
531 .map(|cp| Key::CheckpointSummary(*cp))
532 .chain(
533 checkpoint_contents
534 .iter()
535 .map(|cp| Key::CheckpointContents(*cp)),
536 )
537 .chain(
538 checkpoint_summaries_by_digest
539 .iter()
540 .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
541 )
542 .collect::<Vec<_>>();
543
544 let summaries_len = checkpoint_summaries.len();
545 let contents_len = checkpoint_contents.len();
546 let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
547
548 let fetches = self.multi_fetch(keys).await;
549
550 let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
551
552 let result_slices = multi_split_slice(&fetches, &input_slices);
553
554 let summaries_results = result_slices[0]
555 .iter()
556 .zip(checkpoint_summaries.iter())
557 .map(map_fetch)
558 .map(|maybe_bytes| {
559 maybe_bytes
560 .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
561 })
562 .collect::<Vec<_>>();
563
564 let contents_results = result_slices[1]
565 .iter()
566 .zip(checkpoint_contents.iter())
567 .map(map_fetch)
568 .map(|maybe_bytes| {
569 maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
570 })
571 .collect::<Vec<_>>();
572
573 let summaries_by_digest_results = result_slices[2]
574 .iter()
575 .zip(checkpoint_summaries_by_digest.iter())
576 .map(map_fetch)
577 .map(|maybe_bytes| {
578 maybe_bytes.and_then(|(bytes, digest)| {
579 deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
580 })
581 })
582 .collect::<Vec<_>>();
583
584 Ok((
585 summaries_results,
586 contents_results,
587 summaries_by_digest_results,
588 ))
589 }
590
591 #[instrument(level = "trace", skip_all)]
592 async fn get_transaction_perpetual_checkpoint(
593 &self,
594 digest: TransactionDigest,
595 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
596 let key = Key::TransactionToCheckpoint(digest);
597 self.fetch(key).await.map(|maybe| {
598 maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
599 })
600 }
601
602 #[instrument(level = "trace", skip_all)]
603 async fn get_object(
604 &self,
605 object_id: ObjectId,
606 version: SequenceNumber,
607 ) -> IotaResult<Option<Object>> {
608 let key = Key::ObjectKey(ObjectKey(object_id, version));
609 self.fetch(key)
610 .await
611 .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
612 }
613
614 #[instrument(level = "trace", skip_all)]
615 async fn multi_get_objects(
616 &self,
617 object_keys: &[ObjectKey],
618 ) -> IotaResult<Vec<Option<Object>>> {
619 let keys = object_keys
620 .iter()
621 .map(|key| Key::ObjectKey(*key))
622 .collect::<Vec<_>>();
623
624 let fetches = self.multi_fetch(keys).await;
625
626 let results = fetches
627 .iter()
628 .zip(object_keys.iter())
629 .map(map_fetch)
630 .map(|maybe_bytes| maybe_bytes.and_then(|(bytes, key)| deser::<_, Object>(&key, bytes)))
631 .collect::<Vec<_>>();
632
633 Ok(results)
634 }
635
636 #[instrument(level = "trace", skip_all)]
637 async fn multi_get_transactions_perpetual_checkpoints(
638 &self,
639 digests: &[TransactionDigest],
640 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
641 let keys = digests
642 .iter()
643 .map(|digest| Key::TransactionToCheckpoint(*digest))
644 .collect::<Vec<_>>();
645
646 let fetches = self.multi_fetch(keys).await;
647
648 let results = fetches
649 .iter()
650 .zip(digests.iter())
651 .map(map_fetch)
652 .map(|maybe_bytes| {
653 maybe_bytes
654 .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
655 })
656 .collect::<Vec<_>>();
657
658 Ok(results)
659 }
660
661 #[instrument(level = "trace", skip_all)]
662 async fn multi_get_events_by_tx_digests(
663 &self,
664 digests: &[TransactionDigest],
665 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
666 let keys = digests
667 .iter()
668 .map(|digest| Key::EventsByTransactionDigest(*digest))
669 .collect::<Vec<_>>();
670 Ok(self
671 .multi_fetch(keys)
672 .await
673 .iter()
674 .zip(digests.iter())
675 .map(map_fetch)
676 .map(|maybe_bytes| {
677 maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
678 })
679 .collect::<Vec<_>>())
680 }
681}