Skip to main content

iota_storage/
key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
//! Immutable key/value store trait for storing/retrieving transactions,
//! effects, and events to/from a scalable store.
7
8use std::{sync::Arc, time::Instant};
9
10use async_trait::async_trait;
11use iota_sdk_types::ObjectId;
12use iota_types::{
13    base_types::{SequenceNumber, VersionNumber},
14    digests::{CheckpointDigest, TransactionDigest},
15    effects::{TransactionEffects, TransactionEvents},
16    error::{IotaError, IotaResult, UserInputError},
17    messages_checkpoint::{
18        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
19    },
20    object::Object,
21    storage::ObjectKey,
22    transaction::Transaction,
23};
24use tracing::instrument;
25
26use crate::key_value_store_metrics::KeyValueStoreMetrics;
27
/// Result of a combined transaction/effects lookup, in the order
/// `(transactions, effects)`.
///
/// A `None` entry marks a key for which no value was returned. Callers in
/// this file treat entry `i` of each vector as the answer for key `i` of the
/// matching request slice.
pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);

/// Result of a combined checkpoint lookup, in the order
/// `(summaries by sequence number, contents, summaries by digest)`.
///
/// A `None` entry marks a key for which no value was returned. Callers in
/// this file treat entry `i` of each vector as the answer for key `i` of the
/// matching request slice.
pub type KVStoreCheckpointData = (
    Vec<Option<CertifiedCheckpointSummary>>,
    Vec<Option<CheckpointContents>>,
    Vec<Option<CertifiedCheckpointSummary>>,
);
35
/// Handle around a [`TransactionKeyValueStoreTrait`] implementation that
/// forwards every call to the wrapped store and records metrics for the
/// transaction/effects and checkpoint batch lookups.
pub struct TransactionKeyValueStore {
    /// First label value attached to every metric recorded by this handle.
    store_name: &'static str,
    /// Metric handles updated by the instrumented fetch methods.
    metrics: Arc<KeyValueStoreMetrics>,
    /// Backing store that all calls are forwarded to.
    inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
}
41
42impl TransactionKeyValueStore {
43    pub fn new(
44        store_name: &'static str,
45        metrics: Arc<KeyValueStoreMetrics>,
46        inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
47    ) -> Self {
48        Self {
49            store_name,
50            metrics,
51            inner,
52        }
53    }
54
55    /// Generic multi_get, allows implementors to get heterogenous values with a
56    /// single round trip.
57    pub async fn multi_get(
58        &self,
59        transaction_keys: &[TransactionDigest],
60        effects_keys: &[TransactionDigest],
61    ) -> IotaResult<KVStoreTransactionData> {
62        let start = Instant::now();
63        let res = self.inner.multi_get(transaction_keys, effects_keys).await;
64        let elapsed = start.elapsed();
65
66        let num_txns = transaction_keys.len() as u64;
67        let num_effects = effects_keys.len() as u64;
68        let total_keys = num_txns + num_effects;
69
70        self.metrics
71            .key_value_store_num_fetches_latency_ms
72            .with_label_values(&[self.store_name, "tx"])
73            .observe(elapsed.as_millis() as f64);
74        self.metrics
75            .key_value_store_num_fetches_batch_size
76            .with_label_values(&[self.store_name, "tx"])
77            .observe(total_keys as f64);
78
79        if let Ok((transactions, effects)) = &res {
80            let txns_not_found = transactions.iter().filter(|v| v.is_none()).count() as u64;
81            let effects_not_found = effects.iter().filter(|v| v.is_none()).count() as u64;
82
83            if num_txns > 0 {
84                self.metrics
85                    .key_value_store_num_fetches_success
86                    .with_label_values(&[self.store_name, "tx"])
87                    .inc_by(num_txns);
88            }
89            if num_effects > 0 {
90                self.metrics
91                    .key_value_store_num_fetches_success
92                    .with_label_values(&[self.store_name, "fx"])
93                    .inc_by(num_effects);
94            }
95
96            if txns_not_found > 0 {
97                self.metrics
98                    .key_value_store_num_fetches_not_found
99                    .with_label_values(&[self.store_name, "tx"])
100                    .inc_by(txns_not_found);
101            }
102            if effects_not_found > 0 {
103                self.metrics
104                    .key_value_store_num_fetches_not_found
105                    .with_label_values(&[self.store_name, "fx"])
106                    .inc_by(effects_not_found);
107            }
108        } else {
109            self.metrics
110                .key_value_store_num_fetches_error
111                .with_label_values(&[self.store_name, "tx"])
112                .inc_by(num_txns);
113            self.metrics
114                .key_value_store_num_fetches_error
115                .with_label_values(&[self.store_name, "fx"])
116                .inc_by(num_effects);
117        }
118
119        res
120    }
121
122    pub async fn multi_get_checkpoints(
123        &self,
124        checkpoint_summaries: &[CheckpointSequenceNumber],
125        checkpoint_contents: &[CheckpointSequenceNumber],
126        checkpoint_summaries_by_digest: &[CheckpointDigest],
127    ) -> IotaResult<(
128        Vec<Option<CertifiedCheckpointSummary>>,
129        Vec<Option<CheckpointContents>>,
130        Vec<Option<CertifiedCheckpointSummary>>,
131    )> {
132        let start = Instant::now();
133        let res = self
134            .inner
135            .multi_get_checkpoints(
136                checkpoint_summaries,
137                checkpoint_contents,
138                checkpoint_summaries_by_digest,
139            )
140            .await;
141        let elapsed = start.elapsed();
142
143        let num_summaries =
144            checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
145        let num_contents = checkpoint_contents.len() as u64;
146
147        self.metrics
148            .key_value_store_num_fetches_latency_ms
149            .with_label_values(&[self.store_name, "checkpoint"])
150            .observe(elapsed.as_millis() as f64);
151        self.metrics
152            .key_value_store_num_fetches_batch_size
153            .with_label_values(&[self.store_name, "checkpoint_summary"])
154            .observe(num_summaries as f64);
155        self.metrics
156            .key_value_store_num_fetches_batch_size
157            .with_label_values(&[self.store_name, "checkpoint_content"])
158            .observe(num_contents as f64);
159
160        if let Ok((summaries, contents, summaries_by_digest)) = &res {
161            let summaries_not_found = summaries.iter().filter(|v| v.is_none()).count() as u64
162                + summaries_by_digest.iter().filter(|v| v.is_none()).count() as u64;
163            let contents_not_found = contents.iter().filter(|v| v.is_none()).count() as u64;
164
165            if num_summaries > 0 {
166                self.metrics
167                    .key_value_store_num_fetches_success
168                    .with_label_values(&[self.store_name, "ckpt_summary"])
169                    .inc_by(num_summaries);
170            }
171            if num_contents > 0 {
172                self.metrics
173                    .key_value_store_num_fetches_success
174                    .with_label_values(&[self.store_name, "ckpt_contents"])
175                    .inc_by(num_contents);
176            }
177
178            if summaries_not_found > 0 {
179                self.metrics
180                    .key_value_store_num_fetches_not_found
181                    .with_label_values(&[self.store_name, "ckpt_summary"])
182                    .inc_by(summaries_not_found);
183            }
184            if contents_not_found > 0 {
185                self.metrics
186                    .key_value_store_num_fetches_not_found
187                    .with_label_values(&[self.store_name, "ckpt_contents"])
188                    .inc_by(contents_not_found);
189            }
190        } else {
191            self.metrics
192                .key_value_store_num_fetches_error
193                .with_label_values(&[self.store_name, "ckpt_summary"])
194                .inc_by(num_summaries);
195            self.metrics
196                .key_value_store_num_fetches_error
197                .with_label_values(&[self.store_name, "ckpt_contents"])
198                .inc_by(num_contents);
199        }
200
201        res
202    }
203
204    pub async fn multi_get_checkpoints_summaries(
205        &self,
206        keys: &[CheckpointSequenceNumber],
207    ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
208        self.multi_get_checkpoints(keys, &[], &[])
209            .await
210            .map(|(summaries, _, _)| summaries)
211    }
212
213    pub async fn multi_get_checkpoints_contents(
214        &self,
215        keys: &[CheckpointSequenceNumber],
216    ) -> IotaResult<Vec<Option<CheckpointContents>>> {
217        self.multi_get_checkpoints(&[], keys, &[])
218            .await
219            .map(|(_, contents, _)| contents)
220    }
221
222    pub async fn multi_get_checkpoints_summaries_by_digest(
223        &self,
224        keys: &[CheckpointDigest],
225    ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
226        self.multi_get_checkpoints(&[], &[], keys)
227            .await
228            .map(|(_, _, summaries)| summaries)
229    }
230
231    pub async fn multi_get_tx(
232        &self,
233        keys: &[TransactionDigest],
234    ) -> IotaResult<Vec<Option<Transaction>>> {
235        self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
236    }
237
238    pub async fn multi_get_fx_by_tx_digest(
239        &self,
240        keys: &[TransactionDigest],
241    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
242        self.multi_get(&[], keys).await.map(|(_, fx)| fx)
243    }
244
245    /// Convenience method for fetching single digest, and returning an error if
246    /// it's not found. Prefer using multi_get_tx whenever possible.
247    pub async fn get_tx(&self, digest: TransactionDigest) -> IotaResult<Transaction> {
248        self.multi_get_tx(&[digest])
249            .await?
250            .into_iter()
251            .next()
252            .flatten()
253            .ok_or(IotaError::TransactionNotFound { digest })
254    }
255
256    /// Convenience method for fetching single digest, and returning an error if
257    /// it's not found. Prefer using multi_get_fx_by_tx_digest whenever
258    /// possible.
259    pub async fn get_fx_by_tx_digest(
260        &self,
261        digest: TransactionDigest,
262    ) -> IotaResult<TransactionEffects> {
263        self.multi_get_fx_by_tx_digest(&[digest])
264            .await?
265            .into_iter()
266            .next()
267            .flatten()
268            .ok_or(IotaError::TransactionNotFound { digest })
269    }
270
271    /// Convenience method for fetching single checkpoint, and returning an
272    /// error if it's not found. Prefer using
273    /// multi_get_checkpoints_summaries whenever possible.
274    pub async fn get_checkpoint_summary(
275        &self,
276        checkpoint: CheckpointSequenceNumber,
277    ) -> IotaResult<CertifiedCheckpointSummary> {
278        self.multi_get_checkpoints_summaries(&[checkpoint])
279            .await?
280            .into_iter()
281            .next()
282            .flatten()
283            .ok_or(IotaError::UserInput {
284                error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
285            })
286    }
287
288    /// Convenience method for fetching single checkpoint, and returning an
289    /// error if it's not found. Prefer using multi_get_checkpoints_contents
290    /// whenever possible.
291    pub async fn get_checkpoint_contents(
292        &self,
293        checkpoint: CheckpointSequenceNumber,
294    ) -> IotaResult<CheckpointContents> {
295        self.multi_get_checkpoints_contents(&[checkpoint])
296            .await?
297            .into_iter()
298            .next()
299            .flatten()
300            .ok_or(IotaError::UserInput {
301                error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
302            })
303    }
304
305    /// Convenience method for fetching single checkpoint, and returning an
306    /// error if it's not found. Prefer using
307    /// multi_get_checkpoints_summaries_by_digest whenever possible.
308    pub async fn get_checkpoint_summary_by_digest(
309        &self,
310        digest: CheckpointDigest,
311    ) -> IotaResult<CertifiedCheckpointSummary> {
312        self.multi_get_checkpoints_summaries_by_digest(&[digest])
313            .await?
314            .into_iter()
315            .next()
316            .flatten()
317            .ok_or(IotaError::UserInput {
318                error: UserInputError::VerifiedCheckpointDigestNotFound(format!("{digest}")),
319            })
320    }
321
322    pub async fn get_transaction_perpetual_checkpoint(
323        &self,
324        digest: TransactionDigest,
325    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
326        self.inner
327            .get_transaction_perpetual_checkpoint(digest)
328            .await
329    }
330
331    pub async fn get_object(
332        &self,
333        object_id: ObjectId,
334        version: VersionNumber,
335    ) -> IotaResult<Option<Object>> {
336        self.inner.get_object(object_id, version).await
337    }
338
339    pub async fn multi_get_objects(
340        &self,
341        object_keys: &[ObjectKey],
342    ) -> IotaResult<Vec<Option<Object>>> {
343        self.inner.multi_get_objects(object_keys).await
344    }
345
346    pub async fn multi_get_transactions_perpetual_checkpoints(
347        &self,
348        digests: &[TransactionDigest],
349    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
350        self.inner
351            .multi_get_transactions_perpetual_checkpoints(digests)
352            .await
353    }
354
355    pub async fn multi_get_events_by_tx_digests(
356        &self,
357        digests: &[TransactionDigest],
358    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
359        self.inner.multi_get_events_by_tx_digests(digests).await
360    }
361}
362
/// Immutable key/value store trait for retrieving transactions, effects,
/// checkpoints, objects, and events.
///
/// The trait declares read methods only. Most lookups are batched
/// (`multi_get*`); the single-key lookups declared here are
/// `get_transaction_perpetual_checkpoint` and `get_object`.
///
/// `FallbackTransactionKVStore` in this file matches each returned entry to
/// the request key at the same position, so batched results are expected to
/// contain one entry per key, in request order, with `None` for keys that
/// were not found.
#[async_trait]
pub trait TransactionKeyValueStoreTrait {
    /// Generic multi_get, allows implementors to get heterogenous values with a
    /// single round trip.
    ///
    /// Returns `(transactions, effects)` for `transaction_keys` and
    /// `effects_keys` respectively.
    async fn multi_get(
        &self,
        transaction_keys: &[TransactionDigest],
        effects_keys: &[TransactionDigest],
    ) -> IotaResult<KVStoreTransactionData>;

