1use std::{sync::Arc, time::Instant};
9
10use async_trait::async_trait;
11use iota_types::{
12 base_types::{ObjectID, SequenceNumber, VersionNumber},
13 digests::{CheckpointDigest, TransactionDigest},
14 effects::{TransactionEffects, TransactionEvents},
15 error::{IotaError, IotaResult, UserInputError},
16 messages_checkpoint::{
17 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18 },
19 object::Object,
20 storage::ObjectKey,
21 transaction::Transaction,
22};
23use tracing::instrument;
24
25use crate::key_value_store_metrics::KeyValueStoreMetrics;
26
27pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);
28
29pub type KVStoreCheckpointData = (
30 Vec<Option<CertifiedCheckpointSummary>>,
31 Vec<Option<CheckpointContents>>,
32 Vec<Option<CertifiedCheckpointSummary>>,
33);
34
35pub struct TransactionKeyValueStore {
36 store_name: &'static str,
37 metrics: Arc<KeyValueStoreMetrics>,
38 inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
39}
40
41impl TransactionKeyValueStore {
42 pub fn new(
43 store_name: &'static str,
44 metrics: Arc<KeyValueStoreMetrics>,
45 inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
46 ) -> Self {
47 Self {
48 store_name,
49 metrics,
50 inner,
51 }
52 }
53
54 pub async fn multi_get(
57 &self,
58 transaction_keys: &[TransactionDigest],
59 effects_keys: &[TransactionDigest],
60 ) -> IotaResult<KVStoreTransactionData> {
61 let start = Instant::now();
62 let res = self.inner.multi_get(transaction_keys, effects_keys).await;
63 let elapsed = start.elapsed();
64
65 let num_txns = transaction_keys.len() as u64;
66 let num_effects = effects_keys.len() as u64;
67 let total_keys = num_txns + num_effects;
68
69 self.metrics
70 .key_value_store_num_fetches_latency_ms
71 .with_label_values(&[self.store_name, "tx"])
72 .observe(elapsed.as_millis() as f64);
73 self.metrics
74 .key_value_store_num_fetches_batch_size
75 .with_label_values(&[self.store_name, "tx"])
76 .observe(total_keys as f64);
77
78 if let Ok((transactions, effects)) = &res {
79 let txns_not_found = transactions.iter().filter(|v| v.is_none()).count() as u64;
80 let effects_not_found = effects.iter().filter(|v| v.is_none()).count() as u64;
81
82 if num_txns > 0 {
83 self.metrics
84 .key_value_store_num_fetches_success
85 .with_label_values(&[self.store_name, "tx"])
86 .inc_by(num_txns);
87 }
88 if num_effects > 0 {
89 self.metrics
90 .key_value_store_num_fetches_success
91 .with_label_values(&[self.store_name, "fx"])
92 .inc_by(num_effects);
93 }
94
95 if txns_not_found > 0 {
96 self.metrics
97 .key_value_store_num_fetches_not_found
98 .with_label_values(&[self.store_name, "tx"])
99 .inc_by(txns_not_found);
100 }
101 if effects_not_found > 0 {
102 self.metrics
103 .key_value_store_num_fetches_not_found
104 .with_label_values(&[self.store_name, "fx"])
105 .inc_by(effects_not_found);
106 }
107 } else {
108 self.metrics
109 .key_value_store_num_fetches_error
110 .with_label_values(&[self.store_name, "tx"])
111 .inc_by(num_txns);
112 self.metrics
113 .key_value_store_num_fetches_error
114 .with_label_values(&[self.store_name, "fx"])
115 .inc_by(num_effects);
116 }
117
118 res
119 }
120
121 pub async fn multi_get_checkpoints(
122 &self,
123 checkpoint_summaries: &[CheckpointSequenceNumber],
124 checkpoint_contents: &[CheckpointSequenceNumber],
125 checkpoint_summaries_by_digest: &[CheckpointDigest],
126 ) -> IotaResult<(
127 Vec<Option<CertifiedCheckpointSummary>>,
128 Vec<Option<CheckpointContents>>,
129 Vec<Option<CertifiedCheckpointSummary>>,
130 )> {
131 let start = Instant::now();
132 let res = self
133 .inner
134 .multi_get_checkpoints(
135 checkpoint_summaries,
136 checkpoint_contents,
137 checkpoint_summaries_by_digest,
138 )
139 .await;
140 let elapsed = start.elapsed();
141
142 let num_summaries =
143 checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
144 let num_contents = checkpoint_contents.len() as u64;
145
146 self.metrics
147 .key_value_store_num_fetches_latency_ms
148 .with_label_values(&[self.store_name, "checkpoint"])
149 .observe(elapsed.as_millis() as f64);
150 self.metrics
151 .key_value_store_num_fetches_batch_size
152 .with_label_values(&[self.store_name, "checkpoint_summary"])
153 .observe(num_summaries as f64);
154 self.metrics
155 .key_value_store_num_fetches_batch_size
156 .with_label_values(&[self.store_name, "checkpoint_content"])
157 .observe(num_contents as f64);
158
159 if let Ok((summaries, contents, summaries_by_digest)) = &res {
160 let summaries_not_found = summaries.iter().filter(|v| v.is_none()).count() as u64
161 + summaries_by_digest.iter().filter(|v| v.is_none()).count() as u64;
162 let contents_not_found = contents.iter().filter(|v| v.is_none()).count() as u64;
163
164 if num_summaries > 0 {
165 self.metrics
166 .key_value_store_num_fetches_success
167 .with_label_values(&[self.store_name, "ckpt_summary"])
168 .inc_by(num_summaries);
169 }
170 if num_contents > 0 {
171 self.metrics
172 .key_value_store_num_fetches_success
173 .with_label_values(&[self.store_name, "ckpt_contents"])
174 .inc_by(num_contents);
175 }
176
177 if summaries_not_found > 0 {
178 self.metrics
179 .key_value_store_num_fetches_not_found
180 .with_label_values(&[self.store_name, "ckpt_summary"])
181 .inc_by(summaries_not_found);
182 }
183 if contents_not_found > 0 {
184 self.metrics
185 .key_value_store_num_fetches_not_found
186 .with_label_values(&[self.store_name, "ckpt_contents"])
187 .inc_by(contents_not_found);
188 }
189 } else {
190 self.metrics
191 .key_value_store_num_fetches_error
192 .with_label_values(&[self.store_name, "ckpt_summary"])
193 .inc_by(num_summaries);
194 self.metrics
195 .key_value_store_num_fetches_error
196 .with_label_values(&[self.store_name, "ckpt_contents"])
197 .inc_by(num_contents);
198 }
199
200 res
201 }
202
203 pub async fn multi_get_checkpoints_summaries(
204 &self,
205 keys: &[CheckpointSequenceNumber],
206 ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
207 self.multi_get_checkpoints(keys, &[], &[])
208 .await
209 .map(|(summaries, _, _)| summaries)
210 }
211
212 pub async fn multi_get_checkpoints_contents(
213 &self,
214 keys: &[CheckpointSequenceNumber],
215 ) -> IotaResult<Vec<Option<CheckpointContents>>> {
216 self.multi_get_checkpoints(&[], keys, &[])
217 .await
218 .map(|(_, contents, _)| contents)
219 }
220
221 pub async fn multi_get_checkpoints_summaries_by_digest(
222 &self,
223 keys: &[CheckpointDigest],
224 ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
225 self.multi_get_checkpoints(&[], &[], keys)
226 .await
227 .map(|(_, _, summaries)| summaries)
228 }
229
230 pub async fn multi_get_tx(
231 &self,
232 keys: &[TransactionDigest],
233 ) -> IotaResult<Vec<Option<Transaction>>> {
234 self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
235 }
236
237 pub async fn multi_get_fx_by_tx_digest(
238 &self,
239 keys: &[TransactionDigest],
240 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
241 self.multi_get(&[], keys).await.map(|(_, fx)| fx)
242 }
243
244 pub async fn get_tx(&self, digest: TransactionDigest) -> IotaResult<Transaction> {
247 self.multi_get_tx(&[digest])
248 .await?
249 .into_iter()
250 .next()
251 .flatten()
252 .ok_or(IotaError::TransactionNotFound { digest })
253 }
254
255 pub async fn get_fx_by_tx_digest(
259 &self,
260 digest: TransactionDigest,
261 ) -> IotaResult<TransactionEffects> {
262 self.multi_get_fx_by_tx_digest(&[digest])
263 .await?
264 .into_iter()
265 .next()
266 .flatten()
267 .ok_or(IotaError::TransactionNotFound { digest })
268 }
269
270 pub async fn get_checkpoint_summary(
274 &self,
275 checkpoint: CheckpointSequenceNumber,
276 ) -> IotaResult<CertifiedCheckpointSummary> {
277 self.multi_get_checkpoints_summaries(&[checkpoint])
278 .await?
279 .into_iter()
280 .next()
281 .flatten()
282 .ok_or(IotaError::UserInput {
283 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
284 })
285 }
286
287 pub async fn get_checkpoint_contents(
291 &self,
292 checkpoint: CheckpointSequenceNumber,
293 ) -> IotaResult<CheckpointContents> {
294 self.multi_get_checkpoints_contents(&[checkpoint])
295 .await?
296 .into_iter()
297 .next()
298 .flatten()
299 .ok_or(IotaError::UserInput {
300 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
301 })
302 }
303
304 pub async fn get_checkpoint_summary_by_digest(
308 &self,
309 digest: CheckpointDigest,
310 ) -> IotaResult<CertifiedCheckpointSummary> {
311 self.multi_get_checkpoints_summaries_by_digest(&[digest])
312 .await?
313 .into_iter()
314 .next()
315 .flatten()
316 .ok_or(IotaError::UserInput {
317 error: UserInputError::VerifiedCheckpointDigestNotFound(format!("{digest:?}")),
318 })
319 }
320
321 pub async fn get_transaction_perpetual_checkpoint(
322 &self,
323 digest: TransactionDigest,
324 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
325 self.inner
326 .get_transaction_perpetual_checkpoint(digest)
327 .await
328 }
329
330 pub async fn get_object(
331 &self,
332 object_id: ObjectID,
333 version: VersionNumber,
334 ) -> IotaResult<Option<Object>> {
335 self.inner.get_object(object_id, version).await
336 }
337
338 pub async fn multi_get_objects(
339 &self,
340 object_keys: &[ObjectKey],
341 ) -> IotaResult<Vec<Option<Object>>> {
342 self.inner.multi_get_objects(object_keys).await
343 }
344
345 pub async fn multi_get_transactions_perpetual_checkpoints(
346 &self,
347 digests: &[TransactionDigest],
348 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
349 self.inner
350 .multi_get_transactions_perpetual_checkpoints(digests)
351 .await
352 }
353
354 pub async fn multi_get_events_by_tx_digests(
355 &self,
356 digests: &[TransactionDigest],
357 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
358 self.inner.multi_get_events_by_tx_digests(digests).await
359 }
360}
361
362#[async_trait]
366pub trait TransactionKeyValueStoreTrait {
367 async fn multi_get(
370 &self,
371 transaction_keys: &[TransactionDigest],
372 effects_keys: &[TransactionDigest],
373 ) -> IotaResult<KVStoreTransactionData>;
374
375 async fn multi_get_checkpoints(
378 &self,
379 checkpoint_summaries: &[CheckpointSequenceNumber],
380 checkpoint_contents: &[CheckpointSequenceNumber],
381 checkpoint_summaries_by_digest: &[CheckpointDigest],
382 ) -> IotaResult<KVStoreCheckpointData>;
383
384 async fn get_transaction_perpetual_checkpoint(
385 &self,
386 digest: TransactionDigest,
387 ) -> IotaResult<Option<CheckpointSequenceNumber>>;
388
389 async fn get_object(
390 &self,
391 object_id: ObjectID,
392 version: SequenceNumber,
393 ) -> IotaResult<Option<Object>>;
394
395 async fn multi_get_objects(&self, object_keys: &[ObjectKey])
396 -> IotaResult<Vec<Option<Object>>>;
397
398 async fn multi_get_transactions_perpetual_checkpoints(
399 &self,
400 digests: &[TransactionDigest],
401 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>>;
402
403 async fn multi_get_events_by_tx_digests(
404 &self,
405 digests: &[TransactionDigest],
406 ) -> IotaResult<Vec<Option<TransactionEvents>>>;
407}
408
409pub struct FallbackTransactionKVStore {
415 primary: TransactionKeyValueStore,
416 fallback: TransactionKeyValueStore,
417}
418
419impl FallbackTransactionKVStore {
420 pub fn new_kv(
421 primary: TransactionKeyValueStore,
422 fallback: TransactionKeyValueStore,
423 metrics: Arc<KeyValueStoreMetrics>,
424 label: &'static str,
425 ) -> TransactionKeyValueStore {
426 let store = Arc::new(Self { primary, fallback });
427 TransactionKeyValueStore::new(label, metrics, store)
428 }
429}
430
431#[async_trait]
432impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
433 #[instrument(level = "trace", skip_all)]
434 async fn multi_get(
435 &self,
436 transaction_keys: &[TransactionDigest],
437 effects_keys: &[TransactionDigest],
438 ) -> IotaResult<KVStoreTransactionData> {
439 let (mut transactions, mut effects) = self
440 .primary
441 .multi_get(transaction_keys, effects_keys)
442 .await?;
443
444 let (fallback_transaction_keys, indices_transactions) =
445 find_fallback(&transactions, transaction_keys);
446 let (fallback_effects_keys, indices_effects) = find_fallback(&effects, effects_keys);
447
448 if fallback_transaction_keys.is_empty() && fallback_effects_keys.is_empty() {
449 return Ok((transactions, effects));
450 }
451
452 let (fallback_transactions, fallback_effects) = self
453 .fallback
454 .multi_get(&fallback_transaction_keys, &fallback_effects_keys)
455 .await?;
456
457 merge_res(
458 &mut transactions,
459 fallback_transactions,
460 &indices_transactions,
461 );
462 merge_res(&mut effects, fallback_effects, &indices_effects);
463
464 Ok((transactions, effects))
465 }
466
467 #[instrument(level = "trace", skip_all)]
468 async fn multi_get_checkpoints(
469 &self,
470 checkpoint_summaries: &[CheckpointSequenceNumber],
471 checkpoint_contents: &[CheckpointSequenceNumber],
472 checkpoint_summaries_by_digest: &[CheckpointDigest],
473 ) -> IotaResult<(
474 Vec<Option<CertifiedCheckpointSummary>>,
475 Vec<Option<CheckpointContents>>,
476 Vec<Option<CertifiedCheckpointSummary>>,
477 )> {
478 let (mut summaries, mut contents, mut summaries_by_digest) = self
479 .primary
480 .multi_get_checkpoints(
481 checkpoint_summaries,
482 checkpoint_contents,
483 checkpoint_summaries_by_digest,
484 )
485 .await?;
486
487 let (fallback_summaries, indices_summaries) =
488 find_fallback(&summaries, checkpoint_summaries);
489 let (fallback_contents, indices_contents) = find_fallback(&contents, checkpoint_contents);
490 let (fallback_summaries_by_digest, indices_summaries_by_digest) =
491 find_fallback(&summaries_by_digest, checkpoint_summaries_by_digest);
492
493 if fallback_summaries.is_empty()
494 && fallback_contents.is_empty()
495 && fallback_summaries_by_digest.is_empty()
496 {
497 return Ok((summaries, contents, summaries_by_digest));
498 }
499
500 let (fallback_summaries, fallback_contents, fallback_summaries_by_digest) = self
501 .fallback
502 .multi_get_checkpoints(
503 &fallback_summaries,
504 &fallback_contents,
505 &fallback_summaries_by_digest,
506 )
507 .await?;
508
509 merge_res(&mut summaries, fallback_summaries, &indices_summaries);
510 merge_res(&mut contents, fallback_contents, &indices_contents);
511 merge_res(
512 &mut summaries_by_digest,
513 fallback_summaries_by_digest,
514 &indices_summaries_by_digest,
515 );
516
517 Ok((summaries, contents, summaries_by_digest))
518 }
519
520 #[instrument(level = "trace", skip_all)]
521 async fn get_transaction_perpetual_checkpoint(
522 &self,
523 digest: TransactionDigest,
524 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
525 let mut res = self
526 .primary
527 .get_transaction_perpetual_checkpoint(digest)
528 .await?;
529 if res.is_none() {
530 res = self
531 .fallback
532 .get_transaction_perpetual_checkpoint(digest)
533 .await?;
534 }
535 Ok(res)
536 }
537
538 #[instrument(level = "trace", skip_all)]
539 async fn get_object(
540 &self,
541 object_id: ObjectID,
542 version: SequenceNumber,
543 ) -> IotaResult<Option<Object>> {
544 let mut res = self.primary.get_object(object_id, version).await?;
545 if res.is_none() {
546 res = self.fallback.get_object(object_id, version).await?;
547 }
548 Ok(res)
549 }
550
551 #[instrument(level = "trace", skip_all)]
552 async fn multi_get_objects(
553 &self,
554 object_keys: &[ObjectKey],
555 ) -> IotaResult<Vec<Option<Object>>> {
556 let mut res = self.primary.multi_get_objects(object_keys).await?;
557
558 let (fallback, indices) = find_fallback(&res, object_keys);
559
560 if fallback.is_empty() {
561 return Ok(res);
562 }
563
564 let secondary_res = self.fallback.multi_get_objects(&fallback).await?;
565
566 merge_res(&mut res, secondary_res, &indices);
567
568 Ok(res)
569 }
570
571 #[instrument(level = "trace", skip_all)]
572 async fn multi_get_transactions_perpetual_checkpoints(
573 &self,
574 digests: &[TransactionDigest],
575 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
576 let mut res = self
577 .primary
578 .multi_get_transactions_perpetual_checkpoints(digests)
579 .await?;
580
581 let (fallback, indices) = find_fallback(&res, digests);
582
583 if fallback.is_empty() {
584 return Ok(res);
585 }
586
587 let secondary_res = self
588 .fallback
589 .multi_get_transactions_perpetual_checkpoints(&fallback)
590 .await?;
591
592 merge_res(&mut res, secondary_res, &indices);
593
594 Ok(res)
595 }
596
597 #[instrument(level = "trace", skip_all)]
598 async fn multi_get_events_by_tx_digests(
599 &self,
600 digests: &[TransactionDigest],
601 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
602 let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
603 let (fallback, indices) = find_fallback(&res, digests);
604 if fallback.is_empty() {
605 return Ok(res);
606 }
607 let secondary_res = self
608 .fallback
609 .multi_get_events_by_tx_digests(&fallback)
610 .await?;
611 merge_res(&mut res, secondary_res, &indices);
612 Ok(res)
613 }
614}
615
/// Collects the keys whose lookup came back `None`, together with their
/// positions in `values`.
///
/// `values[i]` is taken to be the answer for `keys[i]`. The two returned
/// vectors have the same length and order: entry `n` of the first is a clone
/// of the key to retry, entry `n` of the second is the index in `values` that
/// its answer belongs to.
///
/// Values and keys are paired with `zip`, so if the slices differ in length
/// only the common prefix is examined and the function does not panic.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    // Count first so both output vectors are allocated once.
    let missing = values.iter().filter(|value| value.is_none()).count();
    let mut fallback_keys = Vec::with_capacity(missing);
    let mut fallback_indices = Vec::with_capacity(missing);

    for (index, (value, key)) in values.iter().zip(keys).enumerate() {
        if value.is_none() {
            fallback_keys.push(key.clone());
            fallback_indices.push(index);
        }
    }

    (fallback_keys, fallback_indices)
}
628
/// Writes fallback answers back into `values`.
///
/// The `n`-th entry of `fallback_values` replaces `values[indices[n]]`, even
/// when that entry is itself `None`. Pairing stops at the shorter of
/// `indices` and `fallback_values`.
///
/// # Panics
///
/// Panics if a paired index is out of bounds for `values`.
fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
    indices
        .iter()
        .copied()
        .zip(fallback_values)
        .for_each(|(slot, replacement)| values[slot] = replacement);
}