iota_storage/
key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
//! Immutable key/value store trait for storing/retrieving transactions,
//! effects, and events to/from a scalable store.
7
8use std::{sync::Arc, time::Instant};
9
10use async_trait::async_trait;
11use iota_types::{
12    base_types::{ObjectID, SequenceNumber, VersionNumber},
13    digests::{CheckpointDigest, TransactionDigest},
14    effects::{TransactionEffects, TransactionEvents},
15    error::{IotaError, IotaResult, UserInputError},
16    messages_checkpoint::{
17        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18    },
19    object::Object,
20    storage::ObjectKey,
21    transaction::Transaction,
22};
23use tracing::instrument;
24
25use crate::key_value_store_metrics::KeyValueStoreMetrics;
26
/// Result of a combined transaction/effects lookup, as `(transactions,
/// effects)`.
///
/// A `None` entry marks a key the store returned no value for. The fallback
/// store in this file treats the n-th entry as the result for the n-th
/// requested key.
pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);
28
/// Result of a combined checkpoint lookup, as `(summaries, contents,
/// summaries_by_digest)`.
///
/// A `None` entry marks a key the store returned no value for.
pub type KVStoreCheckpointData = (
    // Summaries looked up by checkpoint sequence number.
    Vec<Option<CertifiedCheckpointSummary>>,
    // Contents looked up by checkpoint sequence number.
    Vec<Option<CheckpointContents>>,
    // Summaries looked up by checkpoint digest.
    Vec<Option<CertifiedCheckpointSummary>>,
);
34
/// Wrapper around a [`TransactionKeyValueStoreTrait`] implementation.
///
/// `multi_get` and `multi_get_checkpoints` record metrics for each call; the
/// remaining methods either build on those two or forward directly to the
/// wrapped store.
pub struct TransactionKeyValueStore {
    // First label value on every metric recorded by this wrapper.
    store_name: &'static str,
    // Metric families the wrapper reports into.
    metrics: Arc<KeyValueStoreMetrics>,
    // The store that actually serves the lookups.
    inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
}
40
41impl TransactionKeyValueStore {
    /// Wraps `inner` so that its lookups are reported to `metrics`, labelled
    /// with `store_name`.
    pub fn new(
        store_name: &'static str,
        metrics: Arc<KeyValueStoreMetrics>,
        inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
    ) -> Self {
        Self {
            store_name,
            metrics,
            inner,
        }
    }
53
54    /// Generic multi_get, allows implementors to get heterogenous values with a
55    /// single round trip.
56    pub async fn multi_get(
57        &self,
58        transaction_keys: &[TransactionDigest],
59        effects_keys: &[TransactionDigest],
60    ) -> IotaResult<KVStoreTransactionData> {
61        let start = Instant::now();
62        let res = self.inner.multi_get(transaction_keys, effects_keys).await;
63        let elapsed = start.elapsed();
64
65        let num_txns = transaction_keys.len() as u64;
66        let num_effects = effects_keys.len() as u64;
67        let total_keys = num_txns + num_effects;
68
69        self.metrics
70            .key_value_store_num_fetches_latency_ms
71            .with_label_values(&[self.store_name, "tx"])
72            .observe(elapsed.as_millis() as f64);
73        self.metrics
74            .key_value_store_num_fetches_batch_size
75            .with_label_values(&[self.store_name, "tx"])
76            .observe(total_keys as f64);
77
78        if let Ok((transactions, effects)) = &res {
79            let txns_not_found = transactions.iter().filter(|v| v.is_none()).count() as u64;
80            let effects_not_found = effects.iter().filter(|v| v.is_none()).count() as u64;
81
82            if num_txns > 0 {
83                self.metrics
84                    .key_value_store_num_fetches_success
85                    .with_label_values(&[self.store_name, "tx"])
86                    .inc_by(num_txns);
87            }
88            if num_effects > 0 {
89                self.metrics
90                    .key_value_store_num_fetches_success
91                    .with_label_values(&[self.store_name, "fx"])
92                    .inc_by(num_effects);
93            }
94
95            if txns_not_found > 0 {
96                self.metrics
97                    .key_value_store_num_fetches_not_found
98                    .with_label_values(&[self.store_name, "tx"])
99                    .inc_by(txns_not_found);
100            }
101            if effects_not_found > 0 {
102                self.metrics
103                    .key_value_store_num_fetches_not_found
104                    .with_label_values(&[self.store_name, "fx"])
105                    .inc_by(effects_not_found);
106            }
107        } else {
108            self.metrics
109                .key_value_store_num_fetches_error
110                .with_label_values(&[self.store_name, "tx"])
111                .inc_by(num_txns);
112            self.metrics
113                .key_value_store_num_fetches_error
114                .with_label_values(&[self.store_name, "fx"])
115                .inc_by(num_effects);
116        }
117
118        res
119    }
120
121    pub async fn multi_get_checkpoints(
122        &self,
123        checkpoint_summaries: &[CheckpointSequenceNumber],
124        checkpoint_contents: &[CheckpointSequenceNumber],
125        checkpoint_summaries_by_digest: &[CheckpointDigest],
126    ) -> IotaResult<(
127        Vec<Option<CertifiedCheckpointSummary>>,
128        Vec<Option<CheckpointContents>>,
129        Vec<Option<CertifiedCheckpointSummary>>,
130    )> {
131        let start = Instant::now();
132        let res = self
133            .inner
134            .multi_get_checkpoints(
135                checkpoint_summaries,
136                checkpoint_contents,
137                checkpoint_summaries_by_digest,
138            )
139            .await;
140        let elapsed = start.elapsed();
141
142        let num_summaries =
143            checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
144        let num_contents = checkpoint_contents.len() as u64;
145
146        self.metrics
147            .key_value_store_num_fetches_latency_ms
148            .with_label_values(&[self.store_name, "checkpoint"])
149            .observe(elapsed.as_millis() as f64);
150        self.metrics
151            .key_value_store_num_fetches_batch_size
152            .with_label_values(&[self.store_name, "checkpoint_summary"])
153            .observe(num_summaries as f64);
154        self.metrics
155            .key_value_store_num_fetches_batch_size
156            .with_label_values(&[self.store_name, "checkpoint_content"])
157            .observe(num_contents as f64);
158
159        if let Ok((summaries, contents, summaries_by_digest)) = &res {
160            let summaries_not_found = summaries.iter().filter(|v| v.is_none()).count() as u64
161                + summaries_by_digest.iter().filter(|v| v.is_none()).count() as u64;
162            let contents_not_found = contents.iter().filter(|v| v.is_none()).count() as u64;
163
164            if num_summaries > 0 {
165                self.metrics
166                    .key_value_store_num_fetches_success
167                    .with_label_values(&[self.store_name, "ckpt_summary"])
168                    .inc_by(num_summaries);
169            }
170            if num_contents > 0 {
171                self.metrics
172                    .key_value_store_num_fetches_success
173                    .with_label_values(&[self.store_name, "ckpt_contents"])
174                    .inc_by(num_contents);
175            }
176
177            if summaries_not_found > 0 {
178                self.metrics
179                    .key_value_store_num_fetches_not_found
180                    .with_label_values(&[self.store_name, "ckpt_summary"])
181                    .inc_by(summaries_not_found);
182            }
183            if contents_not_found > 0 {
184                self.metrics
185                    .key_value_store_num_fetches_not_found
186                    .with_label_values(&[self.store_name, "ckpt_contents"])
187                    .inc_by(contents_not_found);
188            }
189        } else {
190            self.metrics
191                .key_value_store_num_fetches_error
192                .with_label_values(&[self.store_name, "ckpt_summary"])
193                .inc_by(num_summaries);
194            self.metrics
195                .key_value_store_num_fetches_error
196                .with_label_values(&[self.store_name, "ckpt_contents"])
197                .inc_by(num_contents);
198        }
199
200        res
201    }
202
203    pub async fn multi_get_checkpoints_summaries(
204        &self,
205        keys: &[CheckpointSequenceNumber],
206    ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
207        self.multi_get_checkpoints(keys, &[], &[])
208            .await
209            .map(|(summaries, _, _)| summaries)
210    }
211
212    pub async fn multi_get_checkpoints_contents(
213        &self,
214        keys: &[CheckpointSequenceNumber],
215    ) -> IotaResult<Vec<Option<CheckpointContents>>> {
216        self.multi_get_checkpoints(&[], keys, &[])
217            .await
218            .map(|(_, contents, _)| contents)
219    }
220
221    pub async fn multi_get_checkpoints_summaries_by_digest(
222        &self,
223        keys: &[CheckpointDigest],
224    ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
225        self.multi_get_checkpoints(&[], &[], keys)
226            .await
227            .map(|(_, _, summaries)| summaries)
228    }
229
230    pub async fn multi_get_tx(
231        &self,
232        keys: &[TransactionDigest],
233    ) -> IotaResult<Vec<Option<Transaction>>> {
234        self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
235    }
236
237    pub async fn multi_get_fx_by_tx_digest(
238        &self,
239        keys: &[TransactionDigest],
240    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
241        self.multi_get(&[], keys).await.map(|(_, fx)| fx)
242    }
243
244    /// Convenience method for fetching single digest, and returning an error if
245    /// it's not found. Prefer using multi_get_tx whenever possible.
246    pub async fn get_tx(&self, digest: TransactionDigest) -> IotaResult<Transaction> {
247        self.multi_get_tx(&[digest])
248            .await?
249            .into_iter()
250            .next()
251            .flatten()
252            .ok_or(IotaError::TransactionNotFound { digest })
253    }
254
255    /// Convenience method for fetching single digest, and returning an error if
256    /// it's not found. Prefer using multi_get_fx_by_tx_digest whenever
257    /// possible.
258    pub async fn get_fx_by_tx_digest(
259        &self,
260        digest: TransactionDigest,
261    ) -> IotaResult<TransactionEffects> {
262        self.multi_get_fx_by_tx_digest(&[digest])
263            .await?
264            .into_iter()
265            .next()
266            .flatten()
267            .ok_or(IotaError::TransactionNotFound { digest })
268    }
269
270    /// Convenience method for fetching single checkpoint, and returning an
271    /// error if it's not found. Prefer using
272    /// multi_get_checkpoints_summaries whenever possible.
273    pub async fn get_checkpoint_summary(
274        &self,
275        checkpoint: CheckpointSequenceNumber,
276    ) -> IotaResult<CertifiedCheckpointSummary> {
277        self.multi_get_checkpoints_summaries(&[checkpoint])
278            .await?
279            .into_iter()
280            .next()
281            .flatten()
282            .ok_or(IotaError::UserInput {
283                error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
284            })
285    }
286
287    /// Convenience method for fetching single checkpoint, and returning an
288    /// error if it's not found. Prefer using multi_get_checkpoints_contents
289    /// whenever possible.
290    pub async fn get_checkpoint_contents(
291        &self,
292        checkpoint: CheckpointSequenceNumber,
293    ) -> IotaResult<CheckpointContents> {
294        self.multi_get_checkpoints_contents(&[checkpoint])
295            .await?
296            .into_iter()
297            .next()
298            .flatten()
299            .ok_or(IotaError::UserInput {
300                error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
301            })
302    }
303
304    /// Convenience method for fetching single checkpoint, and returning an
305    /// error if it's not found. Prefer using
306    /// multi_get_checkpoints_summaries_by_digest whenever possible.
307    pub async fn get_checkpoint_summary_by_digest(
308        &self,
309        digest: CheckpointDigest,
310    ) -> IotaResult<CertifiedCheckpointSummary> {
311        self.multi_get_checkpoints_summaries_by_digest(&[digest])
312            .await?
313            .into_iter()
314            .next()
315            .flatten()
316            .ok_or(IotaError::UserInput {
317                error: UserInputError::VerifiedCheckpointDigestNotFound(format!("{digest:?}")),
318            })
319    }
320
    /// Returns the checkpoint sequence number the wrapped store reports for
    /// transaction `digest`, or `None` if it reports none.
    ///
    /// Forwards straight to the wrapped store; unlike `multi_get`, this
    /// wrapper records no metrics for the call.
    pub async fn get_transaction_perpetual_checkpoint(
        &self,
        digest: TransactionDigest,
    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
        self.inner
            .get_transaction_perpetual_checkpoint(digest)
            .await
    }
329
    /// Returns the object the wrapped store holds for `object_id` at
    /// `version`, or `None` if it holds none.
    ///
    /// Forwards straight to the wrapped store; this wrapper records no metrics
    /// for the call.
    // NOTE(review): the trait declares this parameter as `SequenceNumber`;
    // presumably `VersionNumber` is an alias of it - confirm in `base_types`.
    pub async fn get_object(
        &self,
        object_id: ObjectID,
        version: VersionNumber,
    ) -> IotaResult<Option<Object>> {
        self.inner.get_object(object_id, version).await
    }
337
    /// Returns the wrapped store's lookup result for the given object keys;
    /// `None` marks a key it had no object for.
    ///
    /// Forwards straight to the wrapped store; this wrapper records no metrics
    /// for the call.
    pub async fn multi_get_objects(
        &self,
        object_keys: &[ObjectKey],
    ) -> IotaResult<Vec<Option<Object>>> {
        self.inner.multi_get_objects(object_keys).await
    }
344
    /// Returns the checkpoint sequence numbers the wrapped store reports for
    /// the given transaction digests; `None` marks a digest it had no entry
    /// for.
    ///
    /// Forwards straight to the wrapped store; this wrapper records no metrics
    /// for the call.
    pub async fn multi_get_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
        self.inner
            .multi_get_transactions_perpetual_checkpoints(digests)
            .await
    }