    /// Generic multi_get to allow implementors to get heterogenous values with
    /// a single round trip.
    ///
    /// Returns `(summaries, contents, summaries_by_digest)` for the three key
    /// slices respectively.
    async fn multi_get_checkpoints(
        &self,
        checkpoint_summaries: &[CheckpointSequenceNumber],
        checkpoint_contents: &[CheckpointSequenceNumber],
        checkpoint_summaries_by_digest: &[CheckpointDigest],
    ) -> IotaResult<KVStoreCheckpointData>;

    /// Looks up the checkpoint sequence number recorded for the transaction
    /// `digest`, or `None` if the store has none.
    async fn get_transaction_perpetual_checkpoint(
        &self,
        digest: TransactionDigest,
    ) -> IotaResult<Option<CheckpointSequenceNumber>>;

    /// Looks up the object `object_id` at exactly `version`, or `None` if the
    /// store has no such entry.
    async fn get_object(
        &self,
        object_id: ObjectId,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>>;

    /// Batched object lookup by `ObjectKey`.
    async fn multi_get_objects(&self, object_keys: &[ObjectKey])
    -> IotaResult<Vec<Option<Object>>>;

    /// Batched form of `get_transaction_perpetual_checkpoint`.
    async fn multi_get_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>>;

    /// Batched lookup of transaction events by transaction digest.
    async fn multi_get_events_by_tx_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>>;
}
409
/// A TransactionKeyValueStoreTrait that falls back to a secondary store for any
/// key for which the primary store returns None.
///
/// Will be used to check the local rocksdb store, before falling back to a
/// remote scalable store.
///
/// An error from the primary store is returned as-is; the fallback store is
/// only consulted after the primary call succeeded.
pub struct FallbackTransactionKVStore {
    /// Store queried first for every request.
    primary: TransactionKeyValueStore,
    /// Store queried only for the keys the primary answered with `None`.
    fallback: TransactionKeyValueStore,
}
419
420impl FallbackTransactionKVStore {
421    pub fn new_kv(
422        primary: TransactionKeyValueStore,
423        fallback: TransactionKeyValueStore,
424        metrics: Arc<KeyValueStoreMetrics>,
425        label: &'static str,
426    ) -> TransactionKeyValueStore {
427        let store = Arc::new(Self { primary, fallback });
428        TransactionKeyValueStore::new(label, metrics, store)
429    }
430}
431
432#[async_trait]
433impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
434    #[instrument(level = "trace", skip_all)]
435    async fn multi_get(
436        &self,
437        transaction_keys: &[TransactionDigest],
438        effects_keys: &[TransactionDigest],
439    ) -> IotaResult<KVStoreTransactionData> {
440        let (mut transactions, mut effects) = self
441            .primary
442            .multi_get(transaction_keys, effects_keys)
443            .await?;
444
445        let (fallback_transaction_keys, indices_transactions) =
446            find_fallback(&transactions, transaction_keys);
447        let (fallback_effects_keys, indices_effects) = find_fallback(&effects, effects_keys);
448
449        if fallback_transaction_keys.is_empty() && fallback_effects_keys.is_empty() {
450            return Ok((transactions, effects));
451        }
452
453        let (fallback_transactions, fallback_effects) = self
454            .fallback
455            .multi_get(&fallback_transaction_keys, &fallback_effects_keys)
456            .await?;
457
458        merge_res(
459            &mut transactions,
460            fallback_transactions,
461            &indices_transactions,
462        );
463        merge_res(&mut effects, fallback_effects, &indices_effects);
464
465        Ok((transactions, effects))
466    }
467
468    #[instrument(level = "trace", skip_all)]
469    async fn multi_get_checkpoints(
470        &self,
471        checkpoint_summaries: &[CheckpointSequenceNumber],
472        checkpoint_contents: &[CheckpointSequenceNumber],
473        checkpoint_summaries_by_digest: &[CheckpointDigest],
474    ) -> IotaResult<(
475        Vec<Option<CertifiedCheckpointSummary>>,
476        Vec<Option<CheckpointContents>>,
477        Vec<Option<CertifiedCheckpointSummary>>,
478    )> {
479        let (mut summaries, mut contents, mut summaries_by_digest) = self
480            .primary
481            .multi_get_checkpoints(
482                checkpoint_summaries,
483                checkpoint_contents,
484                checkpoint_summaries_by_digest,
485            )
486            .await?;
487
488        let (fallback_summaries, indices_summaries) =
489            find_fallback(&summaries, checkpoint_summaries);
490        let (fallback_contents, indices_contents) = find_fallback(&contents, checkpoint_contents);
491        let (fallback_summaries_by_digest, indices_summaries_by_digest) =
492            find_fallback(&summaries_by_digest, checkpoint_summaries_by_digest);
493
494        if fallback_summaries.is_empty()
495            && fallback_contents.is_empty()
496            && fallback_summaries_by_digest.is_empty()
497        {
498            return Ok((summaries, contents, summaries_by_digest));
499        }
500
501        let (fallback_summaries, fallback_contents, fallback_summaries_by_digest) = self
502            .fallback
503            .multi_get_checkpoints(
504                &fallback_summaries,
505                &fallback_contents,
506                &fallback_summaries_by_digest,
507            )
508            .await?;
509
510        merge_res(&mut summaries, fallback_summaries, &indices_summaries);
511        merge_res(&mut contents, fallback_contents, &indices_contents);
512        merge_res(
513            &mut summaries_by_digest,
514            fallback_summaries_by_digest,
515            &indices_summaries_by_digest,
516        );
517
518        Ok((summaries, contents, summaries_by_digest))
519    }
520
521    #[instrument(level = "trace", skip_all)]
522    async fn get_transaction_perpetual_checkpoint(
523        &self,
524        digest: TransactionDigest,
525    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
526        let mut res = self
527            .primary
528            .get_transaction_perpetual_checkpoint(digest)
529            .await?;
530        if res.is_none() {
531            res = self
532                .fallback
533                .get_transaction_perpetual_checkpoint(digest)
534                .await?;
535        }
536        Ok(res)
537    }
538
539    #[instrument(level = "trace", skip_all)]
540    async fn get_object(
541        &self,
542        object_id: ObjectId,
543        version: SequenceNumber,
544    ) -> IotaResult<Option<Object>> {
545        let mut res = self.primary.get_object(object_id, version).await?;
546        if res.is_none() {
547            res = self.fallback.get_object(object_id, version).await?;
548        }
549        Ok(res)
550    }
551
552    #[instrument(level = "trace", skip_all)]
553    async fn multi_get_objects(
554        &self,
555        object_keys: &[ObjectKey],
556    ) -> IotaResult<Vec<Option<Object>>> {
557        let mut res = self.primary.multi_get_objects(object_keys).await?;
558
559        let (fallback, indices) = find_fallback(&res, object_keys);
560
561        if fallback.is_empty() {
562            return Ok(res);
563        }
564
565        let secondary_res = self.fallback.multi_get_objects(&fallback).await?;
566
567        merge_res(&mut res, secondary_res, &indices);
568
569        Ok(res)
570    }
571
572    #[instrument(level = "trace", skip_all)]
573    async fn multi_get_transactions_perpetual_checkpoints(
574        &self,
575        digests: &[TransactionDigest],
576    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
577        let mut res = self
578            .primary
579            .multi_get_transactions_perpetual_checkpoints(digests)
580            .await?;
581
582        let (fallback, indices) = find_fallback(&res, digests);
583
584        if fallback.is_empty() {
585            return Ok(res);
586        }
587
588        let secondary_res = self
589            .fallback
590            .multi_get_transactions_perpetual_checkpoints(&fallback)
591            .await?;
592
593        merge_res(&mut res, secondary_res, &indices);
594
595        Ok(res)
596    }
597
598    #[instrument(level = "trace", skip_all)]
599    async fn multi_get_events_by_tx_digests(
600        &self,
601        digests: &[TransactionDigest],
602    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
603        let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
604        let (fallback, indices) = find_fallback(&res, digests);
605        if fallback.is_empty() {
606            return Ok(res);
607        }
608        let secondary_res = self
609            .fallback
610            .multi_get_events_by_tx_digests(&fallback)
611            .await?;
612        merge_res(&mut res, secondary_res, &indices);
613        Ok(res)
614    }
615}
616
/// Collects, for every `None` in `values`, a clone of the key at the same
/// position together with that position.
///
/// Both returned vectors are in ascending position order and have the same
/// length.
///
/// # Panics
///
/// Panics if `values` has a `None` at a position that is out of bounds for
/// `keys`.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    values
        .iter()
        .enumerate()
        .filter(|(_, value)| value.is_none())
        .map(|(position, _)| (keys[position].clone(), position))
        .unzip()
}
629
/// Writes each fallback value into `values` at the paired position from
/// `indices`, overwriting what was there (including with `None`).
///
/// Pairs are formed in order; if the two inputs differ in length the surplus
/// of the longer one is ignored.
///
/// # Panics
///
/// Panics if a used index is out of bounds for `values`.
fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
    indices
        .iter()
        .copied()
        .zip(fallback_values)
        .for_each(|(slot, replacement)| values[slot] = replacement);
}