1use std::{str::FromStr, sync::Arc, time::Duration};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_types::{
12 base_types::{ObjectID, SequenceNumber, VersionNumber},
13 digests::{CheckpointDigest, TransactionDigest},
14 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
15 error::{IotaError, IotaResult},
16 messages_checkpoint::{
17 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18 },
19 object::Object,
20 storage::ObjectKey,
21 transaction::Transaction,
22};
23use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
24use reqwest::{
25 Client, Url,
26 header::{CONTENT_LENGTH, HeaderValue},
27};
28use serde::{Deserialize, Serialize};
29use tap::TapFallible;
30use tracing::{error, info, instrument, trace, warn};
31
32use crate::{
33 key_value_store::{
34 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
35 },
36 key_value_store_metrics::KeyValueStoreMetrics,
37};
38
/// Client for a remote key-value store that is reached over HTTP.
///
/// Raw response bodies are cached in memory, keyed by the request URL.
pub struct HttpKVStore {
    /// Base URL that item paths are joined onto; `new` ensures it ends with `/`.
    base_url: Url,
    /// HTTP client used for every request.
    client: Client,
    /// In-memory cache of raw response bodies, keyed by the full request URL.
    cache: MokaCache<Url, Bytes>,
    /// Metrics handle; `fetch` uses it to count cache hits and misses.
    metrics: Arc<KeyValueStoreMetrics>,
}
45
/// Encodes the raw bytes of `digest` as a base64url string.
///
/// The result is used as the key part of a request path (see
/// `Key::to_path_elements`).
pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
    base64_url::encode(digest)
}
49
/// Key wrapper that is BCS-serialized before being base64url-encoded.
///
/// The enum variant acts as a tag inside the serialized bytes, so the variant
/// order is part of the encoding and must not be changed.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaggedKey {
    /// A checkpoint identified by its sequence number.
    CheckpointSequenceNumber(CheckpointSequenceNumber),
}
55
56pub fn encoded_tagged_key(key: &TaggedKey) -> String {
57 let bytes = bcs::to_bytes(key).expect("failed to serialize key");
58 base64_url::encode(&bytes)
59}
60
61pub fn encode_object_key(object_id: &ObjectID, version: &VersionNumber) -> String {
62 let bytes =
63 bcs::to_bytes(&ObjectKey(*object_id, *version)).expect("failed to serialize object key");
64 base64_url::encode(&bytes)
65}
66
/// Conversion of a fallible value into an [`IotaResult`].
trait IntoIotaResult<T> {
    /// Consumes `self` and returns the equivalent [`IotaResult`].
    fn into_iota_result(self) -> IotaResult<T>;
}
70
71impl<T, E> IntoIotaResult<T> for Result<T, E>
72where
73 E: std::error::Error,
74{
75 fn into_iota_result(self) -> IotaResult<T> {
76 self.map_err(|e| IotaError::Storage(e.to_string()))
77 }
78}
79
/// Kind of item addressed by a request.
///
/// The short string attached to each variant is its textual form: it is what
/// `Display` prints, what `FromStr` parses, and the name used by serde when
/// deserializing. `HttpKVStore::get_url` uses it as the first path segment.
#[derive(
    Clone,
    Copy,
    Debug,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Deserialize,
    strum::EnumString,
    strum::Display,
)]
pub enum ItemType {
    /// A transaction (`tx`).
    #[strum(serialize = "tx")]
    #[serde(rename = "tx")]
    Transaction,
    /// The effects of a transaction (`fx`).
    #[strum(serialize = "fx")]
    #[serde(rename = "fx")]
    TransactionEffects,
    /// The contents of a checkpoint (`cc`).
    #[strum(serialize = "cc")]
    #[serde(rename = "cc")]
    CheckpointContents,
    /// A checkpoint summary (`cs`), addressed by sequence number or by digest.
    #[strum(serialize = "cs")]
    #[serde(rename = "cs")]
    CheckpointSummary,
    /// The mapping from a transaction to its checkpoint (`tx2c`).
    #[strum(serialize = "tx2c")]
    #[serde(rename = "tx2c")]
    TransactionToCheckpoint,
    /// An object at a specific version (`ob`).
    #[strum(serialize = "ob")]
    #[serde(rename = "ob")]
    Object,
    /// The events of a transaction, addressed by transaction digest (`evtx`).
    #[strum(serialize = "evtx")]
    #[serde(rename = "evtx")]
    EventTransactionDigest,
}
118
/// Fully typed lookup key: the kind of item plus the identifier that selects it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Key {
    /// A transaction, by transaction digest.
    Transaction(TransactionDigest),
    /// Transaction effects, by transaction digest.
    TransactionEffects(TransactionDigest),
    /// Checkpoint contents, by checkpoint sequence number.
    CheckpointContents(CheckpointSequenceNumber),
    /// A checkpoint summary, by checkpoint sequence number.
    CheckpointSummary(CheckpointSequenceNumber),
    /// A checkpoint summary, by checkpoint digest.
    CheckpointSummaryByDigest(CheckpointDigest),
    /// The checkpoint a transaction belongs to, by transaction digest.
    TransactionToCheckpoint(TransactionDigest),
    /// An object, by object id and version.
    ObjectKey(ObjectID, VersionNumber),
    /// The events of a transaction, by transaction digest.
    EventsByTransactionDigest(TransactionDigest),
}
130
131impl Key {
132 pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
152 let item_type =
153 ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
154 let decoded_key = base64_url::decode(encoded_key)
155 .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
156
157 match item_type {
158 ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::try_from(
159 decoded_key.as_slice(),
160 )?)),
161 ItemType::TransactionEffects => Ok(Key::TransactionEffects(
162 TransactionDigest::try_from(decoded_key.as_slice())?,
163 )),
164 ItemType::CheckpointContents => {
165 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
166 anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
167 })?;
168 match tagged_key {
169 TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
170 }
171 }
172 ItemType::CheckpointSummary => {
173 match CheckpointDigest::try_from(decoded_key.clone()) {
175 Err(_) => {
176 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
177 anyhow::anyhow!(
178 "failed to deserialize checkpoint sequence number: {err}"
179 )
180 })?;
181 match tagged_key {
182 TaggedKey::CheckpointSequenceNumber(seq) => {
183 Ok(Key::CheckpointSummary(seq))
184 }
185 }
186 }
187 Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
188 }
189 }
190 ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
191 TransactionDigest::try_from(decoded_key.as_slice())?,
192 )),
193 ItemType::Object => {
194 let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
195 .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
196
197 Ok(Key::ObjectKey(object_key.0, object_key.1))
198 }
199 ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
200 TransactionDigest::try_from(decoded_key.as_slice())?,
201 )),
202 }
203 }
204
205 pub fn item_type(&self) -> ItemType {
224 match self {
225 Key::Transaction(_) => ItemType::Transaction,
226 Key::TransactionEffects(_) => ItemType::TransactionEffects,
227 Key::CheckpointContents(_) => ItemType::CheckpointContents,
228 Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
229 ItemType::CheckpointSummary
230 }
231 Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
232 Key::ObjectKey(_, _) => ItemType::Object,
233 Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
234 }
235 }
236
237 pub fn to_path_elements(&self) -> (ItemType, String) {
268 let encoded_key_digest = match self {
269 Key::Transaction(digest) => encode_digest(digest),
270 Key::TransactionEffects(digest) => encode_digest(digest),
271 Key::CheckpointContents(seq) => {
272 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
273 }
274 Key::CheckpointSummary(seq) => {
275 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
276 }
277 Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
278 Key::TransactionToCheckpoint(digest) => encode_digest(digest),
279 Key::ObjectKey(object_id, version) => encode_object_key(object_id, version),
280 Key::EventsByTransactionDigest(digest) => encode_digest(digest),
281 };
282
283 (self.item_type(), encoded_key_digest)
284 }
285}
286
/// Decoded value, one variant per kind of stored item.
///
/// NOTE(review): nothing in the visible part of this file constructs or
/// matches on this enum — confirm whether it is still needed.
#[derive(Clone, Debug)]
enum Value {
    /// A transaction.
    Tx(Box<Transaction>),
    /// The effects of a transaction.
    Fx(Box<TransactionEffects>),
    /// The events of a transaction.
    Events(Box<TransactionEvents>),
    /// The contents of a checkpoint.
    CheckpointContents(Box<CheckpointContents>),
    /// A certified checkpoint summary.
    CheckpointSummary(Box<CertifiedCheckpointSummary>),
    /// The sequence number of the checkpoint that contains a transaction.
    TxToCheckpoint(CheckpointSequenceNumber),
}
296
297impl HttpKVStore {
298 pub fn new_kv(
299 base_url: &str,
300 cache_size: u64,
301 metrics: Arc<KeyValueStoreMetrics>,
302 ) -> IotaResult<TransactionKeyValueStore> {
303 let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
304 Ok(TransactionKeyValueStore::new("http", metrics, inner))
305 }
306
307 pub fn new(
308 base_url: &str,
309 cache_size: u64,
310 metrics: Arc<KeyValueStoreMetrics>,
311 ) -> IotaResult<Self> {
312 info!("creating HttpKVStore with base_url: {}", base_url);
313
314 let client = Client::builder().http2_prior_knowledge().build().unwrap();
315
316 let base_url = if base_url.ends_with('/') {
317 base_url.to_string()
318 } else {
319 format!("{base_url}/")
320 };
321
322 let base_url = Url::parse(&base_url).into_iota_result()?;
323
324 let cache = MokaCacheBuilder::new(cache_size)
325 .time_to_idle(Duration::from_secs(600))
326 .build();
327
328 Ok(Self {
329 base_url,
330 client,
331 cache,
332 metrics,
333 })
334 }
335
336 fn get_url(&self, key: &Key) -> IotaResult<Url> {
337 let (item_type, digest) = key.to_path_elements();
338 let joined = self
339 .base_url
340 .join(&format!("{item_type}/{digest}"))
341 .into_iota_result()?;
342 Url::from_str(joined.as_str()).into_iota_result()
343 }
344
345 async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
346 let uris_vec = uris.to_vec();
347 let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
348 fetches.buffered(uris.len()).collect::<Vec<_>>().await
349 }
350
351 async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
352 let url = self.get_url(&key)?;
353
354 trace!("fetching url: {}", url);
355
356 if let Some(res) = self.cache.get(&url) {
357 trace!("found cached data for url: {}, len: {:?}", url, res.len());
358 self.metrics
359 .key_value_store_num_fetches_success
360 .with_label_values(&["http_cache", "url"])
361 .inc();
362 return Ok(Some(res));
363 }
364
365 self.metrics
366 .key_value_store_num_fetches_not_found
367 .with_label_values(&["http_cache", "url"])
368 .inc();
369
370 let resp = self
371 .client
372 .get(url.clone())
373 .send()
374 .await
375 .into_iota_result()?;
376 trace!(
377 "got response {} for url: {}, len: {:?}",
378 url,
379 resp.status(),
380 resp.headers()
381 .get(CONTENT_LENGTH)
382 .unwrap_or(&HeaderValue::from_static("0"))
383 );
384 if resp.status().is_success() {
386 let bytes = resp.bytes().await.into_iota_result()?;
387 self.cache.insert(url, bytes.clone());
388
389 Ok(Some(bytes))
390 } else {
391 Ok(None)
392 }
393 }
394}
395
396fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
397where
398 K: std::fmt::Debug,
399 T: for<'de> Deserialize<'de>,
400{
401 bcs::from_bytes(bytes)
402 .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
403 .ok()
404}
405
406fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
407where
408 K: std::fmt::Debug,
409{
410 let (fetch, key) = fetch;
411 match fetch {
412 Ok(Some(bytes)) => Some((bytes, key)),
413 Ok(None) => None,
414 Err(err) => {
415 warn!("Error fetching key: {:?}, error: {:?}", key, err);
416 None
417 }
418 }
419}
420
/// Cuts `slice` into consecutive sub-slices whose sizes are given by `lengths`.
///
/// The i-th returned slice starts where the previous one ended and contains
/// `lengths[i]` elements. Elements beyond the sum of `lengths` are not
/// returned.
///
/// # Panics
///
/// Panics if the lengths add up to more than `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut parts = Vec::with_capacity(lengths.len());
    let mut remaining = slice;
    for &length in lengths {
        let (head, tail) = remaining.split_at(length);
        parts.push(head);
        remaining = tail;
    }
    parts
}
433
434fn deser_check_digest<T, D>(
435 digest: &D,
436 bytes: &Bytes,
437 get_expected_digest: impl FnOnce(&T) -> D,
438) -> Option<T>
439where
440 D: std::fmt::Debug + PartialEq,
441 T: for<'de> Deserialize<'de>,
442{
443 deser(digest, bytes).and_then(|o: T| {
444 let expected_digest = get_expected_digest(&o);
445 if expected_digest == *digest {
446 Some(o)
447 } else {
448 error!(
449 "Digest mismatch - expected: {:?}, got: {:?}",
450 digest, expected_digest,
451 );
452 None
453 }
454 })
455}
456
457#[async_trait]
458impl TransactionKeyValueStoreTrait for HttpKVStore {
459 #[instrument(level = "trace", skip_all)]
460 async fn multi_get(
461 &self,
462 transaction_keys: &[TransactionDigest],
463 effects_keys: &[TransactionDigest],
464 ) -> IotaResult<KVStoreTransactionData> {
465 let num_txns = transaction_keys.len();
466 let num_effects = effects_keys.len();
467
468 let keys = transaction_keys
469 .iter()
470 .map(|tx| Key::Transaction(*tx))
471 .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
472 .collect::<Vec<_>>();
473
474 let fetches = self.multi_fetch(keys).await;
475 let txn_slice = fetches[..num_txns].to_vec();
476 let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
477
478 let txn_results = txn_slice
479 .iter()
480 .take(num_txns)
481 .zip(transaction_keys.iter())
482 .map(map_fetch)
483 .map(|maybe_bytes| {
484 maybe_bytes.and_then(|(bytes, digest)| {
485 deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
486 })
487 })
488 .collect::<Vec<_>>();
489
490 let fx_results = fx_slice
491 .iter()
492 .take(num_effects)
493 .zip(effects_keys.iter())
494 .map(map_fetch)
495 .map(|maybe_bytes| {
496 maybe_bytes.and_then(|(bytes, digest)| {
497 deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
498 *fx.transaction_digest()
499 })
500 })
501 })
502 .collect::<Vec<_>>();
503
504 Ok((txn_results, fx_results))
505 }
506
507 #[instrument(level = "trace", skip_all)]
508 async fn multi_get_checkpoints(
509 &self,
510 checkpoint_summaries: &[CheckpointSequenceNumber],
511 checkpoint_contents: &[CheckpointSequenceNumber],
512 checkpoint_summaries_by_digest: &[CheckpointDigest],
513 ) -> IotaResult<(
514 Vec<Option<CertifiedCheckpointSummary>>,
515 Vec<Option<CheckpointContents>>,
516 Vec<Option<CertifiedCheckpointSummary>>,
517 )> {
518 let keys = checkpoint_summaries
519 .iter()
520 .map(|cp| Key::CheckpointSummary(*cp))
521 .chain(
522 checkpoint_contents
523 .iter()
524 .map(|cp| Key::CheckpointContents(*cp)),
525 )
526 .chain(
527 checkpoint_summaries_by_digest
528 .iter()
529 .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
530 )
531 .collect::<Vec<_>>();
532
533 let summaries_len = checkpoint_summaries.len();
534 let contents_len = checkpoint_contents.len();
535 let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
536
537 let fetches = self.multi_fetch(keys).await;
538
539 let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
540
541 let result_slices = multi_split_slice(&fetches, &input_slices);
542
543 let summaries_results = result_slices[0]
544 .iter()
545 .zip(checkpoint_summaries.iter())
546 .map(map_fetch)
547 .map(|maybe_bytes| {
548 maybe_bytes
549 .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
550 })
551 .collect::<Vec<_>>();
552
553 let contents_results = result_slices[1]
554 .iter()
555 .zip(checkpoint_contents.iter())
556 .map(map_fetch)
557 .map(|maybe_bytes| {
558 maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
559 })
560 .collect::<Vec<_>>();
561
562 let summaries_by_digest_results = result_slices[2]
563 .iter()
564 .zip(checkpoint_summaries_by_digest.iter())
565 .map(map_fetch)
566 .map(|maybe_bytes| {
567 maybe_bytes.and_then(|(bytes, digest)| {
568 deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
569 })
570 })
571 .collect::<Vec<_>>();
572
573 Ok((
574 summaries_results,
575 contents_results,
576 summaries_by_digest_results,
577 ))
578 }
579
580 #[instrument(level = "trace", skip_all)]
581 async fn get_transaction_perpetual_checkpoint(
582 &self,
583 digest: TransactionDigest,
584 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
585 let key = Key::TransactionToCheckpoint(digest);
586 self.fetch(key).await.map(|maybe| {
587 maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
588 })
589 }
590
591 #[instrument(level = "trace", skip_all)]
592 async fn get_object(
593 &self,
594 object_id: ObjectID,
595 version: SequenceNumber,
596 ) -> IotaResult<Option<Object>> {
597 let key = Key::ObjectKey(object_id, version);
598 self.fetch(key)
599 .await
600 .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
601 }
602
603 #[instrument(level = "trace", skip_all)]
604 async fn multi_get_transactions_perpetual_checkpoints(
605 &self,
606 digests: &[TransactionDigest],
607 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
608 let keys = digests
609 .iter()
610 .map(|digest| Key::TransactionToCheckpoint(*digest))
611 .collect::<Vec<_>>();
612
613 let fetches = self.multi_fetch(keys).await;
614
615 let results = fetches
616 .iter()
617 .zip(digests.iter())
618 .map(map_fetch)
619 .map(|maybe_bytes| {
620 maybe_bytes
621 .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
622 })
623 .collect::<Vec<_>>();
624
625 Ok(results)
626 }
627
628 #[instrument(level = "trace", skip_all)]
629 async fn multi_get_events_by_tx_digests(
630 &self,
631 digests: &[TransactionDigest],
632 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
633 let keys = digests
634 .iter()
635 .map(|digest| Key::EventsByTransactionDigest(*digest))
636 .collect::<Vec<_>>();
637 Ok(self
638 .multi_fetch(keys)
639 .await
640 .iter()
641 .zip(digests.iter())
642 .map(map_fetch)
643 .map(|maybe_bytes| {
644 maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
645 })
646 .collect::<Vec<_>>())
647 }
648}