1use std::{sync::Arc, time::Instant};
9
10use async_trait::async_trait;
11use iota_types::{
12 base_types::{ObjectID, SequenceNumber, VersionNumber},
13 digests::{CheckpointDigest, TransactionDigest},
14 effects::{TransactionEffects, TransactionEvents},
15 error::{IotaError, IotaResult, UserInputError},
16 messages_checkpoint::{
17 CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18 },
19 object::Object,
20 transaction::Transaction,
21};
22use tracing::instrument;
23
24use crate::key_value_store_metrics::KeyValueStoreMetrics;
25
/// Result of a batched transaction lookup: the transactions found for the
/// requested transaction keys, followed by the effects found for the requested
/// effects keys.
///
/// A `None` entry means the corresponding key was not found. The fallback
/// store in this file pairs entry `i` of each vector with key `i` of the
/// matching request slice, so implementations are expected to keep results
/// positionally aligned with the keys.
pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);
27
/// Result of a batched checkpoint lookup, in this order:
///
/// 1. checkpoint summaries requested by sequence number,
/// 2. checkpoint contents requested by sequence number,
/// 3. checkpoint summaries requested by digest.
///
/// A `None` entry means the corresponding key was not found. The fallback
/// store in this file pairs entry `i` of each vector with key `i` of the
/// matching request slice.
pub type KVStoreCheckpointData = (
    // Summaries looked up by sequence number.
    Vec<Option<CertifiedCheckpointSummary>>,
    // Contents looked up by sequence number.
    Vec<Option<CheckpointContents>>,
    // Summaries looked up by digest.
    Vec<Option<CertifiedCheckpointSummary>>,
);
33
/// Wrapper around a [`TransactionKeyValueStoreTrait`] implementation that
/// reports metrics for the batched `multi_get` and `multi_get_checkpoints`
/// lookups and offers single-key convenience getters on top of them.
pub struct TransactionKeyValueStore {
    /// First label value attached to every metric this wrapper reports.
    store_name: &'static str,
    /// Metric handles updated by the instrumented lookups.
    metrics: Arc<KeyValueStoreMetrics>,
    /// Backing store that actually serves the requests.
    inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
}
39
40impl TransactionKeyValueStore {
41 pub fn new(
42 store_name: &'static str,
43 metrics: Arc<KeyValueStoreMetrics>,
44 inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
45 ) -> Self {
46 Self {
47 store_name,
48 metrics,
49 inner,
50 }
51 }
52
53 pub async fn multi_get(
56 &self,
57 transaction_keys: &[TransactionDigest],
58 effects_keys: &[TransactionDigest],
59 ) -> IotaResult<KVStoreTransactionData> {
60 let start = Instant::now();
61 let res = self.inner.multi_get(transaction_keys, effects_keys).await;
62 let elapsed = start.elapsed();
63
64 let num_txns = transaction_keys.len() as u64;
65 let num_effects = effects_keys.len() as u64;
66 let total_keys = num_txns + num_effects;
67
68 self.metrics
69 .key_value_store_num_fetches_latency_ms
70 .with_label_values(&[self.store_name, "tx"])
71 .observe(elapsed.as_millis() as f64);
72 self.metrics
73 .key_value_store_num_fetches_batch_size
74 .with_label_values(&[self.store_name, "tx"])
75 .observe(total_keys as f64);
76
77 if let Ok((transactions, effects)) = &res {
78 let txns_not_found = transactions.iter().filter(|v| v.is_none()).count() as u64;
79 let effects_not_found = effects.iter().filter(|v| v.is_none()).count() as u64;
80
81 if num_txns > 0 {
82 self.metrics
83 .key_value_store_num_fetches_success
84 .with_label_values(&[self.store_name, "tx"])
85 .inc_by(num_txns);
86 }
87 if num_effects > 0 {
88 self.metrics
89 .key_value_store_num_fetches_success
90 .with_label_values(&[self.store_name, "fx"])
91 .inc_by(num_effects);
92 }
93
94 if txns_not_found > 0 {
95 self.metrics
96 .key_value_store_num_fetches_not_found
97 .with_label_values(&[self.store_name, "tx"])
98 .inc_by(txns_not_found);
99 }
100 if effects_not_found > 0 {
101 self.metrics
102 .key_value_store_num_fetches_not_found
103 .with_label_values(&[self.store_name, "fx"])
104 .inc_by(effects_not_found);
105 }
106 } else {
107 self.metrics
108 .key_value_store_num_fetches_error
109 .with_label_values(&[self.store_name, "tx"])
110 .inc_by(num_txns);
111 self.metrics
112 .key_value_store_num_fetches_error
113 .with_label_values(&[self.store_name, "fx"])
114 .inc_by(num_effects);
115 }
116
117 res
118 }
119
120 pub async fn multi_get_checkpoints(
121 &self,
122 checkpoint_summaries: &[CheckpointSequenceNumber],
123 checkpoint_contents: &[CheckpointSequenceNumber],
124 checkpoint_summaries_by_digest: &[CheckpointDigest],
125 ) -> IotaResult<(
126 Vec<Option<CertifiedCheckpointSummary>>,
127 Vec<Option<CheckpointContents>>,
128 Vec<Option<CertifiedCheckpointSummary>>,
129 )> {
130 let start = Instant::now();
131 let res = self
132 .inner
133 .multi_get_checkpoints(
134 checkpoint_summaries,
135 checkpoint_contents,
136 checkpoint_summaries_by_digest,
137 )
138 .await;
139 let elapsed = start.elapsed();
140
141 let num_summaries =
142 checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
143 let num_contents = checkpoint_contents.len() as u64;
144
145 self.metrics
146 .key_value_store_num_fetches_latency_ms
147 .with_label_values(&[self.store_name, "checkpoint"])
148 .observe(elapsed.as_millis() as f64);
149 self.metrics
150 .key_value_store_num_fetches_batch_size
151 .with_label_values(&[self.store_name, "checkpoint_summary"])
152 .observe(num_summaries as f64);
153 self.metrics
154 .key_value_store_num_fetches_batch_size
155 .with_label_values(&[self.store_name, "checkpoint_content"])
156 .observe(num_contents as f64);
157
158 if let Ok((summaries, contents, summaries_by_digest)) = &res {
159 let summaries_not_found = summaries.iter().filter(|v| v.is_none()).count() as u64
160 + summaries_by_digest.iter().filter(|v| v.is_none()).count() as u64;
161 let contents_not_found = contents.iter().filter(|v| v.is_none()).count() as u64;
162
163 if num_summaries > 0 {
164 self.metrics
165 .key_value_store_num_fetches_success
166 .with_label_values(&[self.store_name, "ckpt_summary"])
167 .inc_by(num_summaries);
168 }
169 if num_contents > 0 {
170 self.metrics
171 .key_value_store_num_fetches_success
172 .with_label_values(&[self.store_name, "ckpt_contents"])
173 .inc_by(num_contents);
174 }
175
176 if summaries_not_found > 0 {
177 self.metrics
178 .key_value_store_num_fetches_not_found
179 .with_label_values(&[self.store_name, "ckpt_summary"])
180 .inc_by(summaries_not_found);
181 }
182 if contents_not_found > 0 {
183 self.metrics
184 .key_value_store_num_fetches_not_found
185 .with_label_values(&[self.store_name, "ckpt_contents"])
186 .inc_by(contents_not_found);
187 }
188 } else {
189 self.metrics
190 .key_value_store_num_fetches_error
191 .with_label_values(&[self.store_name, "ckpt_summary"])
192 .inc_by(num_summaries);
193 self.metrics
194 .key_value_store_num_fetches_error
195 .with_label_values(&[self.store_name, "ckpt_contents"])
196 .inc_by(num_contents);
197 }
198
199 res
200 }
201
202 pub async fn multi_get_checkpoints_summaries(
203 &self,
204 keys: &[CheckpointSequenceNumber],
205 ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
206 self.multi_get_checkpoints(keys, &[], &[])
207 .await
208 .map(|(summaries, _, _)| summaries)
209 }
210
211 pub async fn multi_get_checkpoints_contents(
212 &self,
213 keys: &[CheckpointSequenceNumber],
214 ) -> IotaResult<Vec<Option<CheckpointContents>>> {
215 self.multi_get_checkpoints(&[], keys, &[])
216 .await
217 .map(|(_, contents, _)| contents)
218 }
219
220 pub async fn multi_get_checkpoints_summaries_by_digest(
221 &self,
222 keys: &[CheckpointDigest],
223 ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
224 self.multi_get_checkpoints(&[], &[], keys)
225 .await
226 .map(|(_, _, summaries)| summaries)
227 }
228
229 pub async fn multi_get_tx(
230 &self,
231 keys: &[TransactionDigest],
232 ) -> IotaResult<Vec<Option<Transaction>>> {
233 self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
234 }
235
236 pub async fn multi_get_fx_by_tx_digest(
237 &self,
238 keys: &[TransactionDigest],
239 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
240 self.multi_get(&[], keys).await.map(|(_, fx)| fx)
241 }
242
243 pub async fn get_tx(&self, digest: TransactionDigest) -> IotaResult<Transaction> {
246 self.multi_get_tx(&[digest])
247 .await?
248 .into_iter()
249 .next()
250 .flatten()
251 .ok_or(IotaError::TransactionNotFound { digest })
252 }
253
254 pub async fn get_fx_by_tx_digest(
258 &self,
259 digest: TransactionDigest,
260 ) -> IotaResult<TransactionEffects> {
261 self.multi_get_fx_by_tx_digest(&[digest])
262 .await?
263 .into_iter()
264 .next()
265 .flatten()
266 .ok_or(IotaError::TransactionNotFound { digest })
267 }
268
269 pub async fn get_checkpoint_summary(
273 &self,
274 checkpoint: CheckpointSequenceNumber,
275 ) -> IotaResult<CertifiedCheckpointSummary> {
276 self.multi_get_checkpoints_summaries(&[checkpoint])
277 .await?
278 .into_iter()
279 .next()
280 .flatten()
281 .ok_or(IotaError::UserInput {
282 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
283 })
284 }
285
286 pub async fn get_checkpoint_contents(
290 &self,
291 checkpoint: CheckpointSequenceNumber,
292 ) -> IotaResult<CheckpointContents> {
293 self.multi_get_checkpoints_contents(&[checkpoint])
294 .await?
295 .into_iter()
296 .next()
297 .flatten()
298 .ok_or(IotaError::UserInput {
299 error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
300 })
301 }
302
303 pub async fn get_checkpoint_summary_by_digest(
307 &self,
308 digest: CheckpointDigest,
309 ) -> IotaResult<CertifiedCheckpointSummary> {
310 self.multi_get_checkpoints_summaries_by_digest(&[digest])
311 .await?
312 .into_iter()
313 .next()
314 .flatten()
315 .ok_or(IotaError::UserInput {
316 error: UserInputError::VerifiedCheckpointDigestNotFound(format!("{:?}", digest)),
317 })
318 }
319
320 pub async fn get_transaction_perpetual_checkpoint(
321 &self,
322 digest: TransactionDigest,
323 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
324 self.inner
325 .get_transaction_perpetual_checkpoint(digest)
326 .await
327 }
328
329 pub async fn get_object(
330 &self,
331 object_id: ObjectID,
332 version: VersionNumber,
333 ) -> IotaResult<Option<Object>> {
334 self.inner.get_object(object_id, version).await
335 }
336
337 pub async fn multi_get_transactions_perpetual_checkpoints(
338 &self,
339 digests: &[TransactionDigest],
340 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
341 self.inner
342 .multi_get_transactions_perpetual_checkpoints(digests)
343 .await
344 }
345
346 pub async fn multi_get_events_by_tx_digests(
347 &self,
348 digests: &[TransactionDigest],
349 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
350 self.inner.multi_get_events_by_tx_digests(digests).await
351 }
352}
353
/// Backend interface for a store that serves transactions, effects, events,
/// checkpoints and objects by key.
///
/// In every batched method a `None` entry in a returned vector stands for a
/// key that was not found. The fallback store in this file matches result
/// entry `i` with request key `i`, so implementations are expected to return
/// results in the same order (and number) as the keys they were given.
#[async_trait]
pub trait TransactionKeyValueStoreTrait {
    /// Looks up transactions for `transaction_keys` and effects for
    /// `effects_keys` in one request.
    async fn multi_get(
        &self,
        transaction_keys: &[TransactionDigest],
        effects_keys: &[TransactionDigest],
    ) -> IotaResult<KVStoreTransactionData>;

