iota_json_rpc/
authority_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap},
7    sync::Arc,
8};
9
10use anyhow::anyhow;
11use arc_swap::Guard;
12use async_trait::async_trait;
13use iota_core::{
14    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
15    execution_cache::ObjectCacheRead,
16    subscription_handler::SubscriptionHandler,
17};
18use iota_json_rpc_types::{
19    Coin as IotaCoin, DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent,
20    IotaObjectDataFilter, TransactionFilter,
21};
22use iota_storage::{
23    indexes::TotalBalance,
24    key_value_store::{
25        KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
26    },
27};
28use iota_types::{
29    base_types::{IotaAddress, MoveObjectType, ObjectID, ObjectInfo, ObjectRef, SequenceNumber},
30    bridge::Bridge,
31    committee::{Committee, EpochId},
32    digests::{ChainIdentifier, TransactionDigest},
33    dynamic_field::DynamicFieldInfo,
34    effects::TransactionEffects,
35    error::{IotaError, UserInputError},
36    event::EventID,
37    governance::StakedIota,
38    iota_serde::BigInt,
39    iota_system_state::IotaSystemState,
40    messages_checkpoint::{
41        CheckpointContents, CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber,
42        VerifiedCheckpoint,
43    },
44    object::{Object, ObjectRead, PastObjectRead},
45    storage::{BackingPackageStore, ObjectStore, WriteKind},
46    timelock::timelocked_staked_iota::TimelockedStakedIota,
47    transaction::{Transaction, TransactionData, TransactionKind},
48};
49#[cfg(test)]
50use mockall::automock;
51use move_core_types::language_storage::TypeTag;
52use thiserror::Error;
53use tokio::task::JoinError;
54
55use crate::ObjectProvider;
56
/// Result alias used by the `StateRead` API: the error type is always
/// `StateReadError`, and the success type defaults to `()`.
57pub type StateReadResult<T = ()> = Result<T, StateReadError>;
58
59/// Trait for `AuthorityState` methods commonly used by at least two APIs.
///
/// Fallible methods return `StateReadResult`, so callers always see a
/// `StateReadError` regardless of the underlying error type. Under
/// `cfg(test)` the `automock` attribute is applied to the trait.
60#[cfg_attr(test, automock)]
61#[async_trait]
62pub trait StateRead: Send + Sync {
    /// Reads key-value store data for the given transaction digests and
    /// effects digests (two independent key lists).
63    async fn multi_get(
64        &self,
65        transaction_keys: &[TransactionDigest],
66        effects_keys: &[TransactionDigest],
67    ) -> StateReadResult<KVStoreTransactionData>;
68
    /// Returns the `ObjectRead` for `object_id`.
69    fn get_object_read(&self, object_id: &ObjectID) -> StateReadResult<ObjectRead>;
70
    /// Returns the `PastObjectRead` for `object_id` at the requested `version`.
71    fn get_past_object_read(
72        &self,
73        object_id: &ObjectID,
74        version: SequenceNumber,
75    ) -> StateReadResult<PastObjectRead>;
76
    /// Returns the object stored under `object_id`, if any.
77    async fn get_object(&self, object_id: &ObjectID) -> StateReadResult<Option<Object>>;
78
    /// Returns a guard over the current per-epoch store.
    // NOTE(review): the name suggests at most one call per task — confirm
    // against the `AuthorityState` documentation.
79    fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>>;
80
    /// Lists dynamic fields of `owner`, paginated through `cursor` and `limit`.
81    fn get_dynamic_fields(
82        &self,
83        owner: ObjectID,
84        cursor: Option<ObjectID>,
85        limit: usize,
86    ) -> StateReadResult<Vec<(ObjectID, DynamicFieldInfo)>>;
87
    /// Returns the shared object cache reader.
88    fn get_cache_reader(&self) -> &Arc<dyn ObjectCacheRead>;
89
    /// Returns the shared object store.
90    fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync>;
91
    /// Returns the shared backing package store.
92    fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync>;
93
    /// Lists objects owned by `owner`, optionally filtered. This variant takes
    /// no limit; see `get_owner_objects_with_limit` for the bounded form.
94    fn get_owner_objects(
95        &self,
96        owner: IotaAddress,
97        cursor: Option<ObjectID>,
98        filter: Option<IotaObjectDataFilter>,
99    ) -> StateReadResult<Vec<ObjectInfo>>;
100
    /// Queries events matching `query`, returning at most `limit` results in
    /// ascending or descending order depending on `descending`.
101    async fn query_events(
102        &self,
103        kv_store: &Arc<TransactionKeyValueStore>,
104        query: EventFilter,
105        // If `Some`, the query will start from the next item after the specified cursor
106        cursor: Option<EventID>,
107        limit: usize,
108        descending: bool,
109    ) -> StateReadResult<Vec<IotaEvent>>;
110
111    // transaction_execution_api
    /// Dry-runs `transaction`, returning the dry-run response, a map of
    /// objects keyed by ID with their `WriteKind`, the transaction effects and
    /// an optional object ID.
    // NOTE(review): the meaning of the trailing `Option<ObjectID>` is not
    // visible here — confirm against `AuthorityState::dry_exec_transaction`.
112    #[allow(clippy::type_complexity)]
113    async fn dry_exec_transaction(
114        &self,
115        transaction: TransactionData,
116        transaction_digest: TransactionDigest,
117    ) -> StateReadResult<(
118        DryRunTransactionBlockResponse,
119        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
120        TransactionEffects,
121        Option<ObjectID>,
122    )>;
123
    /// Runs a dev-inspect of `transaction_kind` on behalf of `sender`. All
    /// gas-related and flag parameters are optional.
124    async fn dev_inspect_transaction_block(
125        &self,
126        sender: IotaAddress,
127        transaction_kind: TransactionKind,
128        gas_price: Option<u64>,
129        gas_budget: Option<u64>,
130        gas_sponsor: Option<IotaAddress>,
131        gas_objects: Option<Vec<ObjectRef>>,
132        show_raw_txn_data_and_effects: Option<bool>,
133        skip_checks: Option<bool>,
134    ) -> StateReadResult<DevInspectResults>;
135
136    // indexer_api
    /// Returns a shared handle to the subscription handler.
137    fn get_subscription_handler(&self) -> Arc<SubscriptionHandler>;
138
    /// Lists objects owned by `owner`, optionally filtered, bounded by `limit`.
139    fn get_owner_objects_with_limit(
140        &self,
141        owner: IotaAddress,
142        cursor: Option<ObjectID>,
143        limit: usize,
144        filter: Option<IotaObjectDataFilter>,
145    ) -> StateReadResult<Vec<ObjectInfo>>;
146
    /// Returns digests of transactions matching `filter`, paginated through
    /// `cursor` and `limit`; `reverse` selects the iteration direction.
147    async fn get_transactions(
148        &self,
149        kv_store: &Arc<TransactionKeyValueStore>,
150        filter: Option<TransactionFilter>,
151        cursor: Option<TransactionDigest>,
152        limit: Option<usize>,
153        reverse: bool,
154    ) -> StateReadResult<Vec<TransactionDigest>>;
155
    /// Resolves the object ID of the dynamic field of `owner` identified by
    /// the name type and the BCS-encoded name bytes, if there is one.
156    fn get_dynamic_field_object_id(
157        &self,
158        owner: ObjectID,
159        name_type: TypeTag,
160        name_bcs_bytes: &[u8],
161    ) -> StateReadResult<Option<ObjectID>>;
162
163    // governance_api
    /// Returns the `StakedIota` objects owned by `owner`.
164    async fn get_staked_iota(&self, owner: IotaAddress) -> StateReadResult<Vec<StakedIota>>;
    /// Returns the `TimelockedStakedIota` objects owned by `owner`.
165    async fn get_timelocked_staked_iota(
166        &self,
167        owner: IotaAddress,
168    ) -> StateReadResult<Vec<TimelockedStakedIota>>;
169
    /// Returns the IOTA system state.
170    fn get_system_state(&self) -> StateReadResult<IotaSystemState>;
    /// Returns the committee for `epoch`, or the latest committee when `epoch`
    /// is `None`.
171    fn get_or_latest_committee(&self, epoch: Option<BigInt<u64>>) -> StateReadResult<Committee>;
172
173    // bridge_api
    /// Returns the bridge object.
174    fn get_bridge(&self) -> StateReadResult<Bridge>;
175
176    // coin_api
    /// Returns the digest of the transaction that published `package_id`.
177    fn find_publish_txn_digest(&self, package_id: ObjectID) -> StateReadResult<TransactionDigest>;
    /// Lists up to `limit` coins owned by `owner`, starting from `cursor`.
    // NOTE(review): `cursor` presumably pairs a coin type string with an
    // object ID, and `one_coin_type_only` restricts results to that coin
    // type — confirm against the index implementation.
178    fn get_owned_coins(
179        &self,
180        owner: IotaAddress,
181        cursor: (String, ObjectID),
182        limit: usize,
183        one_coin_type_only: bool,
184    ) -> StateReadResult<Vec<IotaCoin>>;
    /// Returns the executed transaction and its effects for `digest`.
185    async fn get_executed_transaction_and_effects(
186        &self,
187        digest: TransactionDigest,
188        kv_store: Arc<TransactionKeyValueStore>,
189    ) -> StateReadResult<(Transaction, TransactionEffects)>;
    /// Returns the total balance of `coin_type` held by `owner`.
190    async fn get_balance(
191        &self,
192        owner: IotaAddress,
193        coin_type: TypeTag,
194    ) -> StateReadResult<TotalBalance>;
    /// Returns the total balance per coin type held by `owner`.
195    async fn get_all_balance(
196        &self,
197        owner: IotaAddress,
198    ) -> StateReadResult<Arc<HashMap<TypeTag, TotalBalance>>>;
199
200    // read_api
    /// Returns the verified checkpoint with the given sequence number.
201    fn get_verified_checkpoint_by_sequence_number(
202        &self,
203        sequence_number: CheckpointSequenceNumber,
204    ) -> StateReadResult<VerifiedCheckpoint>;
205
    /// Returns the checkpoint contents identified by `digest`.
206    fn get_checkpoint_contents(
207        &self,
208        digest: CheckpointContentsDigest,
209    ) -> StateReadResult<CheckpointContents>;
210
    /// Returns the verified checkpoint identified by its summary `digest`.
211    fn get_verified_checkpoint_summary_by_digest(
212        &self,
213        digest: CheckpointDigest,
214    ) -> StateReadResult<VerifiedCheckpoint>;
215
    /// For each digest, returns the `(epoch, checkpoint sequence number)` pair
    /// recorded for that transaction, or `None` when there is none.
216    fn multi_get_transactions_perpetual_checkpoints(
217        &self,
218        digests: &[TransactionDigest],
219    ) -> StateReadResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>>;
220
    /// Single-digest form of `multi_get_transactions_perpetual_checkpoints`.
221    fn get_transaction_perpetual_checkpoint(
222        &self,
223        digest: &TransactionDigest,
224    ) -> StateReadResult<Option<(EpochId, CheckpointSequenceNumber)>>;
225
    /// For each sequence number, returns the verified checkpoint if available.
226    fn multi_get_checkpoint_by_sequence_number(
227        &self,
228        sequence_numbers: &[CheckpointSequenceNumber],
229    ) -> StateReadResult<Vec<Option<VerifiedCheckpoint>>>;
230
    /// Returns the total number of transaction blocks.
231    fn get_total_transaction_blocks(&self) -> StateReadResult<u64>;
232
    /// Returns the verified checkpoint with the given sequence number, or
    /// `None` when it is not available.
233    fn get_checkpoint_by_sequence_number(
234        &self,
235        sequence_number: CheckpointSequenceNumber,
236    ) -> StateReadResult<Option<VerifiedCheckpoint>>;
237
    /// Returns the sequence number of the latest checkpoint.
238    fn get_latest_checkpoint_sequence_number(&self) -> StateReadResult<CheckpointSequenceNumber>;
239
    /// Returns the chain identifier.
240    fn get_chain_identifier(&self) -> StateReadResult<ChainIdentifier>;
241}
242
243#[async_trait]
244impl StateRead for AuthorityState {
245    async fn multi_get(
246        &self,
247        transaction_keys: &[TransactionDigest],
248        effects_keys: &[TransactionDigest],
249    ) -> StateReadResult<KVStoreTransactionData> {
250        Ok(
251            <AuthorityState as TransactionKeyValueStoreTrait>::multi_get(
252                self,
253                transaction_keys,
254                effects_keys,
255            )
256            .await?,
257        )
258    }
259
260    fn get_object_read(&self, object_id: &ObjectID) -> StateReadResult<ObjectRead> {
261        Ok(self.get_object_read(object_id)?)
262    }
263
264    async fn get_object(&self, object_id: &ObjectID) -> StateReadResult<Option<Object>> {
265        Ok(self.get_object(object_id).await?)
266    }
267
268    fn get_past_object_read(
269        &self,
270        object_id: &ObjectID,
271        version: SequenceNumber,
272    ) -> StateReadResult<PastObjectRead> {
273        Ok(self.get_past_object_read(object_id, version)?)
274    }
275
276    fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
277        self.load_epoch_store_one_call_per_task()
278    }
279
280    fn get_dynamic_fields(
281        &self,
282        owner: ObjectID,
283        cursor: Option<ObjectID>,
284        limit: usize,
285    ) -> StateReadResult<Vec<(ObjectID, DynamicFieldInfo)>> {
286        Ok(self.get_dynamic_fields(owner, cursor, limit)?)
287    }
288
289    fn get_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
290        self.get_object_cache_reader()
291    }
292
293    fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
294        self.get_object_store()
295    }
296
297    fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
298        self.get_backing_package_store()
299    }
300
301    fn get_owner_objects(
302        &self,
303        owner: IotaAddress,
304        cursor: Option<ObjectID>,
305        filter: Option<IotaObjectDataFilter>,
306    ) -> StateReadResult<Vec<ObjectInfo>> {
307        Ok(self
308            .get_owner_objects_iterator(owner, cursor, filter)?
309            .collect())
310    }
311
312    async fn query_events(
313        &self,
314        kv_store: &Arc<TransactionKeyValueStore>,
315        query: EventFilter,
316        // If `Some`, the query will start from the next item after the specified cursor
317        cursor: Option<EventID>,
318        limit: usize,
319        descending: bool,
320    ) -> StateReadResult<Vec<IotaEvent>> {
321        Ok(self
322            .query_events(kv_store, query, cursor, limit, descending)
323            .await?)
324    }
325
326    async fn dry_exec_transaction(
327        &self,
328        transaction: TransactionData,
329        transaction_digest: TransactionDigest,
330    ) -> StateReadResult<(
331        DryRunTransactionBlockResponse,
332        BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
333        TransactionEffects,
334        Option<ObjectID>,
335    )> {
336        Ok(self
337            .dry_exec_transaction(transaction, transaction_digest)
338            .await?)
339    }
340
341    async fn dev_inspect_transaction_block(
342        &self,
343        sender: IotaAddress,
344        transaction_kind: TransactionKind,
345        gas_price: Option<u64>,
346        gas_budget: Option<u64>,
347        gas_sponsor: Option<IotaAddress>,
348        gas_objects: Option<Vec<ObjectRef>>,
349        show_raw_txn_data_and_effects: Option<bool>,
350        skip_checks: Option<bool>,
351    ) -> StateReadResult<DevInspectResults> {
352        Ok(self
353            .dev_inspect_transaction_block(
354                sender,
355                transaction_kind,
356                gas_price,
357                gas_budget,
358                gas_sponsor,
359                gas_objects,
360                show_raw_txn_data_and_effects,
361                skip_checks,
362            )
363            .await?)
364    }
365
366    fn get_subscription_handler(&self) -> Arc<SubscriptionHandler> {
367        self.subscription_handler.clone()
368    }
369
370    fn get_owner_objects_with_limit(
371        &self,
372        owner: IotaAddress,
373        cursor: Option<ObjectID>,
374        limit: usize,
375        filter: Option<IotaObjectDataFilter>,
376    ) -> StateReadResult<Vec<ObjectInfo>> {
377        Ok(self.get_owner_objects(owner, cursor, limit, filter)?)
378    }
379
380    async fn get_transactions(
381        &self,
382        kv_store: &Arc<TransactionKeyValueStore>,
383        filter: Option<TransactionFilter>,
384        cursor: Option<TransactionDigest>,
385        limit: Option<usize>,
386        reverse: bool,
387    ) -> StateReadResult<Vec<TransactionDigest>> {
388        Ok(self
389            .get_transactions(kv_store, filter, cursor, limit, reverse)
390            .await?)
391    }
392
393    fn get_dynamic_field_object_id(
394        // indexer
395        &self,
396        owner: ObjectID,
397        name_type: TypeTag,
398        name_bcs_bytes: &[u8],
399    ) -> StateReadResult<Option<ObjectID>> {
400        Ok(self.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)?)
401    }
402
403    async fn get_staked_iota(&self, owner: IotaAddress) -> StateReadResult<Vec<StakedIota>> {
404        Ok(self
405            .get_move_objects(owner, MoveObjectType::staked_iota())
406            .await?)
407    }
408
409    async fn get_timelocked_staked_iota(
410        &self,
411        owner: IotaAddress,
412    ) -> StateReadResult<Vec<TimelockedStakedIota>> {
413        Ok(self
414            .get_move_objects(owner, MoveObjectType::timelocked_staked_iota())
415            .await?)
416    }
417
418    fn get_system_state(&self) -> StateReadResult<IotaSystemState> {
419        Ok(self
420            .get_cache_reader()
421            .get_iota_system_state_object_unsafe()?)
422    }
423
424    fn get_or_latest_committee(&self, epoch: Option<BigInt<u64>>) -> StateReadResult<Committee> {
425        Ok(self
426            .committee_store()
427            .get_or_latest_committee(epoch.map(|e| *e))?)
428    }
429
430    fn get_bridge(&self) -> StateReadResult<Bridge> {
431        self.get_cache_reader()
432            .get_bridge_object_unsafe()
433            .map_err(|err| err.into())
434    }
435
436    fn find_publish_txn_digest(&self, package_id: ObjectID) -> StateReadResult<TransactionDigest> {
437        Ok(self.find_publish_txn_digest(package_id)?)
438    }
439    fn get_owned_coins(
440        &self,
441        owner: IotaAddress,
442        cursor: (String, ObjectID),
443        limit: usize,
444        one_coin_type_only: bool,
445    ) -> StateReadResult<Vec<IotaCoin>> {
446        Ok(self
447            .get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)?
448            .map(|(coin_type, coin_object_id, coin)| IotaCoin {
449                coin_type,
450                coin_object_id,
451                version: coin.version,
452                digest: coin.digest,
453                balance: coin.balance,
454                previous_transaction: coin.previous_transaction,
455            })
456            .collect::<Vec<_>>())
457    }
458
459    async fn get_executed_transaction_and_effects(
460        &self,
461        digest: TransactionDigest,
462        kv_store: Arc<TransactionKeyValueStore>,
463    ) -> StateReadResult<(Transaction, TransactionEffects)> {
464        Ok(self
465            .get_executed_transaction_and_effects(digest, kv_store)
466            .await?)
467    }
468
469    async fn get_balance(
470        &self,
471        owner: IotaAddress,
472        coin_type: TypeTag,
473    ) -> StateReadResult<TotalBalance> {
474        Ok(self
475            .indexes
476            .as_ref()
477            .ok_or(IotaError::IndexStoreNotAvailable)?
478            .get_balance(owner, coin_type)
479            .await?)
480    }
481
482    async fn get_all_balance(
483        &self,
484        owner: IotaAddress,
485    ) -> StateReadResult<Arc<HashMap<TypeTag, TotalBalance>>> {
486        Ok(self
487            .indexes
488            .as_ref()
489            .ok_or(IotaError::IndexStoreNotAvailable)?
490            .get_all_balance(owner)
491            .await?)
492    }
493
494    fn get_verified_checkpoint_by_sequence_number(
495        &self,
496        sequence_number: CheckpointSequenceNumber,
497    ) -> StateReadResult<VerifiedCheckpoint> {
498        Ok(self.get_verified_checkpoint_by_sequence_number(sequence_number)?)
499    }
500
501    fn get_checkpoint_contents(
502        &self,
503        digest: CheckpointContentsDigest,
504    ) -> StateReadResult<CheckpointContents> {
505        Ok(self.get_checkpoint_contents(digest)?)
506    }
507
508    fn get_verified_checkpoint_summary_by_digest(
509        &self,
510        digest: CheckpointDigest,
511    ) -> StateReadResult<VerifiedCheckpoint> {
512        Ok(self.get_verified_checkpoint_summary_by_digest(digest)?)
513    }
514
515    fn multi_get_transactions_perpetual_checkpoints(
516        &self,
517        digests: &[TransactionDigest],
518    ) -> StateReadResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
519        Ok(self
520            .get_checkpoint_cache()
521            .multi_get_transactions_perpetual_checkpoints(digests)?)
522    }
523
524    fn get_transaction_perpetual_checkpoint(
525        &self,
526        digest: &TransactionDigest,
527    ) -> StateReadResult<Option<(EpochId, CheckpointSequenceNumber)>> {
528        Ok(self
529            .get_checkpoint_cache()
530            .get_transaction_perpetual_checkpoint(digest)?)
531    }
532
533    fn multi_get_checkpoint_by_sequence_number(
534        &self,
535        sequence_numbers: &[CheckpointSequenceNumber],
536    ) -> StateReadResult<Vec<Option<VerifiedCheckpoint>>> {
537        Ok(self.multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
538    }
539
540    fn get_total_transaction_blocks(&self) -> StateReadResult<u64> {
541        Ok(self.get_total_transaction_blocks()?)
542    }
543
544    fn get_checkpoint_by_sequence_number(
545        &self,
546        sequence_number: CheckpointSequenceNumber,
547    ) -> StateReadResult<Option<VerifiedCheckpoint>> {
548        Ok(self.get_checkpoint_by_sequence_number(sequence_number)?)
549    }
550
551    fn get_latest_checkpoint_sequence_number(&self) -> StateReadResult<CheckpointSequenceNumber> {
552        Ok(self.get_latest_checkpoint_sequence_number()?)
553    }
554
555    fn get_chain_identifier(&self) -> StateReadResult<ChainIdentifier> {
556        Ok(self
557            .get_chain_identifier()
558            .ok_or(anyhow!("Chain identifier not found"))?)
559    }
560}
561
562/// This implementation allows `S` to be a dynamically sized type (DST) that
563/// implements ObjectProvider Valid as `S` is referenced only, and memory
564/// management is handled by `Arc`
565#[async_trait]
566impl<S: ?Sized + StateRead> ObjectProvider for Arc<S> {
567    type Error = StateReadError;
568
569    async fn get_object(
570        &self,
571        id: &ObjectID,
572        version: &SequenceNumber,
573    ) -> Result<Object, Self::Error> {
574        Ok(self.get_past_object_read(id, *version)?.into_object()?)
575    }
576
577    async fn find_object_lt_or_eq_version(
578        &self,
579        id: &ObjectID,
580        version: &SequenceNumber,
581    ) -> Result<Option<Object>, Self::Error> {
582        Ok(self
583            .get_cache_reader()
584            .find_object_lt_or_eq_version(*id, *version)?)
585    }
586}
587
588#[async_trait]
589impl<S: ?Sized + StateRead> ObjectProvider for (Arc<S>, Arc<TransactionKeyValueStore>) {
590    type Error = StateReadError;
591
592    async fn get_object(
593        &self,
594        id: &ObjectID,
595        version: &SequenceNumber,
596    ) -> Result<Object, Self::Error> {
597        let object_read = self.0.get_past_object_read(id, *version)?;
598        match object_read {
599            PastObjectRead::ObjectNotExists(_) | PastObjectRead::VersionNotFound(..) => {
600                match self.1.get_object(*id, *version).await? {
601                    Some(object) => Ok(object),
602                    None => Ok(PastObjectRead::VersionNotFound(*id, *version).into_object()?),
603                }
604            }
605            _ => Ok(object_read.into_object()?),
606        }
607    }
608
609    async fn find_object_lt_or_eq_version(
610        &self,
611        id: &ObjectID,
612        version: &SequenceNumber,
613    ) -> Result<Option<Object>, Self::Error> {
614        Ok(self
615            .0
616            .get_cache_reader()
617            .find_object_lt_or_eq_version(*id, *version)?)
618    }
619}
620
/// Errors classified as internal when reading state.
///
/// Each variant wraps its source error transparently (the wrapped error
/// supplies the message), and `#[from]` provides the matching `From`
/// conversion for each wrapped type.
621#[derive(Debug, Error)]
622pub enum StateReadInternalError {
    /// An `IotaError` classified as internal.
623    #[error(transparent)]
624    Iota(#[from] IotaError),
    /// A Tokio `JoinError`.
625    #[error(transparent)]
626    Join(#[from] JoinError),
    /// Any other error carried as `anyhow::Error`.
627    #[error(transparent)]
628    Anyhow(#[from] anyhow::Error),
629}
630
/// Errors classified as client errors when reading state.
///
/// Each variant wraps its source error transparently, and `#[from]` provides
/// the matching `From` conversion for each wrapped type.
631#[derive(Debug, Error)]
632pub enum StateReadClientError {
    /// An `IotaError` classified as a client error.
633    #[error(transparent)]
634    Iota(#[from] IotaError),
    /// A `UserInputError`.
635    #[error(transparent)]
636    UserInput(#[from] UserInputError),
637}
638
639/// `StateReadError` is the error type that callers work with.
640/// It captures all possible errors that can occur while reading state and
641/// classifies them into two categories. Unless `StateReadError` is the final
642/// error state before returning to the caller, the app may still want error
643/// context; that context is preserved in the `Internal` and `Client` variants.
644#[derive(Debug, Error)]
645pub enum StateReadError {
646    // Internal errors: iota_json_rpc::Error will do the final conversion to a generic error message
647    #[error(transparent)]
648    Internal(#[from] StateReadInternalError),
649
650    // Client errors
651    #[error(transparent)]
652    Client(#[from] StateReadClientError),
653}
654
655impl From<IotaError> for StateReadError {
656    fn from(e: IotaError) -> Self {
657        match e {
658            IotaError::IndexStoreNotAvailable
659            | IotaError::TransactionNotFound { .. }
660            | IotaError::UnsupportedFeature { .. }
661            | IotaError::UserInput { .. }
662            | IotaError::WrongMessageVersion { .. } => StateReadError::Client(e.into()),
663            _ => StateReadError::Internal(e.into()),
664        }
665    }
666}
667
668impl From<UserInputError> for StateReadError {
669    fn from(e: UserInputError) -> Self {
670        StateReadError::Client(e.into())
671    }
672}
673
674impl From<JoinError> for StateReadError {
675    fn from(e: JoinError) -> Self {
676        StateReadError::Internal(e.into())
677    }
678}
679
680impl From<anyhow::Error> for StateReadError {
681    fn from(e: anyhow::Error) -> Self {
682        StateReadError::Internal(e.into())
683    }
684}