1use std::{str::FromStr, sync::Arc, time::Duration};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_types::{
12 base_types::{ObjectID, SequenceNumber, VersionNumber},
13 digests::{CheckpointDigest, TransactionDigest},
14 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
15 error::{IotaError, IotaResult},
16 messages_checkpoint::{
17 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18 },
19 object::Object,
20 storage::ObjectKey,
21 transaction::Transaction,
22};
23use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
24use reqwest::{
25 Client, Url,
26 header::{CONTENT_LENGTH, HeaderValue},
27};
28use serde::{Deserialize, Serialize};
29use tap::TapFallible;
30use tracing::{error, info, instrument, trace, warn};
31
32use crate::{
33 key_value_store::{
34 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
35 },
36 key_value_store_metrics::KeyValueStoreMetrics,
37};
38
/// Key-value store client that reads items over HTTP.
///
/// Response bodies of successful requests are kept in an in-memory cache keyed
/// by the request URL.
pub struct HttpKVStore {
    /// Root URL that `<item type>/<encoded key>` paths are joined onto.
    base_url: Url,
    /// HTTP client used to issue the GET requests.
    client: Client,
    /// Cache of response bodies, keyed by the full request URL.
    cache: MokaCache<Url, Bytes>,
    /// Counters updated on cache hits and cache misses.
    metrics: Arc<KeyValueStoreMetrics>,
}
45
/// Encodes the raw bytes of `digest` as a base64url string.
///
/// Thin wrapper around `base64_url::encode`.
pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
    base64_url::encode(digest)
}
49
/// Key wrapper that is BCS-serialized and then base64url-encoded to form the
/// key part of a request path (see `encoded_tagged_key`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaggedKey {
    /// A checkpoint identified by its sequence number.
    CheckpointSequenceNumber(CheckpointSequenceNumber),
}
55
56pub fn encoded_tagged_key(key: &TaggedKey) -> String {
57 let bytes = bcs::to_bytes(key).expect("failed to serialize key");
58 base64_url::encode(&bytes)
59}
60
61pub fn encode_object_key(object_id: &ObjectID, version: &VersionNumber) -> String {
62 let bytes =
63 bcs::to_bytes(&ObjectKey(*object_id, *version)).expect("failed to serialize object key");
64 base64_url::encode(&bytes)
65}
66
/// Conversion of a value into an [`IotaResult`].
trait IntoIotaResult<T> {
    /// Consumes `self` and returns the equivalent [`IotaResult`].
    fn into_iota_result(self) -> IotaResult<T>;
}
70
71impl<T, E> IntoIotaResult<T> for Result<T, E>
72where
73 E: std::error::Error,
74{
75 fn into_iota_result(self) -> IotaResult<T> {
76 self.map_err(|e| IotaError::Storage(e.to_string()))
77 }
78}
79
/// Kind of item addressed by a request.
///
/// Each variant has a short string form (shared by the `strum` and `serde`
/// attributes) that is used as the first path element of a request URL.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, strum::EnumString, strum::Display)]
pub enum ItemType {
    /// String form `"tx"`.
    #[strum(serialize = "tx")]
    #[serde(rename = "tx")]
    Transaction,
    /// String form `"fx"`.
    #[strum(serialize = "fx")]
    #[serde(rename = "fx")]
    TransactionEffects,
    /// String form `"cc"`.
    #[strum(serialize = "cc")]
    #[serde(rename = "cc")]
    CheckpointContents,
    /// String form `"cs"`.
    #[strum(serialize = "cs")]
    #[serde(rename = "cs")]
    CheckpointSummary,
    /// String form `"tx2c"`.
    #[strum(serialize = "tx2c")]
    #[serde(rename = "tx2c")]
    TransactionToCheckpoint,
    /// String form `"ob"`.
    #[strum(serialize = "ob")]
    #[serde(rename = "ob")]
    Object,
    /// String form `"evtx"`.
    #[strum(serialize = "evtx")]
    #[serde(rename = "evtx")]
    EventTransactionDigest,
}
106
/// Typed lookup key for a single item in the store.
///
/// Every variant maps to one [`ItemType`]; both checkpoint-summary variants
/// map to `ItemType::CheckpointSummary`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Key {
    /// Transaction, looked up by transaction digest.
    Transaction(TransactionDigest),
    /// Transaction effects, looked up by transaction digest.
    TransactionEffects(TransactionDigest),
    /// Checkpoint contents, looked up by checkpoint sequence number.
    CheckpointContents(CheckpointSequenceNumber),
    /// Checkpoint summary, looked up by checkpoint sequence number.
    CheckpointSummary(CheckpointSequenceNumber),
    /// Checkpoint summary, looked up by checkpoint digest.
    CheckpointSummaryByDigest(CheckpointDigest),
    /// Checkpoint sequence number of a transaction, looked up by transaction
    /// digest.
    TransactionToCheckpoint(TransactionDigest),
    /// Object, looked up by object id and version.
    ObjectKey(ObjectID, VersionNumber),
    /// Transaction events, looked up by transaction digest.
    EventsByTransactionDigest(TransactionDigest),
}
118
119impl Key {
120 pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
140 let item_type =
141 ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
142 let decoded_key = base64_url::decode(encoded_key)
143 .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
144
145 match item_type {
146 ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::try_from(
147 decoded_key.as_slice(),
148 )?)),
149 ItemType::TransactionEffects => Ok(Key::TransactionEffects(
150 TransactionDigest::try_from(decoded_key.as_slice())?,
151 )),
152 ItemType::CheckpointContents => {
153 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
154 anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
155 })?;
156 match tagged_key {
157 TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
158 }
159 }
160 ItemType::CheckpointSummary => {
161 match CheckpointDigest::try_from(decoded_key.clone()) {
163 Err(_) => {
164 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
165 anyhow::anyhow!(
166 "failed to deserialize checkpoint sequence number: {err}"
167 )
168 })?;
169 match tagged_key {
170 TaggedKey::CheckpointSequenceNumber(seq) => {
171 Ok(Key::CheckpointSummary(seq))
172 }
173 }
174 }
175 Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
176 }
177 }
178 ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
179 TransactionDigest::try_from(decoded_key.as_slice())?,
180 )),
181 ItemType::Object => {
182 let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
183 .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
184
185 Ok(Key::ObjectKey(object_key.0, object_key.1))
186 }
187 ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
188 TransactionDigest::try_from(decoded_key.as_slice())?,
189 )),
190 }
191 }
192
193 pub fn item_type(&self) -> ItemType {
212 match self {
213 Key::Transaction(_) => ItemType::Transaction,
214 Key::TransactionEffects(_) => ItemType::TransactionEffects,
215 Key::CheckpointContents(_) => ItemType::CheckpointContents,
216 Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
217 ItemType::CheckpointSummary
218 }
219 Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
220 Key::ObjectKey(_, _) => ItemType::Object,
221 Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
222 }
223 }
224
225 pub fn to_path_elements(&self) -> (ItemType, String) {
256 let encoded_key_digest = match self {
257 Key::Transaction(digest) => encode_digest(digest),
258 Key::TransactionEffects(digest) => encode_digest(digest),
259 Key::CheckpointContents(seq) => {
260 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
261 }
262 Key::CheckpointSummary(seq) => {
263 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
264 }
265 Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
266 Key::TransactionToCheckpoint(digest) => encode_digest(digest),
267 Key::ObjectKey(object_id, version) => encode_object_key(object_id, version),
268 Key::EventsByTransactionDigest(digest) => encode_digest(digest),
269 };
270
271 (self.item_type(), encoded_key_digest)
272 }
273}
274
/// Deserialized payload variants, one per kind of stored item.
///
/// NOTE(review): this enum is not constructed in the visible part of this
/// file — confirm whether it is still needed.
#[derive(Clone, Debug)]
enum Value {
    /// A transaction.
    Tx(Box<Transaction>),
    /// Transaction effects.
    Fx(Box<TransactionEffects>),
    /// Transaction events.
    Events(Box<TransactionEvents>),
    /// Checkpoint contents.
    CheckpointContents(Box<CheckpointContents>),
    /// A certified checkpoint summary.
    CheckpointSummary(Box<CertifiedCheckpointSummary>),
    /// Checkpoint sequence number associated with a transaction.
    TxToCheckpoint(CheckpointSequenceNumber),
}
284
285impl HttpKVStore {
286 pub fn new_kv(
287 base_url: &str,
288 cache_size: u64,
289 metrics: Arc<KeyValueStoreMetrics>,
290 ) -> IotaResult<TransactionKeyValueStore> {
291 let inner = Arc::new(Self::new(base_url, cache_size, metrics.clone())?);
292 Ok(TransactionKeyValueStore::new("http", metrics, inner))
293 }
294
295 pub fn new(
296 base_url: &str,
297 cache_size: u64,
298 metrics: Arc<KeyValueStoreMetrics>,
299 ) -> IotaResult<Self> {
300 info!("creating HttpKVStore with base_url: {}", base_url);
301
302 let client = Client::builder().http2_prior_knowledge().build().unwrap();
303
304 let base_url = if base_url.ends_with('/') {
305 base_url.to_string()
306 } else {
307 format!("{}/", base_url)
308 };
309
310 let base_url = Url::parse(&base_url).into_iota_result()?;
311
312 let cache = MokaCacheBuilder::new(cache_size)
313 .time_to_idle(Duration::from_secs(600))
314 .build();
315
316 Ok(Self {
317 base_url,
318 client,
319 cache,
320 metrics,
321 })
322 }
323
324 fn get_url(&self, key: &Key) -> IotaResult<Url> {
325 let (item_type, digest) = key.to_path_elements();
326 let joined = self
327 .base_url
328 .join(&format!("{item_type}/{digest}"))
329 .into_iota_result()?;
330 Url::from_str(joined.as_str()).into_iota_result()
331 }
332
333 async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
334 let uris_vec = uris.to_vec();
335 let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
336 fetches.buffered(uris.len()).collect::<Vec<_>>().await
337 }
338
339 async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
340 let url = self.get_url(&key)?;
341
342 trace!("fetching url: {}", url);
343
344 if let Some(res) = self.cache.get(&url) {
345 trace!("found cached data for url: {}, len: {:?}", url, res.len());
346 self.metrics
347 .key_value_store_num_fetches_success
348 .with_label_values(&["http_cache", "url"])
349 .inc();
350 return Ok(Some(res));
351 }
352
353 self.metrics
354 .key_value_store_num_fetches_not_found
355 .with_label_values(&["http_cache", "url"])
356 .inc();
357
358 let resp = self
359 .client
360 .get(url.clone())
361 .send()
362 .await
363 .into_iota_result()?;
364 trace!(
365 "got response {} for url: {}, len: {:?}",
366 url,
367 resp.status(),
368 resp.headers()
369 .get(CONTENT_LENGTH)
370 .unwrap_or(&HeaderValue::from_static("0"))
371 );
372 if resp.status().is_success() {
374 let bytes = resp.bytes().await.into_iota_result()?;
375 self.cache.insert(url, bytes.clone());
376
377 Ok(Some(bytes))
378 } else {
379 Ok(None)
380 }
381 }
382}
383
384fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
385where
386 K: std::fmt::Debug,
387 T: for<'de> Deserialize<'de>,
388{
389 bcs::from_bytes(bytes)
390 .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
391 .ok()
392}
393
394fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
395where
396 K: std::fmt::Debug,
397{
398 let (fetch, key) = fetch;
399 match fetch {
400 Ok(Some(bytes)) => Some((bytes, key)),
401 Ok(None) => None,
402 Err(err) => {
403 warn!("Error fetching key: {:?}, error: {:?}", key, err);
404 None
405 }
406 }
407}
408
/// Cuts `slice` into consecutive sub-slices whose sizes are given by
/// `lengths`, starting at the front of `slice`.
///
/// Elements beyond the sum of `lengths` are not part of any returned piece.
///
/// # Panics
/// Panics if the lengths add up to more than `slice.len()`.
fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
    let mut remaining = slice;
    let mut pieces = Vec::with_capacity(lengths.len());
    for &len in lengths {
        let (head, tail) = remaining.split_at(len);
        pieces.push(head);
        remaining = tail;
    }
    pieces
}
421
422fn deser_check_digest<T, D>(
423 digest: &D,
424 bytes: &Bytes,
425 get_expected_digest: impl FnOnce(&T) -> D,
426) -> Option<T>
427where
428 D: std::fmt::Debug + PartialEq,
429 T: for<'de> Deserialize<'de>,
430{
431 deser(digest, bytes).and_then(|o: T| {
432 let expected_digest = get_expected_digest(&o);
433 if expected_digest == *digest {
434 Some(o)
435 } else {
436 error!(
437 "Digest mismatch - expected: {:?}, got: {:?}",
438 digest, expected_digest,
439 );
440 None
441 }
442 })
443}
444
445#[async_trait]
446impl TransactionKeyValueStoreTrait for HttpKVStore {
447 #[instrument(level = "trace", skip_all)]
448 async fn multi_get(
449 &self,
450 transaction_keys: &[TransactionDigest],
451 effects_keys: &[TransactionDigest],
452 ) -> IotaResult<KVStoreTransactionData> {
453 let num_txns = transaction_keys.len();
454 let num_effects = effects_keys.len();
455
456 let keys = transaction_keys
457 .iter()
458 .map(|tx| Key::Transaction(*tx))
459 .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
460 .collect::<Vec<_>>();
461
462 let fetches = self.multi_fetch(keys).await;
463 let txn_slice = fetches[..num_txns].to_vec();
464 let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
465
466 let txn_results = txn_slice
467 .iter()
468 .take(num_txns)
469 .zip(transaction_keys.iter())
470 .map(map_fetch)
471 .map(|maybe_bytes| {
472 maybe_bytes.and_then(|(bytes, digest)| {
473 deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
474 })
475 })
476 .collect::<Vec<_>>();
477
478 let fx_results = fx_slice
479 .iter()
480 .take(num_effects)
481 .zip(effects_keys.iter())
482 .map(map_fetch)
483 .map(|maybe_bytes| {
484 maybe_bytes.and_then(|(bytes, digest)| {
485 deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
486 *fx.transaction_digest()
487 })
488 })
489 })
490 .collect::<Vec<_>>();
491
492 Ok((txn_results, fx_results))
493 }
494
495 #[instrument(level = "trace", skip_all)]
496 async fn multi_get_checkpoints(
497 &self,
498 checkpoint_summaries: &[CheckpointSequenceNumber],
499 checkpoint_contents: &[CheckpointSequenceNumber],
500 checkpoint_summaries_by_digest: &[CheckpointDigest],
501 ) -> IotaResult<(
502 Vec<Option<CertifiedCheckpointSummary>>,
503 Vec<Option<CheckpointContents>>,
504 Vec<Option<CertifiedCheckpointSummary>>,
505 )> {
506 let keys = checkpoint_summaries
507 .iter()
508 .map(|cp| Key::CheckpointSummary(*cp))
509 .chain(
510 checkpoint_contents
511 .iter()
512 .map(|cp| Key::CheckpointContents(*cp)),
513 )
514 .chain(
515 checkpoint_summaries_by_digest
516 .iter()
517 .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
518 )
519 .collect::<Vec<_>>();
520
521 let summaries_len = checkpoint_summaries.len();
522 let contents_len = checkpoint_contents.len();
523 let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
524
525 let fetches = self.multi_fetch(keys).await;
526
527 let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
528
529 let result_slices = multi_split_slice(&fetches, &input_slices);
530
531 let summaries_results = result_slices[0]
532 .iter()
533 .zip(checkpoint_summaries.iter())
534 .map(map_fetch)
535 .map(|maybe_bytes| {
536 maybe_bytes
537 .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
538 })
539 .collect::<Vec<_>>();
540
541 let contents_results = result_slices[1]
542 .iter()
543 .zip(checkpoint_contents.iter())
544 .map(map_fetch)
545 .map(|maybe_bytes| {
546 maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
547 })
548 .collect::<Vec<_>>();
549
550 let summaries_by_digest_results = result_slices[2]
551 .iter()
552 .zip(checkpoint_summaries_by_digest.iter())
553 .map(map_fetch)
554 .map(|maybe_bytes| {
555 maybe_bytes.and_then(|(bytes, digest)| {
556 deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
557 })
558 })
559 .collect::<Vec<_>>();
560
561 Ok((
562 summaries_results,
563 contents_results,
564 summaries_by_digest_results,
565 ))
566 }
567
568 #[instrument(level = "trace", skip_all)]
569 async fn get_transaction_perpetual_checkpoint(
570 &self,
571 digest: TransactionDigest,
572 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
573 let key = Key::TransactionToCheckpoint(digest);
574 self.fetch(key).await.map(|maybe| {
575 maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
576 })
577 }
578
579 #[instrument(level = "trace", skip_all)]
580 async fn get_object(
581 &self,
582 object_id: ObjectID,
583 version: SequenceNumber,
584 ) -> IotaResult<Option<Object>> {
585 let key = Key::ObjectKey(object_id, version);
586 self.fetch(key)
587 .await
588 .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
589 }
590
591 #[instrument(level = "trace", skip_all)]
592 async fn multi_get_transactions_perpetual_checkpoints(
593 &self,
594 digests: &[TransactionDigest],
595 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
596 let keys = digests
597 .iter()
598 .map(|digest| Key::TransactionToCheckpoint(*digest))
599 .collect::<Vec<_>>();
600
601 let fetches = self.multi_fetch(keys).await;
602
603 let results = fetches
604 .iter()
605 .zip(digests.iter())
606 .map(map_fetch)
607 .map(|maybe_bytes| {
608 maybe_bytes
609 .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
610 })
611 .collect::<Vec<_>>();
612
613 Ok(results)
614 }
615
616 #[instrument(level = "trace", skip_all)]
617 async fn multi_get_events_by_tx_digests(
618 &self,
619 digests: &[TransactionDigest],
620 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
621 let keys = digests
622 .iter()
623 .map(|digest| Key::EventsByTransactionDigest(*digest))
624 .collect::<Vec<_>>();
625 Ok(self
626 .multi_fetch(keys)
627 .await
628 .iter()
629 .zip(digests.iter())
630 .map(map_fetch)
631 .map(|maybe_bytes| {
632 maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
633 })
634 .collect::<Vec<_>>())
635 }
636}