iota_storage/
key_value_store.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
//! Immutable key/value store trait for storing/retrieving transactions,
//! effects, and events to/from a scalable store.
7
8use std::{sync::Arc, time::Instant};
9
10use async_trait::async_trait;
11use iota_types::{
12    base_types::{ObjectID, SequenceNumber, VersionNumber},
13    digests::{CheckpointDigest, TransactionDigest},
14    effects::{TransactionEffects, TransactionEvents},
15    error::{IotaError, IotaResult, UserInputError},
16    messages_checkpoint::{
17        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
18    },
19    object::Object,
20    transaction::Transaction,
21};
22use tracing::instrument;
23
24use crate::key_value_store_metrics::KeyValueStoreMetrics;
25
/// Result of a batched transaction lookup: `(transactions, effects)`.
///
/// `None` marks a key the store has no entry for. Each vector is expected to
/// line up positionally with the requested key slice (the fallback merge in
/// this file indexes into it by position) — implementors should preserve that.
pub type KVStoreTransactionData = (Vec<Option<Transaction>>, Vec<Option<TransactionEffects>>);

/// Result of a batched checkpoint lookup: `(summaries by sequence number,
/// contents by sequence number, summaries by digest)`.
///
/// `None` marks a key the store has no entry for; as with
/// `KVStoreTransactionData`, entries are expected to line up positionally with
/// the requested keys.
pub type KVStoreCheckpointData = (
    Vec<Option<CertifiedCheckpointSummary>>,
    Vec<Option<CheckpointContents>>,
    Vec<Option<CertifiedCheckpointSummary>>,
);
33
34pub struct TransactionKeyValueStore {
35    store_name: &'static str,
36    metrics: Arc<KeyValueStoreMetrics>,
37    inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
38}
39
40impl TransactionKeyValueStore {
41    pub fn new(
42        store_name: &'static str,
43        metrics: Arc<KeyValueStoreMetrics>,
44        inner: Arc<dyn TransactionKeyValueStoreTrait + Send + Sync>,
45    ) -> Self {
46        Self {
47            store_name,
48            metrics,
49            inner,
50        }
51    }
52
53    /// Generic multi_get, allows implementors to get heterogenous values with a
54    /// single round trip.
55    pub async fn multi_get(
56        &self,
57        transaction_keys: &[TransactionDigest],
58        effects_keys: &[TransactionDigest],
59    ) -> IotaResult<KVStoreTransactionData> {
60        let start = Instant::now();
61        let res = self.inner.multi_get(transaction_keys, effects_keys).await;
62        let elapsed = start.elapsed();
63
64        let num_txns = transaction_keys.len() as u64;
65        let num_effects = effects_keys.len() as u64;
66        let total_keys = num_txns + num_effects;
67
68        self.metrics
69            .key_value_store_num_fetches_latency_ms
70            .with_label_values(&[self.store_name, "tx"])
71            .observe(elapsed.as_millis() as f64);
72        self.metrics
73            .key_value_store_num_fetches_batch_size
74            .with_label_values(&[self.store_name, "tx"])
75            .observe(total_keys as f64);
76
77        if let Ok((transactions, effects)) = &res {
78            let txns_not_found = transactions.iter().filter(|v| v.is_none()).count() as u64;
79            let effects_not_found = effects.iter().filter(|v| v.is_none()).count() as u64;
80
81            if num_txns > 0 {
82                self.metrics
83                    .key_value_store_num_fetches_success
84                    .with_label_values(&[self.store_name, "tx"])
85                    .inc_by(num_txns);
86            }
87            if num_effects > 0 {
88                self.metrics
89                    .key_value_store_num_fetches_success
90                    .with_label_values(&[self.store_name, "fx"])
91                    .inc_by(num_effects);
92            }
93
94            if txns_not_found > 0 {
95                self.metrics
96                    .key_value_store_num_fetches_not_found
97                    .with_label_values(&[self.store_name, "tx"])
98                    .inc_by(txns_not_found);
99            }
100            if effects_not_found > 0 {
101                self.metrics
102                    .key_value_store_num_fetches_not_found
103                    .with_label_values(&[self.store_name, "fx"])
104                    .inc_by(effects_not_found);
105            }
106        } else {
107            self.metrics
108                .key_value_store_num_fetches_error
109                .with_label_values(&[self.store_name, "tx"])
110                .inc_by(num_txns);
111            self.metrics
112                .key_value_store_num_fetches_error
113                .with_label_values(&[self.store_name, "fx"])
114                .inc_by(num_effects);
115        }
116
117        res
118    }
119
120    pub async fn multi_get_checkpoints(
121        &self,
122        checkpoint_summaries: &[CheckpointSequenceNumber],
123        checkpoint_contents: &[CheckpointSequenceNumber],
124        checkpoint_summaries_by_digest: &[CheckpointDigest],
125    ) -> IotaResult<(
126        Vec<Option<CertifiedCheckpointSummary>>,
127        Vec<Option<CheckpointContents>>,
128        Vec<Option<CertifiedCheckpointSummary>>,
129    )> {
130        let start = Instant::now();
131        let res = self
132            .inner
133            .multi_get_checkpoints(
134                checkpoint_summaries,
135                checkpoint_contents,
136                checkpoint_summaries_by_digest,
137            )
138            .await;
139        let elapsed = start.elapsed();
140
141        let num_summaries =
142            checkpoint_summaries.len() as u64 + checkpoint_summaries_by_digest.len() as u64;
143        let num_contents = checkpoint_contents.len() as u64;
144
145        self.metrics
146            .key_value_store_num_fetches_latency_ms
147            .with_label_values(&[self.store_name, "checkpoint"])
148            .observe(elapsed.as_millis() as f64);
149        self.metrics
150            .key_value_store_num_fetches_batch_size
151            .with_label_values(&[self.store_name, "checkpoint_summary"])
152            .observe(num_summaries as f64);
153        self.metrics
154            .key_value_store_num_fetches_batch_size
155            .with_label_values(&[self.store_name, "checkpoint_content"])
156            .observe(num_contents as f64);
157
158        if let Ok((summaries, contents, summaries_by_digest)) = &res {
159            let summaries_not_found = summaries.iter().filter(|v| v.is_none()).count() as u64
160                + summaries_by_digest.iter().filter(|v| v.is_none()).count() as u64;
161            let contents_not_found = contents.iter().filter(|v| v.is_none()).count() as u64;
162
163            if num_summaries > 0 {
164                self.metrics
165                    .key_value_store_num_fetches_success
166                    .with_label_values(&[self.store_name, "ckpt_summary"])
167                    .inc_by(num_summaries);
168            }
169            if num_contents > 0 {
170                self.metrics
171                    .key_value_store_num_fetches_success
172                    .with_label_values(&[self.store_name, "ckpt_contents"])
173                    .inc_by(num_contents);
174            }
175
176            if summaries_not_found > 0 {
177                self.metrics
178                    .key_value_store_num_fetches_not_found
179                    .with_label_values(&[self.store_name, "ckpt_summary"])
180                    .inc_by(summaries_not_found);
181            }
182            if contents_not_found > 0 {
183                self.metrics
184                    .key_value_store_num_fetches_not_found
185                    .with_label_values(&[self.store_name, "ckpt_contents"])
186                    .inc_by(contents_not_found);
187            }
188        } else {
189            self.metrics
190                .key_value_store_num_fetches_error
191                .with_label_values(&[self.store_name, "ckpt_summary"])
192                .inc_by(num_summaries);
193            self.metrics
194                .key_value_store_num_fetches_error
195                .with_label_values(&[self.store_name, "ckpt_contents"])
196                .inc_by(num_contents);
197        }
198
199        res
200    }
201
202    pub async fn multi_get_checkpoints_summaries(
203        &self,
204        keys: &[CheckpointSequenceNumber],
205    ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
206        self.multi_get_checkpoints(keys, &[], &[])
207            .await
208            .map(|(summaries, _, _)| summaries)
209    }
210
211    pub async fn multi_get_checkpoints_contents(
212        &self,
213        keys: &[CheckpointSequenceNumber],
214    ) -> IotaResult<Vec<Option<CheckpointContents>>> {
215        self.multi_get_checkpoints(&[], keys, &[])
216            .await
217            .map(|(_, contents, _)| contents)
218    }
219
220    pub async fn multi_get_checkpoints_summaries_by_digest(
221        &self,
222        keys: &[CheckpointDigest],
223    ) -> IotaResult<Vec<Option<CertifiedCheckpointSummary>>> {
224        self.multi_get_checkpoints(&[], &[], keys)
225            .await
226            .map(|(_, _, summaries)| summaries)
227    }
228
229    pub async fn multi_get_tx(
230        &self,
231        keys: &[TransactionDigest],
232    ) -> IotaResult<Vec<Option<Transaction>>> {
233        self.multi_get(keys, &[]).await.map(|(txns, _)| txns)
234    }
235
236    pub async fn multi_get_fx_by_tx_digest(
237        &self,
238        keys: &[TransactionDigest],
239    ) -> IotaResult<Vec<Option<TransactionEffects>>> {
240        self.multi_get(&[], keys).await.map(|(_, fx)| fx)
241    }
242
243    /// Convenience method for fetching single digest, and returning an error if
244    /// it's not found. Prefer using multi_get_tx whenever possible.
245    pub async fn get_tx(&self, digest: TransactionDigest) -> IotaResult<Transaction> {
246        self.multi_get_tx(&[digest])
247            .await?
248            .into_iter()
249            .next()
250            .flatten()
251            .ok_or(IotaError::TransactionNotFound { digest })
252    }
253
254    /// Convenience method for fetching single digest, and returning an error if
255    /// it's not found. Prefer using multi_get_fx_by_tx_digest whenever
256    /// possible.
257    pub async fn get_fx_by_tx_digest(
258        &self,
259        digest: TransactionDigest,
260    ) -> IotaResult<TransactionEffects> {
261        self.multi_get_fx_by_tx_digest(&[digest])
262            .await?
263            .into_iter()
264            .next()
265            .flatten()
266            .ok_or(IotaError::TransactionNotFound { digest })
267    }
268
269    /// Convenience method for fetching single checkpoint, and returning an
270    /// error if it's not found. Prefer using
271    /// multi_get_checkpoints_summaries whenever possible.
272    pub async fn get_checkpoint_summary(
273        &self,
274        checkpoint: CheckpointSequenceNumber,
275    ) -> IotaResult<CertifiedCheckpointSummary> {
276        self.multi_get_checkpoints_summaries(&[checkpoint])
277            .await?
278            .into_iter()
279            .next()
280            .flatten()
281            .ok_or(IotaError::UserInput {
282                error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
283            })
284    }
285
286    /// Convenience method for fetching single checkpoint, and returning an
287    /// error if it's not found. Prefer using multi_get_checkpoints_contents
288    /// whenever possible.
289    pub async fn get_checkpoint_contents(
290        &self,
291        checkpoint: CheckpointSequenceNumber,
292    ) -> IotaResult<CheckpointContents> {
293        self.multi_get_checkpoints_contents(&[checkpoint])
294            .await?
295            .into_iter()
296            .next()
297            .flatten()
298            .ok_or(IotaError::UserInput {
299                error: UserInputError::VerifiedCheckpointNotFound(checkpoint),
300            })
301    }
302
303    /// Convenience method for fetching single checkpoint, and returning an
304    /// error if it's not found. Prefer using
305    /// multi_get_checkpoints_summaries_by_digest whenever possible.
306    pub async fn get_checkpoint_summary_by_digest(
307        &self,
308        digest: CheckpointDigest,
309    ) -> IotaResult<CertifiedCheckpointSummary> {
310        self.multi_get_checkpoints_summaries_by_digest(&[digest])
311            .await?
312            .into_iter()
313            .next()
314            .flatten()
315            .ok_or(IotaError::UserInput {
316                error: UserInputError::VerifiedCheckpointDigestNotFound(format!("{:?}", digest)),
317            })
318    }
319
320    pub async fn get_transaction_perpetual_checkpoint(
321        &self,
322        digest: TransactionDigest,
323    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
324        self.inner
325            .get_transaction_perpetual_checkpoint(digest)
326            .await
327    }
328
329    pub async fn get_object(
330        &self,
331        object_id: ObjectID,
332        version: VersionNumber,
333    ) -> IotaResult<Option<Object>> {
334        self.inner.get_object(object_id, version).await
335    }
336
337    pub async fn multi_get_transactions_perpetual_checkpoints(
338        &self,
339        digests: &[TransactionDigest],
340    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
341        self.inner
342            .multi_get_transactions_perpetual_checkpoints(digests)
343            .await
344    }
345
346    pub async fn multi_get_events_by_tx_digests(
347        &self,
348        digests: &[TransactionDigest],
349    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
350        self.inner.multi_get_events_by_tx_digests(digests).await
351    }
352}
353
/// Immutable (read-only) key/value store trait for retrieving transactions,
/// effects, events, checkpoints, and objects. No write methods are defined.
/// Lookups are mostly batched (`multi_get*`) to discourage single key/value
/// operations.
///
/// A `None` in any result means the store has no entry for that key.
/// NOTE(review): `FallbackTransactionKVStore` in this file merges results by
/// position, so batched results are assumed to line up one-to-one with the
/// requested keys — implementors should preserve that.
#[async_trait]
pub trait TransactionKeyValueStoreTrait {
    /// Generic multi_get, allows implementors to get heterogeneous values with
    /// a single round trip: transactions for `transaction_keys` and effects
    /// for `effects_keys`.
    async fn multi_get(
        &self,
        transaction_keys: &[TransactionDigest],
        effects_keys: &[TransactionDigest],
    ) -> IotaResult<KVStoreTransactionData>;

