1use std::{sync::Arc, time::Instant};
9
10use async_trait::async_trait;
11use iota_sdk_types::ObjectId;
12use iota_types::{
13 base_types::{SequenceNumber, VersionNumber},
14 digests::{CheckpointDigest, TransactionDigest},
15 effects::{TransactionEffects, TransactionEvents},
16 error::{IotaError, IotaResult, UserInputError},
17 messages_checkpoint::{
18 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
19 },
20 object::Object,
21 storage::ObjectKey,
22 transaction::Transaction,
23};
24use tracing::instrument;
25
26use crate::key_value_store_metrics::KeyValueStoreMetrics;
27
28pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);
29
30pub type KVStoreCheckpointData = (
31 Vec<Option<CertifiedCheckpointSummary>>,
32 Vec<Option<CheckpointContents>>,
33 Vec<Option<CertifiedCheckpointSummary>>,
34);
35
36pub struct TransactionKeyValueStore {
37 store_name: &'static str,
38 metrics: Arc<KeyValueStoreMetrics>,
39 inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
40}
41
42impl TransactionKeyValueStore {
43 pub fn new(
44 store_name: &'static str,
45 metrics: Arc<KeyValueStoreMetrics>,
46 inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
47 ) -> Self {
48 Self {
49 store_name,
50 metrics,
51 inner,
52 }
53 }
54
55 pub async fn multi_get(
58 &self,
59 transaction_keys: &[TransactionDigest],
60 effects_keys: &[TransactionDigest],
61 ) -> IotaResult<KVStoreTransactionData> {
62 let start = Instant::now();
63 let res = self.inner.multi_get(transaction_keys, effects_keys).await;
64 let elapsed = start.elapsed();
65
66 let num_txns = transaction_keys.len() as u64;
67 let num_effects = effects_keys.len() as u64;
68 let total_keys = num_txns + num_effects;
69
70 self.metrics
71 .key_value_store_num_fetches_latency_ms
72 .with_label_values(&[self.store_name, "tx"])
73 .observe(elapsed.as_millis() as f64);
74 self.metrics
75 .key_value_store_num_fetches_batch_size
76 .with_label_values(&[self.store_name, "tx"])
77 .observe(total_keys as f64);
78
79 if let Ok((transactions, effects)) = &res {
80 let txns_not_found = transactions.iter().filter(|v| v.is_none()).count() as u64;
81 let effects_not_found = effects.iter().filter(|v| v.is_none()).count() as u64;
82
83 if num_txns > 0 {
84 self.metrics
85 .key_value_store_num_fetches_success
86 .with_label_values(&[self.store_name, "tx"])
87 .inc_by(num_txns);
88 }
89 if num_effects > 0 {
90 self.metrics
91 .key_value_store_num_fetches_success
92 .with_label_values(&[self.store_name, "fx"])
93 .inc_by(num_effects);
94 }
95
96 if txns_not_found > 0 {
97 self.metrics
98 .key_value_store_num_fetches_not_found
99 .with_label_values(&[self.store_name, "tx"])
100 .inc_by(txns_not_found);
101 }
102 if effects_not_found > 0 {
103 self.metrics
104 .key_value_store_num_fetches_not_found
105 .with_label_values(&[self.store_name, "fx"])
106 .inc_by(effects_not_found);
107 }
108 } else {
109 self.metrics
110 .key_value_store_num_fetches_error
111 .with_label_values(&[self.store_name, "tx"])
112 .inc_by(num_txns);
113 self.metrics
114 .key_value_store_num_fetches_error
115 .with_label_values(&[self.store_name, "fx"])
116 .inc_by(num_effects);
117 }
118
119 res
120 }
121
122 pub async fn multi_get_checkpoints(
123 &self,
124 checkpoint_summaries: &[CheckpointSequenceNumber],
125 checkpoint_contents: &[CheckpointSequenceNumber],
126 checkpoint_summaries_by_digest: &[CheckpointDigest],
127 ) -> IotaResult<(
128 Vec<Option<CertifiedCheckpointSummary>>,
129 Vec<Option<CheckpointContents>>,
130 Vec<Option<CertifiedCheckpointSummary>>,
131 )> {
132 let start = Instant::now();
133 let res = self
134 .inner
135 .multi_get_checkpoints(
136 checkpoint_summaries,
137 checkpoint_contents,
138 checkpoint_summaries_by_digest,
139 )
140 .await;
141 let elapsed = start.elapsed();
142
143 let num_summaries =
144 checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
145 let num_contents = checkpoint_contents.len() as u64;
146
147 self.metrics
148 .key_value_store_num_fetches_latency_ms
149 .with_label_values(&[self.store_name, "checkpoint"])
150 .observe(elapsed.as_millis() as f64);
151 self.metrics
152 .key_value_store_num_fetches_batch_size
153 .with_label_values(&[self.store_name, "checkpoint_summary"])
154 .observe(num_summaries as f64);
155 self.metrics
156 .key_value_store_num_fetches_batch_size
157 .with_label_values(&[self.store_name, "checkpoint_content"])
158 .observe(num_contents as f64);
159
160 if let Ok((summaries, contents, summaries_by_digest)) = &res {
161 let summaries_not_found = summaries.iter().filter(|v| v.is_none()).count() as u64
162 + summaries_by_digest.iter().filter(|v| v.is_none()).count() as u64;
163 let contents_not_found = contents.iter().filter(|v| v.is_none()).count() as u64;
164
165 if num_summaries > 0 {
166 self.metrics
167 .key_value_store_num_fetches_success
168 .with_label_values(&[self.store_name, "ckpt_summary"])
169 .inc_by(num_summaries);
170 }
171 if num_contents > 0 {
172 self.metrics
173 .key_value_store_num_fetches_success
174 .with_label_values(&[self.store_name, "ckpt_contents"])
175 .inc_by(num_contents);
176 }
177
178 if summaries_not_found > 0 {
179 self.metrics
180 .key_value_store_num_fetches_not_found
181 .with_label_values(&[self.store_name, "ckpt_summary"])
182 .inc_by(summaries_not_found);
183 }
184 if contents_not_found > 0 {
185 self.metrics
186 .key_value_store_num_fetches_not_found
187 .with_label_values(&[self.store_name, "ckpt_contents"])
188 .inc_by(contents_not_found);
189 }
190 } else {
191 self.metrics
192 .key_value_store_num_fetches_error
193 .with_label_values(&[self.store_name, "ckpt_summary"])
194 .inc_by(num_summaries);
195 self.metrics
196 .key_value_store_num_fetches_error
197 .with_label_values(&[self.store_name, "ckpt_contents"])
198 .inc_by(num_contents);
199 }
200
201 res
202 }
203
204 pub async fn multi_get_checkpoints_summaries(
205 &self,
206 keys: &[CheckpointSequenceNumber],
207 ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
208 self.multi_get_checkpoints(keys, &[], &[])
209 .await
210 .map(|(summaries, _, _)| summaries)
211 }
212
213 pub async fn multi_get_checkpoints_contents(
214 &self,
215 keys: &[CheckpointSequenceNumber],
216 ) -> IotaResult<Vec<Option<CheckpointContents>>> {
217 self.multi_get_checkpoints(&[], keys, &[])
218 .await
219 .map(|(_, contents, _)| contents)
220 }
221
222 pub async fn multi_get_checkpoints_summaries_by_digest(
223 &self,
224 keys: &[CheckpointDigest],
225 ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
226 self.multi_get_checkpoints(&[], &[], keys)
227 .await
228 .map(|(_, _, summaries)| summaries)
229 }
230
231 pub async fn multi_get_tx(
232 &self,
233 keys: &[TransactionDigest],
234 ) -> IotaResult<Vec<Option<Transaction>>> {
235 self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
236 }
237
238 pub async fn multi_get_fx_by_tx_digest(
239 &self,
240 keys: &[TransactionDigest],
241 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
242 self.multi_get(&[], keys).await.map(|(_, fx)| fx)
243 }
244
245 pub async fn get_tx(&self, digest: TransactionDigest) -> IotaResult<Transaction> {
248 self.multi_get_tx(&[digest])
249 .await?
250 .into_iter()
251 .next()
252 .flatten()
253 .ok_or(IotaError::TransactionNotFound { digest })
254 }
255
256 pub async fn get_fx_by_tx_digest(
260 &self,
261 digest: TransactionDigest,
262 ) -> IotaResult<TransactionEffects> {
263 self.multi_get_fx_by_tx_digest(&[digest])
264 .await?
265 .into_iter()
266 .next()
267 .flatten()
268 .ok_or(IotaError::TransactionNotFound { digest })
269 }
270
271 pub async fn get_checkpoint_summary(
275 &self,
276 checkpoint: CheckpointSequenceNumber,
277 ) -> IotaResult<CertifiedCheckpointSummary> {
278 self.multi_get_checkpoints_summaries(&[checkpoint])
279 .await?
280 .into_iter()
281 .next()
282 .flatten()
283 .ok_or(IotaError::UserInput {
284 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
285 })
286 }
287
288 pub async fn get_checkpoint_contents(
292 &self,
293 checkpoint: CheckpointSequenceNumber,
294 ) -> IotaResult<CheckpointContents> {
295 self.multi_get_checkpoints_contents(&[checkpoint])
296 .await?
297 .into_iter()
298 .next()
299 .flatten()
300 .ok_or(IotaError::UserInput {
301 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
302 })
303 }
304
305 pub async fn get_checkpoint_summary_by_digest(
309 &self,
310 digest: CheckpointDigest,
311 ) -> IotaResult<CertifiedCheckpointSummary> {
312 self.multi_get_checkpoints_summaries_by_digest(&[digest])
313 .await?
314 .into_iter()
315 .next()
316 .flatten()
317 .ok_or(IotaError::UserInput {
318 error: UserInputError::VerifiedCheckpointDigestNotFound(format!("{digest}")),
319 })
320 }
321
322 pub async fn get_transaction_perpetual_checkpoint(
323 &self,
324 digest: TransactionDigest,
325 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
326 self.inner
327 .get_transaction_perpetual_checkpoint(digest)
328 .await
329 }
330
331 pub async fn get_object(
332 &self,
333 object_id: ObjectId,
334 version: VersionNumber,
335 ) -> IotaResult<Option<Object>> {
336 self.inner.get_object(object_id, version).await
337 }
338
339 pub async fn multi_get_objects(
340 &self,
341 object_keys: &[ObjectKey],
342 ) -> IotaResult<Vec<Option<Object>>> {
343 self.inner.multi_get_objects(object_keys).await
344 }
345
346 pub async fn multi_get_transactions_perpetual_checkpoints(
347 &self,
348 digests: &[TransactionDigest],
349 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
350 self.inner
351 .multi_get_transactions_perpetual_checkpoints(digests)
352 .await
353 }
354
355 pub async fn multi_get_events_by_tx_digests(
356 &self,
357 digests: &[TransactionDigest],
358 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
359 self.inner.multi_get_events_by_tx_digests(digests).await
360 }
361}
362
363#[async_trait]
367pub trait TransactionKeyValueStoreTrait {
368 async fn multi_get(
371 &self,
372 transaction_keys: &[TransactionDigest],
373 effects_keys: &[TransactionDigest],
374 ) -> IotaResult<KVStoreTransactionData>;
375
376 async fn multi_get_checkpoints(
379 &self,
380 checkpoint_summaries: &[CheckpointSequenceNumber],
381 checkpoint_contents: &[CheckpointSequenceNumber],
382 checkpoint_summaries_by_digest: &[CheckpointDigest],
383 ) -> IotaResult<KVStoreCheckpointData>;
384
385 async fn get_transaction_perpetual_checkpoint(
386 &self,
387 digest: TransactionDigest,
388 ) -> IotaResult<Option<CheckpointSequenceNumber>>;
389
390 async fn get_object(
391 &self,
392 object_id: ObjectId,
393 version: SequenceNumber,
394 ) -> IotaResult<Option<Object>>;
395
396 async fn multi_get_objects(&self, object_keys: &[ObjectKey])
397 -> IotaResult<Vec<Option<Object>>>;
398
399 async fn multi_get_transactions_perpetual_checkpoints(
400 &self,
401 digests: &[TransactionDigest],
402 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>>;
403
404 async fn multi_get_events_by_tx_digests(
405 &self,
406 digests: &[TransactionDigest],
407 ) -> IotaResult<Vec<Option<TransactionEvents>>>;
408}
409
410pub struct FallbackTransactionKVStore {
416 primary: TransactionKeyValueStore,
417 fallback: TransactionKeyValueStore,
418}
419
420impl FallbackTransactionKVStore {
421 pub fn new_kv(
422 primary: TransactionKeyValueStore,
423 fallback: TransactionKeyValueStore,
424 metrics: Arc<KeyValueStoreMetrics>,
425 label: &'static str,
426 ) -> TransactionKeyValueStore {
427 let store = Arc::new(Self { primary, fallback });
428 TransactionKeyValueStore::new(label, metrics, store)
429 }
430}
431
432#[async_trait]
433impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
434 #[instrument(level = "trace", skip_all)]
435 async fn multi_get(
436 &self,
437 transaction_keys: &[TransactionDigest],
438 effects_keys: &[TransactionDigest],
439 ) -> IotaResult<KVStoreTransactionData> {
440 let (mut transactions, mut effects) = self
441 .primary
442 .multi_get(transaction_keys, effects_keys)
443 .await?;
444
445 let (fallback_transaction_keys, indices_transactions) =
446 find_fallback(&transactions, transaction_keys);
447 let (fallback_effects_keys, indices_effects) = find_fallback(&effects, effects_keys);
448
449 if fallback_transaction_keys.is_empty() && fallback_effects_keys.is_empty() {
450 return Ok((transactions, effects));
451 }
452
453 let (fallback_transactions, fallback_effects) = self
454 .fallback
455 .multi_get(&fallback_transaction_keys, &fallback_effects_keys)
456 .await?;
457
458 merge_res(
459 &mut transactions,
460 fallback_transactions,
461 &indices_transactions,
462 );
463 merge_res(&mut effects, fallback_effects, &indices_effects);
464
465 Ok((transactions, effects))
466 }
467
468 #[instrument(level = "trace", skip_all)]
469 async fn multi_get_checkpoints(
470 &self,
471 checkpoint_summaries: &[CheckpointSequenceNumber],
472 checkpoint_contents: &[CheckpointSequenceNumber],
473 checkpoint_summaries_by_digest: &[CheckpointDigest],
474 ) -> IotaResult<(
475 Vec<Option<CertifiedCheckpointSummary>>,
476 Vec<Option<CheckpointContents>>,
477 Vec<Option<CertifiedCheckpointSummary>>,
478 )> {
479 let (mut summaries, mut contents, mut summaries_by_digest) = self
480 .primary
481 .multi_get_checkpoints(
482 checkpoint_summaries,
483 checkpoint_contents,
484 checkpoint_summaries_by_digest,
485 )
486 .await?;
487
488 let (fallback_summaries, indices_summaries) =
489 find_fallback(&summaries, checkpoint_summaries);
490 let (fallback_contents, indices_contents) = find_fallback(&contents, checkpoint_contents);
491 let (fallback_summaries_by_digest, indices_summaries_by_digest) =
492 find_fallback(&summaries_by_digest, checkpoint_summaries_by_digest);
493
494 if fallback_summaries.is_empty()
495 && fallback_contents.is_empty()
496 && fallback_summaries_by_digest.is_empty()
497 {
498 return Ok((summaries, contents, summaries_by_digest));
499 }
500
501 let (fallback_summaries, fallback_contents, fallback_summaries_by_digest) = self
502 .fallback
503 .multi_get_checkpoints(
504 &fallback_summaries,
505 &fallback_contents,
506 &fallback_summaries_by_digest,
507 )
508 .await?;
509
510 merge_res(&mut summaries, fallback_summaries, &indices_summaries);
511 merge_res(&mut contents, fallback_contents, &indices_contents);
512 merge_res(
513 &mut summaries_by_digest,
514 fallback_summaries_by_digest,
515 &indices_summaries_by_digest,
516 );
517
518 Ok((summaries, contents, summaries_by_digest))
519 }
520
521 #[instrument(level = "trace", skip_all)]
522 async fn get_transaction_perpetual_checkpoint(
523 &self,
524 digest: TransactionDigest,
525 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
526 let mut res = self
527 .primary
528 .get_transaction_perpetual_checkpoint(digest)
529 .await?;
530 if res.is_none() {
531 res = self
532 .fallback
533 .get_transaction_perpetual_checkpoint(digest)
534 .await?;
535 }
536 Ok(res)
537 }
538
539 #[instrument(level = "trace", skip_all)]
540 async fn get_object(
541 &self,
542 object_id: ObjectId,
543 version: SequenceNumber,
544 ) -> IotaResult<Option<Object>> {
545 let mut res = self.primary.get_object(object_id, version).await?;
546 if res.is_none() {
547 res = self.fallback.get_object(object_id, version).await?;
548 }
549 Ok(res)
550 }
551
552 #[instrument(level = "trace", skip_all)]
553 async fn multi_get_objects(
554 &self,
555 object_keys: &[ObjectKey],
556 ) -> IotaResult<Vec<Option<Object>>> {
557 let mut res = self.primary.multi_get_objects(object_keys).await?;
558
559 let (fallback, indices) = find_fallback(&res, object_keys);
560
561 if fallback.is_empty() {
562 return Ok(res);
563 }
564
565 let secondary_res = self.fallback.multi_get_objects(&fallback).await?;
566
567 merge_res(&mut res, secondary_res, &indices);
568
569 Ok(res)
570 }
571
572 #[instrument(level = "trace", skip_all)]
573 async fn multi_get_transactions_perpetual_checkpoints(
574 &self,
575 digests: &[TransactionDigest],
576 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
577 let mut res = self
578 .primary
579 .multi_get_transactions_perpetual_checkpoints(digests)
580 .await?;
581
582 let (fallback, indices) = find_fallback(&res, digests);
583
584 if fallback.is_empty() {
585 return Ok(res);
586 }
587
588 let secondary_res = self
589 .fallback
590 .multi_get_transactions_perpetual_checkpoints(&fallback)
591 .await?;
592
593 merge_res(&mut res, secondary_res, &indices);
594
595 Ok(res)
596 }
597
598 #[instrument(level = "trace", skip_all)]
599 async fn multi_get_events_by_tx_digests(
600 &self,
601 digests: &[TransactionDigest],
602 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
603 let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
604 let (fallback, indices) = find_fallback(&res, digests);
605 if fallback.is_empty() {
606 return Ok(res);
607 }
608 let secondary_res = self
609 .fallback
610 .multi_get_events_by_tx_digests(&fallback)
611 .await?;
612 merge_res(&mut res, secondary_res, &indices);
613 Ok(res)
614 }
615}
616
/// Collects the keys whose lookup produced `None`.
///
/// Returns `(keys, positions)`: a clone of every key in `keys` whose entry in
/// `values` is `None`, and the position of each such entry. Both vectors are
/// in ascending position order and have the same length.
///
/// # Panics
///
/// Panics if `values` has a `None` entry at a position that is out of bounds
/// for `keys`.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    values
        .iter()
        .enumerate()
        .filter(|(_, value)| value.is_none())
        // Index rather than zip so a length mismatch still panics loudly.
        .map(|(position, _)| (keys[position].clone(), position))
        .unzip()
}
629
/// Writes fallback results back into `values`.
///
/// The n-th element of `fallback_values` is stored at `values[indices[n]]`,
/// overwriting whatever was there (including with `None`). If `indices` and
/// `fallback_values` differ in length, the surplus of the longer one is
/// ignored.
///
/// # Panics
///
/// Panics if a used index is out of bounds for `values`.
fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
    let mut replacements = fallback_values.into_iter();
    for &slot in indices {
        match replacements.next() {
            Some(found) => values[slot] = found,
            // Fewer fallback results than slots: leave the rest untouched.
            None => break,
        }
    }
}