    /// Looks up checkpoint summaries by sequence number, checkpoint contents
    /// by sequence number, and checkpoint summaries by digest in one request.
    /// The three result vectors are returned in that order.
    async fn multi_get_checkpoints(
        &self,
        checkpoint_summaries: &[CheckpointSequenceNumber],
        checkpoint_contents: &[CheckpointSequenceNumber],
        checkpoint_summaries_by_digest: &[CheckpointDigest],
    ) -> IotaResult<KVStoreCheckpointData>;

    /// Returns a checkpoint sequence number for the transaction `digest`, or
    /// `None` when the store has none.
    // NOTE(review): presumably the checkpoint that includes the transaction;
    // confirm against the implementations.
    async fn get_transaction_perpetual_checkpoint(
        &self,
        digest: TransactionDigest,
    ) -> IotaResult<Option<CheckpointSequenceNumber>>;

    /// Returns the object identified by `object_id` at `version`, or `None`
    /// when the store does not have it.
    async fn get_object(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>>;

    /// Batched form of `get_transaction_perpetual_checkpoint`: one optional
    /// checkpoint sequence number per entry of `digests`.
    async fn multi_get_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>>;

    /// Looks up the events of each transaction in `digests`.
    async fn multi_get_events_by_tx_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>>;
}
397
/// Store that combines two [`TransactionKeyValueStore`]s: every request goes
/// to `primary` first, and only the keys `primary` returned `None` for are
/// retried against `fallback`.
///
/// An error from either store is propagated to the caller; a failing primary
/// is not retried on the fallback.
pub struct FallbackTransactionKVStore {
    /// Store queried first for every request.
    primary: TransactionKeyValueStore,
    /// Store queried only for keys missing from the primary's answer.
    fallback: TransactionKeyValueStore,
}
407
408impl FallbackTransactionKVStore {
409 pub fn new_kv(
410 primary: TransactionKeyValueStore,
411 fallback: TransactionKeyValueStore,
412 metrics: Arc<KeyValueStoreMetrics>,
413 label: &'static str,
414 ) -> TransactionKeyValueStore {
415 let store = Arc::new(Self { primary, fallback });
416 TransactionKeyValueStore::new(label, metrics, store)
417 }
418}
419
420#[async_trait]
421impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
422 #[instrument(level = "trace", skip_all)]
423 async fn multi_get(
424 &self,
425 transaction_keys: &[TransactionDigest],
426 effects_keys: &[TransactionDigest],
427 ) -> IotaResult<KVStoreTransactionData> {
428 let (mut transactions, mut effects) = self
429 .primary
430 .multi_get(transaction_keys, effects_keys)
431 .await?;
432
433 let (fallback_transaction_keys, indices_transactions) =
434 find_fallback(&transactions, transaction_keys);
435 let (fallback_effects_keys, indices_effects) = find_fallback(&effects, effects_keys);
436
437 if fallback_transaction_keys.is_empty() && fallback_effects_keys.is_empty() {
438 return Ok((transactions, effects));
439 }
440
441 let (fallback_transactions, fallback_effects) = self
442 .fallback
443 .multi_get(&fallback_transaction_keys, &fallback_effects_keys)
444 .await?;
445
446 merge_res(
447 &mut transactions,
448 fallback_transactions,
449 &indices_transactions,
450 );
451 merge_res(&mut effects, fallback_effects, &indices_effects);
452
453 Ok((transactions, effects))
454 }
455
456 #[instrument(level = "trace", skip_all)]
457 async fn multi_get_checkpoints(
458 &self,
459 checkpoint_summaries: &[CheckpointSequenceNumber],
460 checkpoint_contents: &[CheckpointSequenceNumber],
461 checkpoint_summaries_by_digest: &[CheckpointDigest],
462 ) -> IotaResult<(
463 Vec<Option<CertifiedCheckpointSummary>>,
464 Vec<Option<CheckpointContents>>,
465 Vec<Option<CertifiedCheckpointSummary>>,
466 )> {
467 let (mut summaries, mut contents, mut summaries_by_digest) = self
468 .primary
469 .multi_get_checkpoints(
470 checkpoint_summaries,
471 checkpoint_contents,
472 checkpoint_summaries_by_digest,
473 )
474 .await?;
475
476 let (fallback_summaries, indices_summaries) =
477 find_fallback(&summaries, checkpoint_summaries);
478 let (fallback_contents, indices_contents) = find_fallback(&contents, checkpoint_contents);
479 let (fallback_summaries_by_digest, indices_summaries_by_digest) =
480 find_fallback(&summaries_by_digest, checkpoint_summaries_by_digest);
481
482 if fallback_summaries.is_empty()
483 && fallback_contents.is_empty()
484 && fallback_summaries_by_digest.is_empty()
485 {
486 return Ok((summaries, contents, summaries_by_digest));
487 }
488
489 let (fallback_summaries, fallback_contents, fallback_summaries_by_digest) = self
490 .fallback
491 .multi_get_checkpoints(
492 &fallback_summaries,
493 &fallback_contents,
494 &fallback_summaries_by_digest,
495 )
496 .await?;
497
498 merge_res(&mut summaries, fallback_summaries, &indices_summaries);
499 merge_res(&mut contents, fallback_contents, &indices_contents);
500 merge_res(
501 &mut summaries_by_digest,
502 fallback_summaries_by_digest,
503 &indices_summaries_by_digest,
504 );
505
506 Ok((summaries, contents, summaries_by_digest))
507 }
508
509 #[instrument(level = "trace", skip_all)]
510 async fn get_transaction_perpetual_checkpoint(
511 &self,
512 digest: TransactionDigest,
513 ) -> IotaResult<Option<CheckpointSequenceNumber>> {
514 let mut res = self
515 .primary
516 .get_transaction_perpetual_checkpoint(digest)
517 .await?;
518 if res.is_none() {
519 res = self
520 .fallback
521 .get_transaction_perpetual_checkpoint(digest)
522 .await?;
523 }
524 Ok(res)
525 }
526
527 #[instrument(level = "trace", skip_all)]
528 async fn get_object(
529 &self,
530 object_id: ObjectID,
531 version: SequenceNumber,
532 ) -> IotaResult<Option<Object>> {
533 let mut res = self.primary.get_object(object_id, version).await?;
534 if res.is_none() {
535 res = self.fallback.get_object(object_id, version).await?;
536 }
537 Ok(res)
538 }
539
540 #[instrument(level = "trace", skip_all)]
541 async fn multi_get_transactions_perpetual_checkpoints(
542 &self,
543 digests: &[TransactionDigest],
544 ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
545 let mut res = self
546 .primary
547 .multi_get_transactions_perpetual_checkpoints(digests)
548 .await?;
549
550 let (fallback, indices) = find_fallback(&res, digests);
551
552 if fallback.is_empty() {
553 return Ok(res);
554 }
555
556 let secondary_res = self
557 .fallback
558 .multi_get_transactions_perpetual_checkpoints(&fallback)
559 .await?;
560
561 merge_res(&mut res, secondary_res, &indices);
562
563 Ok(res)
564 }
565
566 #[instrument(level = "trace", skip_all)]
567 async fn multi_get_events_by_tx_digests(
568 &self,
569 digests: &[TransactionDigest],
570 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
571 let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
572 let (fallback, indices) = find_fallback(&res, digests);
573 if fallback.is_empty() {
574 return Ok(res);
575 }
576 let secondary_res = self
577 .fallback
578 .multi_get_events_by_tx_digests(&fallback)
579 .await?;
580 merge_res(&mut res, secondary_res, &indices);
581 Ok(res)
582 }
583}
584
/// Collects, for every `None` in `values`, a clone of the key at the same
/// position together with that position.
///
/// Returns `(missing_keys, missing_positions)`, both in ascending position
/// order, so that `missing_positions[j]` is where the result for
/// `missing_keys[j]` belongs in `values`.
///
/// # Panics
///
/// Panics if a `None` entry sits at a position that is out of bounds for
/// `keys`.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    values
        .iter()
        .enumerate()
        .filter(|(_, slot)| slot.is_none())
        .map(|(position, _)| (keys[position].clone(), position))
        .unzip()
}
597
/// Writes `fallback_values` into `values` at the positions listed in
/// `indices`, pairing them up in order: the first fallback value goes to
/// `indices[0]`, the second to `indices[1]`, and so on.
///
/// Pairing stops as soon as either `indices` or `fallback_values` runs out;
/// slots without a matching fallback value are left untouched. A fallback
/// value of `None` is written as-is.
///
/// # Panics
///
/// Panics if a used index is out of bounds for `values`.
fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
    let mut replacements = fallback_values.into_iter();
    for &slot in indices {
        match replacements.next() {
            Some(replacement) => values[slot] = replacement,
            // Fewer fallback values than indices: nothing left to merge.
            None => break,
        }
    }
}