    /// Generic multi_get to allow implementors to get heterogeneous values with
    /// a single round trip: summaries by sequence number, contents by sequence
    /// number, and summaries by digest.
    async fn multi_get_checkpoints(
        &self,
        checkpoint_summaries: &[CheckpointSequenceNumber],
        checkpoint_contents: &[CheckpointSequenceNumber],
        checkpoint_summaries_by_digest: &[CheckpointDigest],
    ) -> IotaResult<KVStoreCheckpointData>;

    /// Looks up the checkpoint sequence number recorded for transaction
    /// `digest`; `None` if this store has no entry.
    async fn get_transaction_perpetual_checkpoint(
        &self,
        digest: TransactionDigest,
    ) -> IotaResult<Option<CheckpointSequenceNumber>>;

    /// Fetches object `object_id` at `version`; `None` if this store does not
    /// have it.
    async fn get_object(
        &self,
        object_id: ObjectID,
        version: SequenceNumber,
    ) -> IotaResult<Option<Object>>;

    /// Batched form of `get_transaction_perpetual_checkpoint`.
    async fn multi_get_transactions_perpetual_checkpoints(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>>;

    /// Fetches the transaction events for each digest in `digests`.
    async fn multi_get_events_by_tx_digests(
        &self,
        digests: &[TransactionDigest],
    ) -> IotaResult<Vec<Option<TransactionEvents>>>;
}
397
398/// A TransactionKeyValueStoreTrait that falls back to a secondary store for any
399/// key for which the primary store returns None.
400///
401/// Will be used to check the local rocksdb store, before falling back to a
402/// remote scalable store.
403pub struct FallbackTransactionKVStore {
404    primary: TransactionKeyValueStore,
405    fallback: TransactionKeyValueStore,
406}
407
408impl FallbackTransactionKVStore {
409    pub fn new_kv(
410        primary: TransactionKeyValueStore,
411        fallback: TransactionKeyValueStore,
412        metrics: Arc<KeyValueStoreMetrics>,
413        label: &'static str,
414    ) -> TransactionKeyValueStore {
415        let store = Arc::new(Self { primary, fallback });
416        TransactionKeyValueStore::new(label, metrics, store)
417    }
418}
419
420#[async_trait]
421impl TransactionKeyValueStoreTrait for FallbackTransactionKVStore {
422    #[instrument(level = "trace", skip_all)]
423    async fn multi_get(
424        &self,
425        transaction_keys: &[TransactionDigest],
426        effects_keys: &[TransactionDigest],
427    ) -> IotaResult<KVStoreTransactionData> {
428        let (mut transactions, mut effects) = self
429            .primary
430            .multi_get(transaction_keys, effects_keys)
431            .await?;
432
433        let (fallback_transaction_keys, indices_transactions) =
434            find_fallback(&transactions, transaction_keys);
435        let (fallback_effects_keys, indices_effects) = find_fallback(&effects, effects_keys);
436
437        if fallback_transaction_keys.is_empty() && fallback_effects_keys.is_empty() {
438            return Ok((transactions, effects));
439        }
440
441        let (fallback_transactions, fallback_effects) = self
442            .fallback
443            .multi_get(&fallback_transaction_keys, &fallback_effects_keys)
444            .await?;
445
446        merge_res(
447            &mut transactions,
448            fallback_transactions,
449            &indices_transactions,
450        );
451        merge_res(&mut effects, fallback_effects, &indices_effects);
452
453        Ok((transactions, effects))
454    }
455
456    #[instrument(level = "trace", skip_all)]
457    async fn multi_get_checkpoints(
458        &self,
459        checkpoint_summaries: &[CheckpointSequenceNumber],
460        checkpoint_contents: &[CheckpointSequenceNumber],
461        checkpoint_summaries_by_digest: &[CheckpointDigest],
462    ) -> IotaResult<(
463        Vec<Option<CertifiedCheckpointSummary>>,
464        Vec<Option<CheckpointContents>>,
465        Vec<Option<CertifiedCheckpointSummary>>,
466    )> {
467        let (mut summaries, mut contents, mut summaries_by_digest) = self
468            .primary
469            .multi_get_checkpoints(
470                checkpoint_summaries,
471                checkpoint_contents,
472                checkpoint_summaries_by_digest,
473            )
474            .await?;
475
476        let (fallback_summaries, indices_summaries) =
477            find_fallback(&summaries, checkpoint_summaries);
478        let (fallback_contents, indices_contents) = find_fallback(&contents, checkpoint_contents);
479        let (fallback_summaries_by_digest, indices_summaries_by_digest) =
480            find_fallback(&summaries_by_digest, checkpoint_summaries_by_digest);
481
482        if fallback_summaries.is_empty()
483            && fallback_contents.is_empty()
484            && fallback_summaries_by_digest.is_empty()
485        {
486            return Ok((summaries, contents, summaries_by_digest));
487        }
488
489        let (fallback_summaries, fallback_contents, fallback_summaries_by_digest) = self
490            .fallback
491            .multi_get_checkpoints(
492                &fallback_summaries,
493                &fallback_contents,
494                &fallback_summaries_by_digest,
495            )
496            .await?;
497
498        merge_res(&mut summaries, fallback_summaries, &indices_summaries);
499        merge_res(&mut contents, fallback_contents, &indices_contents);
500        merge_res(
501            &mut summaries_by_digest,
502            fallback_summaries_by_digest,
503            &indices_summaries_by_digest,
504        );
505
506        Ok((summaries, contents, summaries_by_digest))
507    }
508
509    #[instrument(level = "trace", skip_all)]
510    async fn get_transaction_perpetual_checkpoint(
511        &self,
512        digest: TransactionDigest,
513    ) -> IotaResult<Option<CheckpointSequenceNumber>> {
514        let mut res = self
515            .primary
516            .get_transaction_perpetual_checkpoint(digest)
517            .await?;
518        if res.is_none() {
519            res = self
520                .fallback
521                .get_transaction_perpetual_checkpoint(digest)
522                .await?;
523        }
524        Ok(res)
525    }
526
527    #[instrument(level = "trace", skip_all)]
528    async fn get_object(
529        &self,
530        object_id: ObjectID,
531        version: SequenceNumber,
532    ) -> IotaResult<Option<Object>> {
533        let mut res = self.primary.get_object(object_id, version).await?;
534        if res.is_none() {
535            res = self.fallback.get_object(object_id, version).await?;
536        }
537        Ok(res)
538    }
539
540    #[instrument(level = "trace", skip_all)]
541    async fn multi_get_transactions_perpetual_checkpoints(
542        &self,
543        digests: &[TransactionDigest],
544    ) -> IotaResult<Vec<Option<CheckpointSequenceNumber>>> {
545        let mut res = self
546            .primary
547            .multi_get_transactions_perpetual_checkpoints(digests)
548            .await?;
549
550        let (fallback, indices) = find_fallback(&res, digests);
551
552        if fallback.is_empty() {
553            return Ok(res);
554        }
555
556        let secondary_res = self
557            .fallback
558            .multi_get_transactions_perpetual_checkpoints(&fallback)
559            .await?;
560
561        merge_res(&mut res, secondary_res, &indices);
562
563        Ok(res)
564    }
565
566    #[instrument(level = "trace", skip_all)]
567    async fn multi_get_events_by_tx_digests(
568        &self,
569        digests: &[TransactionDigest],
570    ) -> IotaResult<Vec<Option<TransactionEvents>>> {
571        let mut res = self.primary.multi_get_events_by_tx_digests(digests).await?;
572        let (fallback, indices) = find_fallback(&res, digests);
573        if fallback.is_empty() {
574            return Ok(res);
575        }
576        let secondary_res = self
577            .fallback
578            .multi_get_events_by_tx_digests(&fallback)
579            .await?;
580        merge_res(&mut res, secondary_res, &indices);
581        Ok(res)
582    }
583}
584
/// Collects the keys whose lookup came back `None`, together with their
/// positions in `values`, so they can be re-requested from a fallback store
/// and merged back in place.
///
/// `values[i]` is taken to be the result for `keys[i]`. Values without a
/// corresponding key (if a store returned more results than keys requested)
/// are ignored rather than causing an out-of-bounds panic.
///
/// Returns `(missing_keys, missing_indices)`, both in ascending index order.
fn find_fallback<T, K: Clone>(values: &[Option<T>], keys: &[K]) -> (Vec<K>, Vec<usize>) {
    values
        .iter()
        .zip(keys)
        .enumerate()
        .filter_map(|(index, (value, key))| value.is_none().then(|| (key.clone(), index)))
        .unzip()
}
597
/// Writes each fallback result back into `values` at the position recorded
/// for it in `indices` (the positions produced by `find_fallback`).
///
/// Results and positions are paired in order; surplus entries in whichever
/// of `fallback_values` / `indices` is longer are ignored.
///
/// # Panics
///
/// If a paired index is out of bounds for `values`.
fn merge_res<T>(values: &mut [Option<T>], fallback_values: Vec<Option<T>>, indices: &[usize]) {
    let positions = indices.iter().copied();
    for (replacement, slot) in fallback_values.into_iter().zip(positions) {
        values[slot] = replacement;
    }
}