353
    /// Returns the events the wrapped store reports for the given transaction
    /// digests; `None` marks a digest it had no events entry for.
    ///
    /// Forwards straight to the wrapped store; this wrapper records no metrics
    /// for the call.
    pub async fn multi_get_events_by_tx_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
        self.inner.multi_get_events_by_tx_digests(digests).await
    }
360}
361
/// Immutable key/value store trait for retrieving transactions, effects,
/// events, checkpoints and objects.
///
/// The trait exposes lookups only. Most of them are batched (`multi_get*`) so
/// implementors can serve many keys in a single round trip.
///
/// Batched methods return one `Option` per key. `FallbackTransactionKVStore`
/// treats the n-th returned entry as the result for the n-th requested key, so
/// implementors should keep results in key order.
#[async_trait]
pub trait TransactionKeyValueStoreTrait {
    /// Generic multi_get, allows implementors to get heterogenous values with a
    /// single round trip.
    ///
    /// Returns `(transactions, effects)` for `transaction_keys` and
    /// `effects_keys` respectively.
    async fn multi_get(
        &self,
        transaction_keys: &[TransactionDigest],
        effects_keys: &[TransactionDigest],
    ) -> IotaResult<KVStoreTransactionData>;

    /// Generic multi_get to allow implementors to get heterogenous values with
    /// a single round trip.
    ///
    /// Returns `(summaries, contents, summaries_by_digest)` for the three key
    /// slices respectively.
    async fn multi_get_checkpoints(
        &self,
        checkpoint_summaries: &[CheckpointSequenceNumber],
        checkpoint_contents: &[CheckpointSequenceNumber],
        checkpoint_summaries_by_digest: &[CheckpointDigest],
    ) -> IotaResult<KVStoreCheckpointData>;

