1use std::{str::FromStr, sync::Arc, time::Duration};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_types::{
12 base_types::{ObjectID, SequenceNumber},
13 digests::{CheckpointDigest, TransactionDigest},
14 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
15 error::{IotaError, IotaResult},
16 messages_checkpoint::{
17 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18 },
19 object::Object,
20 storage::ObjectKey,
21 transaction::Transaction,
22};
23use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
24use reqwest::{
25 Client, Url,
26 header::{CONTENT_LENGTH, HeaderValue},
27};
28use serde::{Deserialize, Serialize};
29use tap::TapFallible;
30use tracing::{error, info, instrument, trace, warn};
31
32use crate::{
33 key_value_store::{
34 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
35 },
36 key_value_store_metrics::KeyValueStoreMetrics,
37};
38
39pub struct HttpKVStore {
40 base_url: Url,
41 client: Client,
42 cache: MokaCache<Url, Bytes>,
43 metrics: Arc<KeyValueStoreMetrics>,
44}
45
46pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
47 base64_url::encode(digest)
48}
49
50#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
52pub enum TaggedKey {
53 CheckpointSequenceNumber(CheckpointSequenceNumber),
54}
55
56pub fn encoded_tagged_key(key: &TaggedKey) -> String {
57 let bytes = bcs::to_bytes(key).expect("failed to serialize key");
58 base64_url::encode(&bytes)
59}
60
61pub fn encode_object_key(object_key: &ObjectKey) -> String {
62 let bytes = bcs::to_bytes(object_key).expect("failed to serialize object key");
63 base64_url::encode(&bytes)
64}
65
66trait IntoIotaResult<T> {
67 fn into_iota_result(self) -> IotaResult<T>;
68}
69
70impl<T, E> IntoIotaResult<T> for Result<T, E>
71where
72 E: std::error::Error,
73{
74 fn into_iota_result(self) -> IotaResult<T> {
75 self.map_err(|e| IotaError::Storage(e.to_string()))
76 }
77}
78
79#[derive(
82 Clone,
83 Copy,
84 Debug,
85 PartialEq,
86 Eq,
87 PartialOrd,
88 Ord,
89 Hash,
90 Deserialize,
91 strum::EnumString,
92 strum::Display,
93)]
94pub enum ItemType {
95 #[strum(serialize = "tx")]
96 #[serde(rename = "tx")]
97 Transaction,
98 #[strum(serialize = "fx")]
99 #[serde(rename = "fx")]
100 TransactionEffects,
101 #[strum(serialize = "cc")]
102 #[serde(rename = "cc")]
103 CheckpointContents,
104 #[strum(serialize = "cs")]
105 #[serde(rename = "cs")]
106 CheckpointSummary,
107 #[strum(serialize = "tx2c")]
108 #[serde(rename = "tx2c")]
109 TransactionToCheckpoint,
110 #[strum(serialize = "ob")]
111 #[serde(rename = "ob")]
112 Object,
113 #[strum(serialize = "evtx")]
114 #[serde(rename = "evtx")]
115 EventTransactionDigest,
116}
117
118#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
119pub enum Key {
120 Transaction(TransactionDigest),
121 TransactionEffects(TransactionDigest),
122 CheckpointContents(CheckpointSequenceNumber),
123 CheckpointSummary(CheckpointSequenceNumber),
124 CheckpointSummaryByDigest(CheckpointDigest),
125 TransactionToCheckpoint(TransactionDigest),
126 ObjectKey(ObjectKey),
127 EventsByTransactionDigest(TransactionDigest),
128}
129
130impl Key {
131 pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
151 let item_type =
152 ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
153 let decoded_key = base64_url::decode(encoded_key)
154 .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
155
156 match item_type {
157 ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::try_from(
158 decoded_key.as_slice(),
159 )?)),
160 ItemType::TransactionEffects => Ok(Key::TransactionEffects(
161 TransactionDigest::try_from(decoded_key.as_slice())?,
162 )),
163 ItemType::CheckpointContents => {
164 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
165 anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
166 })?;
167 match tagged_key {
168 TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
169 }
170 }
171 ItemType::CheckpointSummary => {
172 match CheckpointDigest::try_from(decoded_key.clone()) {
174 Err(_) => {
175 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
176 anyhow::anyhow!(
177 "failed to deserialize checkpoint sequence number: {err}"
178 )
179 })?;
180 match tagged_key {
181 TaggedKey::CheckpointSequenceNumber(seq) => {
182 Ok(Key::CheckpointSummary(seq))
183 }
184 }
185 }
186 Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
187 }
188 }
189 ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
190 TransactionDigest::try_from(decoded_key.as_slice())?,
191 )),
192 ItemType::Object => {
193 let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
194 .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
195
196 Ok(Key::ObjectKey(ObjectKey(object_key.0, object_key.1)))
197 }
198 ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
199 TransactionDigest::try_from(decoded_key.as_slice())?,
200 )),
201 }
202 }
203
204 pub fn item_type(&self) -> ItemType {
223 match self {
224 Key::Transaction(_) => ItemType::Transaction,
225 Key::TransactionEffects(_) => ItemType::TransactionEffects,
226 Key::CheckpointContents(_) => ItemType::CheckpointContents,
227 Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
228 ItemType::CheckpointSummary
229 }
230 Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
231 Key::ObjectKey(_) => ItemType::Object,
232 Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
233 }
234 }
235
236 pub fn to_path_elements(&self) -> (ItemType, String) {
267 let encoded_key_digest = match self {
268 Key::Transaction(digest) => encode_digest(digest),
269 Key::TransactionEffects(digest) => encode_digest(digest),
270 Key::CheckpointContents(seq) => {
271 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
272 }
273 Key::CheckpointSummary(seq) => {
274 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
275 }
276 Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
277 Key::TransactionToCheckpoint(digest) => encode_digest(digest),
278 Key::ObjectKey(object_key) => encode_object_key(object_key),
279 Key::EventsByTransactionDigest(digest) => encode_digest(digest),
280 };
281
282 (self.item_type(), encoded_key_digest)
283 }
284}
285
286#[derive(Clone, Debug)]
287enum Value {
288 Tx(Box<Transaction>),
289 Fx(Box<TransactionEffects>),
290 Events(Box<TransactionEvents>),
291 CheckpointContents(Box<CheckpointContents>),
292 CheckpointSummary(Box<CertifiedCheckpointSummary>),
293 TxToCheckpoint(CheckpointSequenceNumber),
294}
295
296impl HttpKVStore {
297 pub fn new_kv(
298 base_url: &str,
299 cache_size: u64,
300 metrics: Arc<KeyValueStoreMetrics>,
301 ) -> IotaResult<TransactionKeyValueStore> {
302 let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
303 Ok(TransactionKeyValueStore::new("http", metrics, inner))
304 }
305
306 pub fn new(
307 base_url: &str,
308 cache_size: u64,
309 metrics: Arc<KeyValueStoreMetrics>,
310 ) -> IotaResult<Self> {
311 info!("creating HttpKVStore with base_url: {}", base_url);
312
313 let client = Client::builder().http2_prior_knowledge().build().unwrap();
314
315 let base_url = if base_url.ends_with('/') {
316 base_url.to_string()
317 } else {
318 format!("{base_url}/")
319 };
320
321 let base_url = Url::parse(&base_url).into_iota_result()?;
322
323 let cache = MokaCacheBuilder::new(cache_size)
324 .time_to_idle(Duration::from_secs(600))
325 .build();
326
327 Ok(Self {
328 base_url,
329 client,
330 cache,
331 metrics,
332 })
333 }
334
335 fn get_url(&self, key: &Key) -> IotaResult<Url> {
336 let (item_type, digest) = key.to_path_elements();
337 let joined = self
338 .base_url
339 .join(&format!("{item_type}/{digest}"))
340 .into_iota_result()?;
341 Url::from_str(joined.as_str()).into_iota_result()
342 }
343
344 async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
345 let uris_vec = uris.to_vec();
346 let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
347 fetches.buffered(uris.len()).collect::<Vec<_>>().await
348 }
349
350 async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
351 let url = self.get_url(&key)?;
352
353 trace!("fetching url: {}", url);
354
355 if let Some(res) = self.cache.get(&url) {
356 trace!("found cached data for url: {}, len: {:?}", url, res.len());
357 self.metrics
358 .key_value_store_num_fetches_success
359 .with_label_values(&["http_cache", "url"])
360 .inc();
361 return Ok(Some(res));
362 }
363
364 self.metrics
365 .key_value_store_num_fetches_not_found
366 .with_label_values(&["http_cache", "url"])
367 .inc();
368
369 let resp = self
370 .client
371 .get(url.clone())
372 .send()
373 .await
374 .into_iota_result()?;
375 trace!(
376 "got response {} for url: {}, len: {:?}",
377 url,
378 resp.status(),
379 resp.headers()
380 .get(CONTENT_LENGTH)
381 .unwrap_or(&HeaderValue::from_static("0"))
382 );
383 if resp.status().is_success() {
385 let bytes = resp.bytes().await.into_iota_result()?;
386 self.cache.insert(url, bytes.clone());
387
388 Ok(Some(bytes))
389 } else {
390 Ok(None)
391 }
392 }
393}
394
395fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
396where
397 K: std::fmt::Debug,
398 T: for<'de> Deserialize<'de>,
399{
400 bcs::from_bytes(bytes)
401 .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
402 .ok()
403}
404
405fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
406where
407 K: std::fmt::Debug,
408{
409 let (fetch, key) = fetch;
410 match fetch {
411 Ok(Some(bytes)) => Some((bytes, key)),
412 Ok(None) => None,
413 Err(err) => {
414 warn!("Error fetching key: {:?}, error: {:?}", key, err);
415 None
416 }
417 }
418}
419
420fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
421 let mut start = 0;
422 lengths
423 .iter()
424 .map(|length| {
425 let end = start + length;
426 let result = &slice[start..end];
427 start = end;
428 result
429 })
430 .collect()
431}
432
433fn deser_check_digest<T, D>(
434 digest: &D,
435 bytes: &Bytes,
436 get_expected_digest: impl FnOnce(&T) -> D,
437) -> Option<T>
438where
439 D: std::fmt::Debug + PartialEq,
440 T: for<'de> Deserialize<'de>,
441{
442 deser(digest, bytes).and_then(|o: T| {
443 let expected_digest = get_expected_digest(&o);
444 if expected_digest == *digest {
445 Some(o)
446 } else {
447 error!(
448 "Digest mismatch - expected: {:?}, got: {:?}",
449 digest, expected_digest,
450 );
451 None
452 }
453 })
454}
455
456#[async_trait]
457impl TransactionKeyValueStoreTrait for HttpKVStore {
458 #[instrument(level = "trace", skip_all)]
459 async fn multi_get(
460 &self,
461 transaction_keys: &[TransactionDigest],
462 effects_keys: &[TransactionDigest],
463 ) -> IotaResult<KVStoreTransactionData> {
464 let num_txns = transaction_keys.len();
465 let num_effects = effects_keys.len();
466
467 let keys = transaction_keys
468 .iter()
469 .map(|tx| Key::Transaction(*tx))
470 .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
471 .collect::<Vec<_>>();
472
473 let fetches = self.multi_fetch(keys).await;
474 let txn_slice = fetches[..num_txns].to_vec();
475 let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
476
477 let txn_results = txn_slice
478 .iter()
479 .take(num_txns)
480 .zip(transaction_keys.iter())
481 .map(map_fetch)
482 .map(|maybe_bytes| {
483 maybe_bytes.and_then(|(bytes, digest)| {
484 deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
485 })
486 })
487 .collect::<Vec<_>>();
488
489 let fx_results = fx_slice
490 .iter()
491 .take(num_effects)
492 .zip(effects_keys.iter())
493 .map(map_fetch)
494 .map(|maybe_bytes| {
495 maybe_bytes.and_then(|(bytes, digest)| {
496 deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
497 *fx.transaction_digest()
498 })
499 })
500 })
501 .collect::<Vec<_>>();
502
503 Ok((txn_results, fx_results))
504 }
505
506 #[instrument(level = "trace", skip_all)]
507 async fn multi_get_checkpoints(
508 &self,
509 checkpoint_summaries: &[CheckpointSequenceNumber],
510 checkpoint_contents: &[CheckpointSequenceNumber],
511 checkpoint_summaries_by_digest: &[CheckpointDigest],
512 ) -> IotaResult<(
513 Vec<Option<CertifiedCheckpointSummary>>,
514 Vec<Option<CheckpointContents>>,
515 Vec<Option<CertifiedCheckpointSummary>>,
516 )> {
517 let keys = checkpoint_summaries
518 .iter()
519 .map(|cp| Key::CheckpointSummary(*cp))
520 .chain(
521 checkpoint_contents
522 .iter()
523 .map(|cp| Key::CheckpointContents(*cp)),
524 )
525 .chain(
526 checkpoint_summaries_by_digest
527 .iter()
528 .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
529 )
530 .collect::<Vec<_>>();
531
532 let summaries_len = checkpoint_summaries.len();
533 let contents_len = checkpoint_contents.len();
534 let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
535
536 let fetches = self.multi_fetch(keys).await;
537
538 let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
539
540 let result_slices = multi_split_slice(&fetches, &input_slices);
541
542 let summaries_results = result_slices[0]
543 .iter()
544 .zip(checkpoint_summaries.iter())
545 .map(map_fetch)
546 .map(|maybe_bytes| {
547 maybe_bytes
548 .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
549 })
550 .collect::<Vec<_>>();
551
552 let contents_results = result_slices[1]
553 .iter()
554 .zip(checkpoint_contents.iter())
555 .map(map_fetch)
556 .map(|maybe_bytes| {
557 maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
558 })
559 .collect::<Vec<_>>();
560
561 let summaries_by_digest_results = result_slices[2]
562 .iter()
563 .zip(checkpoint_summaries_by_digest.iter())
564 .map(map_fetch)
565 .map(|maybe_bytes| {
566 maybe_bytes.and_then(|(bytes, digest)| {
567 deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
568 })
569 })
570 .collect::<Vec<_>>();
571
572 Ok((
573 summaries_results,
574 contents_results,
575 summaries_by_digest_results,
576 ))
577 }
578
579 #[instrument(level = "trace", skip_all)]
580 async fn get_transaction_perpetual_checkpoint(
581 &self,
582 digest: TransactionDigest,
583 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
584 let key = Key::TransactionToCheckpoint(digest);
585 self.fetch(key).await.map(|maybe| {
586 maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
587 })
588 }
589
590 #[instrument(level = "trace", skip_all)]
591 async fn get_object(
592 &self,
593 object_id: ObjectID,
594 version: SequenceNumber,
595 ) -> IotaResult<Option<Object>> {
596 let key = Key::ObjectKey(ObjectKey(object_id, version));
597 self.fetch(key)
598 .await
599 .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
600 }
601
602 #[instrument(level = "trace", skip_all)]
603 async fn multi_get_objects(
604 &self,
605 object_keys: &[ObjectKey],
606 ) -> IotaResult<Vec<Option<Object>>> {
607 let keys = object_keys
608 .iter()
609 .map(|key| Key::ObjectKey(*key))
610 .collect::<Vec<_>>();
611
612 let fetches = self.multi_fetch(keys).await;
613
614 let results = fetches
615 .iter()
616 .zip(object_keys.iter())
617 .map(map_fetch)
618 .map(|maybe_bytes| maybe_bytes.and_then(|(bytes, key)| deser::<_, Object>(&key, bytes)))
619 .collect::<Vec<_>>();
620
621 Ok(results)
622 }
623
624 #[instrument(level = "trace", skip_all)]
625 async fn multi_get_transactions_perpetual_checkpoints(
626 &self,
627 digests: &[TransactionDigest],
628 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
629 let keys = digests
630 .iter()
631 .map(|digest| Key::TransactionToCheckpoint(*digest))
632 .collect::<Vec<_>>();
633
634 let fetches = self.multi_fetch(keys).await;
635
636 let results = fetches
637 .iter()
638 .zip(digests.iter())
639 .map(map_fetch)
640 .map(|maybe_bytes| {
641 maybe_bytes
642 .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
643 })
644 .collect::<Vec<_>>();
645
646 Ok(results)
647 }
648
649 #[instrument(level = "trace", skip_all)]
650 async fn multi_get_events_by_tx_digests(
651 &self,
652 digests: &[TransactionDigest],
653 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
654 let keys = digests
655 .iter()
656 .map(|digest| Key::EventsByTransactionDigest(*digest))
657 .collect::<Vec<_>>();
658 Ok(self
659 .multi_fetch(keys)
660 .await
661 .iter()
662 .zip(digests.iter())
663 .map(map_fetch)
664 .map(|maybe_bytes| {
665 maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
666 })
667 .collect::<Vec<_>>())
668 }
669}