1use std::{str::FromStr, sync::Arc};
6
7use anyhow;
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::{self, StreamExt};
11use iota_types::{
12 base_types::{ObjectID, SequenceNumber, VersionNumber},
13 digests::{CheckpointDigest, TransactionDigest},
14 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
15 error::{IotaError, IotaResult},
16 messages_checkpoint::{
17 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18 },
19 object::Object,
20 storage::ObjectKey,
21 transaction::Transaction,
22};
23use reqwest::{
24 Client, Url,
25 header::{CONTENT_LENGTH, HeaderValue},
26};
27use serde::{Deserialize, Serialize};
28use tap::TapFallible;
29use tracing::{error, info, instrument, trace, warn};
30
31use crate::{
32 key_value_store::{
33 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
34 },
35 key_value_store_metrics::KeyValueStoreMetrics,
36};
37
38pub struct HttpKVStore {
39 base_url: Url,
40 client: Client,
41}
42
43pub fn encode_digest<T: AsRef<[u8]>>(digest: &T) -> String {
44 base64_url::encode(digest)
45}
46
47#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
49pub enum TaggedKey {
50 CheckpointSequenceNumber(CheckpointSequenceNumber),
51}
52
53pub fn encoded_tagged_key(key: &TaggedKey) -> String {
54 let bytes = bcs::to_bytes(key).expect("failed to serialize key");
55 base64_url::encode(&bytes)
56}
57
58pub fn encode_object_key(object_id: &ObjectID, version: &VersionNumber) -> String {
59 let bytes =
60 bcs::to_bytes(&ObjectKey(*object_id, *version)).expect("failed to serialize object key");
61 base64_url::encode(&bytes)
62}
63
64trait IntoIotaResult<T> {
65 fn into_iota_result(self) -> IotaResult<T>;
66}
67
68impl<T, E> IntoIotaResult<T> for Result<T, E>
69where
70 E: std::error::Error,
71{
72 fn into_iota_result(self) -> IotaResult<T> {
73 self.map_err(|e| IotaError::Storage(e.to_string()))
74 }
75}
76
77#[derive(Clone, Copy, Debug, PartialEq, Eq, Deserialize, strum::EnumString, strum::Display)]
80pub enum ItemType {
81 #[strum(serialize = "tx")]
82 #[serde(rename = "tx")]
83 Transaction,
84 #[strum(serialize = "fx")]
85 #[serde(rename = "fx")]
86 TransactionEffects,
87 #[strum(serialize = "cc")]
88 #[serde(rename = "cc")]
89 CheckpointContents,
90 #[strum(serialize = "cs")]
91 #[serde(rename = "cs")]
92 CheckpointSummary,
93 #[strum(serialize = "tx2c")]
94 #[serde(rename = "tx2c")]
95 TransactionToCheckpoint,
96 #[strum(serialize = "ob")]
97 #[serde(rename = "ob")]
98 Object,
99 #[strum(serialize = "evtx")]
100 #[serde(rename = "evtx")]
101 EventTransactionDigest,
102}
103
104#[derive(Clone, Copy, Debug, PartialEq, Eq)]
105pub enum Key {
106 Transaction(TransactionDigest),
107 TransactionEffects(TransactionDigest),
108 CheckpointContents(CheckpointSequenceNumber),
109 CheckpointSummary(CheckpointSequenceNumber),
110 CheckpointSummaryByDigest(CheckpointDigest),
111 TransactionToCheckpoint(TransactionDigest),
112 ObjectKey(ObjectID, VersionNumber),
113 EventsByTransactionDigest(TransactionDigest),
114}
115
116impl Key {
117 pub fn new(item_type: &str, encoded_key: &str) -> anyhow::Result<Self> {
137 let item_type =
138 ItemType::from_str(item_type).map_err(|e| anyhow::anyhow!("invalid item type: {e}"))?;
139 let decoded_key = base64_url::decode(encoded_key)
140 .map_err(|err| anyhow::anyhow!("invalid base64 url string: {err}"))?;
141
142 match item_type {
143 ItemType::Transaction => Ok(Key::Transaction(TransactionDigest::try_from(
144 decoded_key.as_slice(),
145 )?)),
146 ItemType::TransactionEffects => Ok(Key::TransactionEffects(
147 TransactionDigest::try_from(decoded_key.as_slice())?,
148 )),
149 ItemType::CheckpointContents => {
150 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
151 anyhow::anyhow!("failed to deserialize checkpoint sequence number: {err}")
152 })?;
153 match tagged_key {
154 TaggedKey::CheckpointSequenceNumber(seq) => Ok(Key::CheckpointContents(seq)),
155 }
156 }
157 ItemType::CheckpointSummary => {
158 match CheckpointDigest::try_from(decoded_key.clone()) {
160 Err(_) => {
161 let tagged_key = bcs::from_bytes(&decoded_key).map_err(|err| {
162 anyhow::anyhow!(
163 "failed to deserialize checkpoint sequence number: {err}"
164 )
165 })?;
166 match tagged_key {
167 TaggedKey::CheckpointSequenceNumber(seq) => {
168 Ok(Key::CheckpointSummary(seq))
169 }
170 }
171 }
172 Ok(cs_digest) => Ok(Key::CheckpointSummaryByDigest(cs_digest)),
173 }
174 }
175 ItemType::TransactionToCheckpoint => Ok(Key::TransactionToCheckpoint(
176 TransactionDigest::try_from(decoded_key.as_slice())?,
177 )),
178 ItemType::Object => {
179 let object_key: ObjectKey = bcs::from_bytes(&decoded_key)
180 .map_err(|err| anyhow::anyhow!("failed to deserialize object key: {err}"))?;
181
182 Ok(Key::ObjectKey(object_key.0, object_key.1))
183 }
184 ItemType::EventTransactionDigest => Ok(Key::EventsByTransactionDigest(
185 TransactionDigest::try_from(decoded_key.as_slice())?,
186 )),
187 }
188 }
189
190 pub fn item_type(&self) -> ItemType {
209 match self {
210 Key::Transaction(_) => ItemType::Transaction,
211 Key::TransactionEffects(_) => ItemType::TransactionEffects,
212 Key::CheckpointContents(_) => ItemType::CheckpointContents,
213 Key::CheckpointSummary(_) | Key::CheckpointSummaryByDigest(_) => {
214 ItemType::CheckpointSummary
215 }
216 Key::TransactionToCheckpoint(_) => ItemType::TransactionToCheckpoint,
217 Key::ObjectKey(_, _) => ItemType::Object,
218 Key::EventsByTransactionDigest(_) => ItemType::EventTransactionDigest,
219 }
220 }
221
222 pub fn to_path_elements(&self) -> (ItemType, String) {
253 let encoded_key_digest = match self {
254 Key::Transaction(digest) => encode_digest(digest),
255 Key::TransactionEffects(digest) => encode_digest(digest),
256 Key::CheckpointContents(seq) => {
257 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
258 }
259 Key::CheckpointSummary(seq) => {
260 encoded_tagged_key(&TaggedKey::CheckpointSequenceNumber(*seq))
261 }
262 Key::CheckpointSummaryByDigest(digest) => encode_digest(digest),
263 Key::TransactionToCheckpoint(digest) => encode_digest(digest),
264 Key::ObjectKey(object_id, version) => encode_object_key(object_id, version),
265 Key::EventsByTransactionDigest(digest) => encode_digest(digest),
266 };
267
268 (self.item_type(), encoded_key_digest)
269 }
270}
271
272#[derive(Clone, Debug)]
273enum Value {
274 Tx(Box<Transaction>),
275 Fx(Box<TransactionEffects>),
276 Events(Box<TransactionEvents>),
277 CheckpointContents(Box<CheckpointContents>),
278 CheckpointSummary(Box<CertifiedCheckpointSummary>),
279 TxToCheckpoint(CheckpointSequenceNumber),
280}
281
282impl HttpKVStore {
283 pub fn new_kv(
284 base_url: &str,
285 metrics: Arc<KeyValueStoreMetrics>,
286 ) -> IotaResult<TransactionKeyValueStore> {
287 let inner = Arc::new(Self::new(base_url)?);
288 Ok(TransactionKeyValueStore::new("http", metrics, inner))
289 }
290
291 pub fn new(base_url: &str) -> IotaResult<Self> {
292 info!("creating HttpKVStore with base_url: {}", base_url);
293
294 let client = Client::builder().http2_prior_knowledge().build().unwrap();
295
296 let base_url = if base_url.ends_with('/') {
297 base_url.to_string()
298 } else {
299 format!("{}/", base_url)
300 };
301
302 let base_url = Url::parse(&base_url).into_iota_result()?;
303
304 Ok(Self { base_url, client })
305 }
306
307 fn get_url(&self, key: &Key) -> IotaResult<Url> {
308 let (item_type, digest) = key.to_path_elements();
309 let joined = self
310 .base_url
311 .join(&format!("{item_type}/{digest}"))
312 .into_iota_result()?;
313 Url::from_str(joined.as_str()).into_iota_result()
314 }
315
316 async fn multi_fetch(&self, uris: Vec<Key>) -> Vec<IotaResult<Option<Bytes>>> {
317 let uris_vec = uris.to_vec();
318 let fetches = stream::iter(uris_vec.into_iter().map(|url| self.fetch(url)));
319 fetches.buffered(uris.len()).collect::<Vec<_>>().await
320 }
321
322 async fn fetch(&self, key: Key) -> IotaResult<Option<Bytes>> {
323 let url = self.get_url(&key)?;
324 trace!("fetching url: {}", url);
325 let resp = self
326 .client
327 .get(url.clone())
328 .send()
329 .await
330 .into_iota_result()?;
331 trace!(
332 "got response {} for url: {}, len: {:?}",
333 url,
334 resp.status(),
335 resp.headers()
336 .get(CONTENT_LENGTH)
337 .unwrap_or(&HeaderValue::from_static("0"))
338 );
339 if resp.status().is_success() {
341 resp.bytes().await.map(Some).into_iota_result()
342 } else {
343 Ok(None)
344 }
345 }
346}
347
348fn deser<K, T>(key: &K, bytes: &[u8]) -> Option<T>
349where
350 K: std::fmt::Debug,
351 T: for<'de> Deserialize<'de>,
352{
353 bcs::from_bytes(bytes)
354 .tap_err(|e| warn!("Error deserializing data for key {:?}: {:?}", key, e))
355 .ok()
356}
357
358fn map_fetch<'a, K>(fetch: (&'a IotaResult<Option<Bytes>>, &'a K)) -> Option<(&'a Bytes, &'a K)>
359where
360 K: std::fmt::Debug,
361{
362 let (fetch, key) = fetch;
363 match fetch {
364 Ok(Some(bytes)) => Some((bytes, key)),
365 Ok(None) => None,
366 Err(err) => {
367 warn!("Error fetching key: {:?}, error: {:?}", key, err);
368 None
369 }
370 }
371}
372
373fn multi_split_slice<'a, T>(slice: &'a [T], lengths: &'a [usize]) -> Vec<&'a [T]> {
374 let mut start = 0;
375 lengths
376 .iter()
377 .map(|length| {
378 let end = start + length;
379 let result = &slice[start..end];
380 start = end;
381 result
382 })
383 .collect()
384}
385
386fn deser_check_digest<T, D>(
387 digest: &D,
388 bytes: &Bytes,
389 get_expected_digest: impl FnOnce(&T) -> D,
390) -> Option<T>
391where
392 D: std::fmt::Debug + PartialEq,
393 T: for<'de> Deserialize<'de>,
394{
395 deser(digest, bytes).and_then(|o: T| {
396 let expected_digest = get_expected_digest(&o);
397 if expected_digest == *digest {
398 Some(o)
399 } else {
400 error!(
401 "Digest mismatch - expected: {:?}, got: {:?}",
402 digest, expected_digest,
403 );
404 None
405 }
406 })
407}
408
409#[async_trait]
410impl TransactionKeyValueStoreTrait for HttpKVStore {
411 #[instrument(level = "trace", skip_all)]
412 async fn multi_get(
413 &self,
414 transaction_keys: &[TransactionDigest],
415 effects_keys: &[TransactionDigest],
416 ) -> IotaResult<KVStoreTransactionData> {
417 let num_txns = transaction_keys.len();
418 let num_effects = effects_keys.len();
419
420 let keys = transaction_keys
421 .iter()
422 .map(|tx| Key::Transaction(*tx))
423 .chain(effects_keys.iter().map(|fx| Key::TransactionEffects(*fx)))
424 .collect::<Vec<_>>();
425
426 let fetches = self.multi_fetch(keys).await;
427 let txn_slice = fetches[..num_txns].to_vec();
428 let fx_slice = fetches[num_txns..num_txns + num_effects].to_vec();
429
430 let txn_results = txn_slice
431 .iter()
432 .take(num_txns)
433 .zip(transaction_keys.iter())
434 .map(map_fetch)
435 .map(|maybe_bytes| {
436 maybe_bytes.and_then(|(bytes, digest)| {
437 deser_check_digest(digest, bytes, |tx: &Transaction| *tx.digest())
438 })
439 })
440 .collect::<Vec<_>>();
441
442 let fx_results = fx_slice
443 .iter()
444 .take(num_effects)
445 .zip(effects_keys.iter())
446 .map(map_fetch)
447 .map(|maybe_bytes| {
448 maybe_bytes.and_then(|(bytes, digest)| {
449 deser_check_digest(digest, bytes, |fx: &TransactionEffects| {
450 *fx.transaction_digest()
451 })
452 })
453 })
454 .collect::<Vec<_>>();
455
456 Ok((txn_results, fx_results))
457 }
458
459 #[instrument(level = "trace", skip_all)]
460 async fn multi_get_checkpoints(
461 &self,
462 checkpoint_summaries: &[CheckpointSequenceNumber],
463 checkpoint_contents: &[CheckpointSequenceNumber],
464 checkpoint_summaries_by_digest: &[CheckpointDigest],
465 ) -> IotaResult<(
466 Vec<Option<CertifiedCheckpointSummary>>,
467 Vec<Option<CheckpointContents>>,
468 Vec<Option<CertifiedCheckpointSummary>>,
469 )> {
470 let keys = checkpoint_summaries
471 .iter()
472 .map(|cp| Key::CheckpointSummary(*cp))
473 .chain(
474 checkpoint_contents
475 .iter()
476 .map(|cp| Key::CheckpointContents(*cp)),
477 )
478 .chain(
479 checkpoint_summaries_by_digest
480 .iter()
481 .map(|cp| Key::CheckpointSummaryByDigest(*cp)),
482 )
483 .collect::<Vec<_>>();
484
485 let summaries_len = checkpoint_summaries.len();
486 let contents_len = checkpoint_contents.len();
487 let summaries_by_digest_len = checkpoint_summaries_by_digest.len();
488
489 let fetches = self.multi_fetch(keys).await;
490
491 let input_slices = [summaries_len, contents_len, summaries_by_digest_len];
492
493 let result_slices = multi_split_slice(&fetches, &input_slices);
494
495 let summaries_results = result_slices[0]
496 .iter()
497 .zip(checkpoint_summaries.iter())
498 .map(map_fetch)
499 .map(|maybe_bytes| {
500 maybe_bytes
501 .and_then(|(bytes, seq)| deser::<_, CertifiedCheckpointSummary>(seq, bytes))
502 })
503 .collect::<Vec<_>>();
504
505 let contents_results = result_slices[1]
506 .iter()
507 .zip(checkpoint_contents.iter())
508 .map(map_fetch)
509 .map(|maybe_bytes| {
510 maybe_bytes.and_then(|(bytes, seq)| deser::<_, CheckpointContents>(seq, bytes))
511 })
512 .collect::<Vec<_>>();
513
514 let summaries_by_digest_results = result_slices[2]
515 .iter()
516 .zip(checkpoint_summaries_by_digest.iter())
517 .map(map_fetch)
518 .map(|maybe_bytes| {
519 maybe_bytes.and_then(|(bytes, digest)| {
520 deser_check_digest(digest, bytes, |s: &CertifiedCheckpointSummary| *s.digest())
521 })
522 })
523 .collect::<Vec<_>>();
524
525 Ok((
526 summaries_results,
527 contents_results,
528 summaries_by_digest_results,
529 ))
530 }
531
532 #[instrument(level = "trace", skip_all)]
533 async fn get_transaction_perpetual_checkpoint(
534 &self,
535 digest: TransactionDigest,
536 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
537 let key = Key::TransactionToCheckpoint(digest);
538 self.fetch(key).await.map(|maybe| {
539 maybe.and_then(|bytes| deser::<_, CheckpointSequenceNumber>(&key, bytes.as_ref()))
540 })
541 }
542
543 #[instrument(level = "trace", skip_all)]
544 async fn get_object(
545 &self,
546 object_id: ObjectID,
547 version: SequenceNumber,
548 ) -> IotaResult<Option<Object>> {
549 let key = Key::ObjectKey(object_id, version);
550 self.fetch(key)
551 .await
552 .map(|maybe| maybe.and_then(|bytes| deser::<_, Object>(&key, bytes.as_ref())))
553 }
554
555 #[instrument(level = "trace", skip_all)]
556 async fn multi_get_transactions_perpetual_checkpoints(
557 &self,
558 digests: &[TransactionDigest],
559 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
560 let keys = digests
561 .iter()
562 .map(|digest| Key::TransactionToCheckpoint(*digest))
563 .collect::<Vec<_>>();
564
565 let fetches = self.multi_fetch(keys).await;
566
567 let results = fetches
568 .iter()
569 .zip(digests.iter())
570 .map(map_fetch)
571 .map(|maybe_bytes| {
572 maybe_bytes
573 .and_then(|(bytes, key)| deser::<_, CheckpointSequenceNumber>(&key, bytes))
574 })
575 .collect::<Vec<_>>();
576
577 Ok(results)
578 }
579
580 #[instrument(level = "trace", skip_all)]
581 async fn multi_get_events_by_tx_digests(
582 &self,
583 digests: &[TransactionDigest],
584 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
585 let keys = digests
586 .iter()
587 .map(|digest| Key::EventsByTransactionDigest(*digest))
588 .collect::<Vec<_>>();
589 Ok(self
590 .multi_fetch(keys)
591 .await
592 .iter()
593 .zip(digests.iter())
594 .map(map_fetch)
595 .map(|maybe_bytes| {
596 maybe_bytes.and_then(|(bytes, key)| deser::<_, TransactionEvents>(&key, bytes))
597 })
598 .collect::<Vec<_>>())
599 }
600}