    /// Returns the checkpoint sequence number the store has for transaction
    /// `digest`, or `None` if it has none.
    async fn get_transaction_perpetual_checkpoint(
        &self,
        digest: TransactionDigest,
    ) -> IotaResult<Option<CheckpointSequenceNumber>>;

    /// Returns the object the store has for `object_id` at `version`, or
    /// `None` if it has none.
    async fn get_object(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>>;

    /// Batched object lookup; one entry per element of `object_keys`.
    async fn multi_get_objects(&self, object_keys: &[ObjectKey])
    -> IotaResult<Vec<Option<Object>>>;

    /// Batched form of `get_transaction_perpetual_checkpoint`; one entry per
    /// element of `digests`.
    async fn multi_get_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>>;

    /// Batched lookup of transaction events by transaction digest; one entry
    /// per element of `digests`.
    async fn multi_get_events_by_tx_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>>;
}
408
/// A TransactionKeyValueStoreTrait that falls back to a secondary store for any
/// key for which the primary store returns None.
///
/// Will be used to check the local rocksdb store, before falling back to a
/// remote scalable store.
///
/// An error from either store is returned to the caller as-is. The fallback
/// store is queried only when the primary succeeded but left keys unanswered.
pub struct FallbackTransactionKVStore {
    // Queried first, with the full set of keys.
    primary: TransactionKeyValueStore,
    // Queried second, only with the keys the primary returned `None` for.
    fallback: TransactionKeyValueStore,
}
418
419impl FallbackTransactionKVStore {
420    pub fn new_kv(
421        primary: TransactionKeyValueStore,
422        fallback: TransactionKeyValueStore,
423        metrics: Arc<KeyValueStoreMetrics>,
424        label: &'static str,
425    ) -> TransactionKeyValueStore {
426        let store = Arc::new(Self { primary, fallback });
427        TransactionKeyValueStore::new(label, metrics, store)
428    }
429}
430
431#[async_trait]
432impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
433    #[instrument(level = "trace", skip_all)]
434    async fn multi_get(
435        &self,
436        transaction_keys: &[TransactionDigest],
437        effects_keys: &[TransactionDigest],
438    ) -> IotaResult<KVStoreTransactionData> {
439        let (mut transactions, mut effects) = self
440            .primary
441            .multi_get(transaction_keys, effects_keys)
442            .await?;
443
444        let (fallback_transaction_keys, indices_transactions) =
445            find_fallback(&transactions, transaction_keys);
446        let (fallback_effects_keys, indices_effects) = find_fallback(&effects, effects_keys);
447
448        if fallback_transaction_keys.is_empty() && fallback_effects_keys.is_empty() {
449            return Ok((transactions, effects));
450        }
451
452        let (fallback_transactions, fallback_effects) = self
453            .fallback
454            .multi_get(&fallback_transaction_keys, &fallback_effects_keys)
455            .await?;
456
457        merge_res(
458            &mut transactions,
459            fallback_transactions,
460            &indices_transactions,
461        );
462        merge_res(&mut effects, fallback_effects, &indices_effects);
463
464        Ok((transactions, effects))
465    }
466
467    #[instrument(level = "trace", skip_all)]
468    async fn multi_get_checkpoints(
469        &self,
470        checkpoint_summaries: &[CheckpointSequenceNumber],
471        checkpoint_contents: &[CheckpointSequenceNumber],
472        checkpoint_summaries_by_digest: &[CheckpointDigest],
473    ) -> IotaResult<(
474        Vec<Option<CertifiedCheckpointSummary>>,
475        Vec<Option<CheckpointContents>>,
476        Vec<Option<CertifiedCheckpointSummary>>,
477    )> {
478        let (mut summaries, mut contents, mut summaries_by_digest) = self
479            .primary
480            .multi_get_checkpoints(
481                checkpoint_summaries,
482                checkpoint_contents,
483                checkpoint_summaries_by_digest,
484            )
485            .await?;
486
487        let (fallback_summaries, indices_summaries) =
488            find_fallback(&summaries, checkpoint_summaries);
489        let (fallback_contents, indices_contents) = find_fallback(&contents, checkpoint_contents);
490        let (fallback_summaries_by_digest, indices_summaries_by_digest) =
491            find_fallback(&summaries_by_digest, checkpoint_summaries_by_digest);
492
493        if fallback_summaries.is_empty()
494            && fallback_contents.is_empty()
495            && fallback_summaries_by_digest.is_empty()
496        {
497            return Ok((summaries, contents, summaries_by_digest));
498        }
499
500        let (fallback_summaries, fallback_contents, fallback_summaries_by_digest) = self
501            .fallback
502            .multi_get_checkpoints(
503                &fallback_summaries,
504                &fallback_contents,
505                &fallback_summaries_by_digest,
506            )
507            .await?;
508
509        merge_res(&mut summaries, fallback_summaries, &indices_summaries);
510        merge_res(&mut contents, fallback_contents, &indices_contents);
511        merge_res(
512            &mut summaries_by_digest,
513            fallback_summaries_by_digest,
514            &indices_summaries_by_digest,
515        );
516
517        Ok((summaries, contents, summaries_by_digest))
518    }
519
520    #[instrument(level = "trace", skip_all)]
521    async fn get_transaction_perpetual_checkpoint(
522        &self,
523        digest: TransactionDigest,
524    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
525        let mut res = self
526            .primary
527            .get_transaction_perpetual_checkpoint(digest)
528            .await?;
529        if res.is_none() {
530            res = self
531                .fallback
532                .get_transaction_perpetual_checkpoint(digest)
533                .await?;
534        }
535        Ok(res)
536    }
537
538    #[instrument(level = "trace", skip_all)]
539    async fn get_object(
540        &self,
541        object_id: ObjectID,
542        version: SequenceNumber,
543    ) -> IotaResult<Option<Object>> {
544        let mut res = self.primary.get_object(object_id, version).await?;
545        if res.is_none() {
546            res = self.fallback.get_object(object_id, version).await?;
547        }
548        Ok(res)
549    }
550
551    #[instrument(level = "trace", skip_all)]
552    async fn multi_get_objects(
553        &self,
554        object_keys: &[ObjectKey],
555    ) -> IotaResult<Vec<Option<Object>>> {
556        let mut res = self.primary.multi_get_objects(object_keys).await?;
557
558        let (fallback, indices) = find_fallback(&res, object_keys);
559
560        if fallback.is_empty() {
561            return Ok(res);
562        }
563
564        let secondary_res = self.fallback.multi_get_objects(&fallback).await?;
565
566        merge_res(&mut res, secondary_res, &indices);
567
568        Ok(res)
569    }
570
571    #[instrument(level = "trace", skip_all)]
572    async fn multi_get_transactions_perpetual_checkpoints(
573        &self,
574        digests: &[TransactionDigest],
575    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
576        let mut res = self
577            .primary
578            .multi_get_transactions_perpetual_checkpoints(digests)
579            .await?;
580
581        let (fallback, indices) = find_fallback(&res, digests);
582
583        if fallback.is_empty() {
584            return Ok(res);
585        }
586
587        let secondary_res = self
588            .fallback
589            .multi_get_transactions_perpetual_checkpoints(&fallback)
590            .await?;
591
592        merge_res(&mut res, secondary_res, &indices);
593
594        Ok(res)
595    }
596
597    #[instrument(level = "trace", skip_all)]
598    async fn multi_get_events_by_tx_digests(
599        &self,
600        digests: &[TransactionDigest],
601    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
602        let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
603        let (fallback, indices) = find_fallback(&res, digests);
604        if fallback.is_empty() {
605            return Ok(res);
606        }
607        let secondary_res = self
608            .fallback
609            .multi_get_events_by_tx_digests(&fallback)
610            .await?;
611        merge_res(&mut res, secondary_res, &indices);
612        Ok(res)
613    }
614}
615
/// Collects the keys whose lookup result in `values` is `None`.
///
/// `values[i]` is taken to be the result for `keys[i]`. Returns the missing
/// keys (cloned) together with the positions they occupy in `values`, both in
/// ascending position order, so that the n-th returned key belongs at the n-th
/// returned position.
///
/// If the slices differ in length only their common prefix is examined, so a
/// store that returns more values than it was given keys cannot cause an
/// out-of-bounds panic here.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    values
        .iter()
        .zip(keys)
        .enumerate()
        .filter(|(_, (value, _))| value.is_none())
        .map(|(index, (_, key))| (key.clone(), index))
        .unzip()
}
628
/// Writes fallback results back into `values`.
///
/// The n-th element of `fallback_values` is stored at position `indices[n]`,
/// replacing whatever was there (including with `None`). Pairing stops as soon
/// as either `indices` or `fallback_values` runs out.
///
/// Panics if a used index is out of bounds for `values`.
fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
    let mut recovered = fallback_values.into_iter();
    for &slot in indices {
        match recovered.next() {
            Some(value) => values[slot] = value,
            None => break,
        }
    }
}