1use std::{
6 collections::{BTreeMap, HashMap},
7 sync::Arc,
8};
9
10use anyhow::anyhow;
11use arc_swap::Guard;
12use async_trait::async_trait;
13use iota_core::{
14 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
15 execution_cache::ObjectCacheRead,
16 subscription_handler::SubscriptionHandler,
17};
18use iota_json_rpc_types::{
19 Coin as IotaCoin, DevInspectResults, DryRunTransactionBlockResponse, EventFilter, IotaEvent,
20 IotaObjectDataFilter, TransactionFilter,
21};
22use iota_storage::{
23 indexes::TotalBalance,
24 key_value_store::{
25 KVStoreTransactionData, TransactionKeyValueStore, TransactionKeyValueStoreTrait,
26 },
27};
28use iota_types::{
29 base_types::{IotaAddress, MoveObjectType, ObjectID, ObjectInfo, ObjectRef, SequenceNumber},
30 bridge::Bridge,
31 committee::{Committee, EpochId},
32 digests::{ChainIdentifier, TransactionDigest},
33 dynamic_field::DynamicFieldInfo,
34 effects::TransactionEffects,
35 error::{IotaError, UserInputError},
36 event::EventID,
37 governance::StakedIota,
38 iota_serde::BigInt,
39 iota_system_state::IotaSystemState,
40 messages_checkpoint::{
41 CheckpointContents, CheckpointContentsDigest, CheckpointDigest, CheckpointSequenceNumber,
42 VerifiedCheckpoint,
43 },
44 object::{Object, ObjectRead, PastObjectRead},
45 storage::{BackingPackageStore, ObjectStore, WriteKind},
46 timelock::timelocked_staked_iota::TimelockedStakedIota,
47 transaction::{Transaction, TransactionData, TransactionKind},
48};
49#[cfg(test)]
50use mockall::automock;
51use move_core_types::language_storage::TypeTag;
52use thiserror::Error;
53use tokio::task::JoinError;
54
55use crate::ObjectProvider;
56
57pub type StateReadResult<T = ()> = Result<T, StateReadError>;
58
59#[cfg_attr(test, automock)]
61#[async_trait]
62pub trait StateRead: Send + Sync {
63 async fn multi_get(
64 &self,
65 transaction_keys: &[TransactionDigest],
66 effects_keys: &[TransactionDigest],
67 ) -> StateReadResult<KVStoreTransactionData>;
68
69 fn get_object_read(&self, object_id: &ObjectID) -> StateReadResult<ObjectRead>;
70
71 fn get_past_object_read(
72 &self,
73 object_id: &ObjectID,
74 version: SequenceNumber,
75 ) -> StateReadResult<PastObjectRead>;
76
77 async fn get_object(&self, object_id: &ObjectID) -> StateReadResult<Option<Object>>;
78
79 fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>>;
80
81 fn get_dynamic_fields(
82 &self,
83 owner: ObjectID,
84 cursor: Option<ObjectID>,
85 limit: usize,
86 ) -> StateReadResult<Vec<(ObjectID, DynamicFieldInfo)>>;
87
88 fn get_cache_reader(&self) -> &Arc<dyn ObjectCacheRead>;
89
90 fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync>;
91
92 fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync>;
93
94 fn get_owner_objects(
95 &self,
96 owner: IotaAddress,
97 cursor: Option<ObjectID>,
98 filter: Option<IotaObjectDataFilter>,
99 ) -> StateReadResult<Vec<ObjectInfo>>;
100
101 async fn query_events(
102 &self,
103 kv_store: &Arc<TransactionKeyValueStore>,
104 query: EventFilter,
105 cursor: Option<EventID>,
107 limit: usize,
108 descending: bool,
109 ) -> StateReadResult<Vec<IotaEvent>>;
110
111 #[allow(clippy::type_complexity)]
113 async fn dry_exec_transaction(
114 &self,
115 transaction: TransactionData,
116 transaction_digest: TransactionDigest,
117 ) -> StateReadResult<(
118 DryRunTransactionBlockResponse,
119 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
120 TransactionEffects,
121 Option<ObjectID>,
122 )>;
123
124 async fn dev_inspect_transaction_block(
125 &self,
126 sender: IotaAddress,
127 transaction_kind: TransactionKind,
128 gas_price: Option<u64>,
129 gas_budget: Option<u64>,
130 gas_sponsor: Option<IotaAddress>,
131 gas_objects: Option<Vec<ObjectRef>>,
132 show_raw_txn_data_and_effects: Option<bool>,
133 skip_checks: Option<bool>,
134 ) -> StateReadResult<DevInspectResults>;
135
136 fn get_subscription_handler(&self) -> Arc<SubscriptionHandler>;
138
139 fn get_owner_objects_with_limit(
140 &self,
141 owner: IotaAddress,
142 cursor: Option<ObjectID>,
143 limit: usize,
144 filter: Option<IotaObjectDataFilter>,
145 ) -> StateReadResult<Vec<ObjectInfo>>;
146
147 async fn get_transactions(
148 &self,
149 kv_store: &Arc<TransactionKeyValueStore>,
150 filter: Option<TransactionFilter>,
151 cursor: Option<TransactionDigest>,
152 limit: Option<usize>,
153 reverse: bool,
154 ) -> StateReadResult<Vec<TransactionDigest>>;
155
156 fn get_dynamic_field_object_id(
157 &self,
158 owner: ObjectID,
159 name_type: TypeTag,
160 name_bcs_bytes: &[u8],
161 ) -> StateReadResult<Option<ObjectID>>;
162
163 async fn get_staked_iota(&self, owner: IotaAddress) -> StateReadResult<Vec<StakedIota>>;
165 async fn get_timelocked_staked_iota(
166 &self,
167 owner: IotaAddress,
168 ) -> StateReadResult<Vec<TimelockedStakedIota>>;
169
170 fn get_system_state(&self) -> StateReadResult<IotaSystemState>;
171 fn get_or_latest_committee(&self, epoch: Option<BigInt<u64>>) -> StateReadResult<Committee>;
172
173 fn get_bridge(&self) -> StateReadResult<Bridge>;
175
176 fn find_publish_txn_digest(&self, package_id: ObjectID) -> StateReadResult<TransactionDigest>;
178 fn get_owned_coins(
179 &self,
180 owner: IotaAddress,
181 cursor: (String, ObjectID),
182 limit: usize,
183 one_coin_type_only: bool,
184 ) -> StateReadResult<Vec<IotaCoin>>;
185 async fn get_executed_transaction_and_effects(
186 &self,
187 digest: TransactionDigest,
188 kv_store: Arc<TransactionKeyValueStore>,
189 ) -> StateReadResult<(Transaction, TransactionEffects)>;
190 async fn get_balance(
191 &self,
192 owner: IotaAddress,
193 coin_type: TypeTag,
194 ) -> StateReadResult<TotalBalance>;
195 async fn get_all_balance(
196 &self,
197 owner: IotaAddress,
198 ) -> StateReadResult<Arc<HashMap<TypeTag, TotalBalance>>>;
199
200 fn get_verified_checkpoint_by_sequence_number(
202 &self,
203 sequence_number: CheckpointSequenceNumber,
204 ) -> StateReadResult<VerifiedCheckpoint>;
205
206 fn get_checkpoint_contents(
207 &self,
208 digest: CheckpointContentsDigest,
209 ) -> StateReadResult<CheckpointContents>;
210
211 fn get_verified_checkpoint_summary_by_digest(
212 &self,
213 digest: CheckpointDigest,
214 ) -> StateReadResult<VerifiedCheckpoint>;
215
216 fn multi_get_transactions_perpetual_checkpoints(
217 &self,
218 digests: &[TransactionDigest],
219 ) -> StateReadResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>>;
220
221 fn get_transaction_perpetual_checkpoint(
222 &self,
223 digest: &TransactionDigest,
224 ) -> StateReadResult<Option<(EpochId, CheckpointSequenceNumber)>>;
225
226 fn multi_get_checkpoint_by_sequence_number(
227 &self,
228 sequence_numbers: &[CheckpointSequenceNumber],
229 ) -> StateReadResult<Vec<Option<VerifiedCheckpoint>>>;
230
231 fn get_total_transaction_blocks(&self) -> StateReadResult<u64>;
232
233 fn get_checkpoint_by_sequence_number(
234 &self,
235 sequence_number: CheckpointSequenceNumber,
236 ) -> StateReadResult<Option<VerifiedCheckpoint>>;
237
238 fn get_latest_checkpoint_sequence_number(&self) -> StateReadResult<CheckpointSequenceNumber>;
239
240 fn get_chain_identifier(&self) -> StateReadResult<ChainIdentifier>;
241}
242
243#[async_trait]
244impl StateRead for AuthorityState {
245 async fn multi_get(
246 &self,
247 transaction_keys: &[TransactionDigest],
248 effects_keys: &[TransactionDigest],
249 ) -> StateReadResult<KVStoreTransactionData> {
250 Ok(
251 <AuthorityState as TransactionKeyValueStoreTrait>::multi_get(
252 self,
253 transaction_keys,
254 effects_keys,
255 )
256 .await?,
257 )
258 }
259
260 fn get_object_read(&self, object_id: &ObjectID) -> StateReadResult<ObjectRead> {
261 Ok(self.get_object_read(object_id)?)
262 }
263
264 async fn get_object(&self, object_id: &ObjectID) -> StateReadResult<Option<Object>> {
265 Ok(self.get_object(object_id).await?)
266 }
267
268 fn get_past_object_read(
269 &self,
270 object_id: &ObjectID,
271 version: SequenceNumber,
272 ) -> StateReadResult<PastObjectRead> {
273 Ok(self.get_past_object_read(object_id, version)?)
274 }
275
276 fn load_epoch_store_one_call_per_task(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
277 self.load_epoch_store_one_call_per_task()
278 }
279
280 fn get_dynamic_fields(
281 &self,
282 owner: ObjectID,
283 cursor: Option<ObjectID>,
284 limit: usize,
285 ) -> StateReadResult<Vec<(ObjectID, DynamicFieldInfo)>> {
286 Ok(self.get_dynamic_fields(owner, cursor, limit)?)
287 }
288
289 fn get_cache_reader(&self) -> &Arc<dyn ObjectCacheRead> {
290 self.get_object_cache_reader()
291 }
292
293 fn get_object_store(&self) -> &Arc<dyn ObjectStore + Send + Sync> {
294 self.get_object_store()
295 }
296
297 fn get_backing_package_store(&self) -> &Arc<dyn BackingPackageStore + Send + Sync> {
298 self.get_backing_package_store()
299 }
300
301 fn get_owner_objects(
302 &self,
303 owner: IotaAddress,
304 cursor: Option<ObjectID>,
305 filter: Option<IotaObjectDataFilter>,
306 ) -> StateReadResult<Vec<ObjectInfo>> {
307 Ok(self
308 .get_owner_objects_iterator(owner, cursor, filter)?
309 .collect())
310 }
311
312 async fn query_events(
313 &self,
314 kv_store: &Arc<TransactionKeyValueStore>,
315 query: EventFilter,
316 cursor: Option<EventID>,
318 limit: usize,
319 descending: bool,
320 ) -> StateReadResult<Vec<IotaEvent>> {
321 Ok(self
322 .query_events(kv_store, query, cursor, limit, descending)
323 .await?)
324 }
325
326 async fn dry_exec_transaction(
327 &self,
328 transaction: TransactionData,
329 transaction_digest: TransactionDigest,
330 ) -> StateReadResult<(
331 DryRunTransactionBlockResponse,
332 BTreeMap<ObjectID, (ObjectRef, Object, WriteKind)>,
333 TransactionEffects,
334 Option<ObjectID>,
335 )> {
336 Ok(self
337 .dry_exec_transaction(transaction, transaction_digest)
338 .await?)
339 }
340
341 async fn dev_inspect_transaction_block(
342 &self,
343 sender: IotaAddress,
344 transaction_kind: TransactionKind,
345 gas_price: Option<u64>,
346 gas_budget: Option<u64>,
347 gas_sponsor: Option<IotaAddress>,
348 gas_objects: Option<Vec<ObjectRef>>,
349 show_raw_txn_data_and_effects: Option<bool>,
350 skip_checks: Option<bool>,
351 ) -> StateReadResult<DevInspectResults> {
352 Ok(self
353 .dev_inspect_transaction_block(
354 sender,
355 transaction_kind,
356 gas_price,
357 gas_budget,
358 gas_sponsor,
359 gas_objects,
360 show_raw_txn_data_and_effects,
361 skip_checks,
362 )
363 .await?)
364 }
365
366 fn get_subscription_handler(&self) -> Arc<SubscriptionHandler> {
367 self.subscription_handler.clone()
368 }
369
370 fn get_owner_objects_with_limit(
371 &self,
372 owner: IotaAddress,
373 cursor: Option<ObjectID>,
374 limit: usize,
375 filter: Option<IotaObjectDataFilter>,
376 ) -> StateReadResult<Vec<ObjectInfo>> {
377 Ok(self.get_owner_objects(owner, cursor, limit, filter)?)
378 }
379
380 async fn get_transactions(
381 &self,
382 kv_store: &Arc<TransactionKeyValueStore>,
383 filter: Option<TransactionFilter>,
384 cursor: Option<TransactionDigest>,
385 limit: Option<usize>,
386 reverse: bool,
387 ) -> StateReadResult<Vec<TransactionDigest>> {
388 Ok(self
389 .get_transactions(kv_store, filter, cursor, limit, reverse)
390 .await?)
391 }
392
393 fn get_dynamic_field_object_id(
394 &self,
396 owner: ObjectID,
397 name_type: TypeTag,
398 name_bcs_bytes: &[u8],
399 ) -> StateReadResult<Option<ObjectID>> {
400 Ok(self.get_dynamic_field_object_id(owner, name_type, name_bcs_bytes)?)
401 }
402
403 async fn get_staked_iota(&self, owner: IotaAddress) -> StateReadResult<Vec<StakedIota>> {
404 Ok(self
405 .get_move_objects(owner, MoveObjectType::staked_iota())
406 .await?)
407 }
408
409 async fn get_timelocked_staked_iota(
410 &self,
411 owner: IotaAddress,
412 ) -> StateReadResult<Vec<TimelockedStakedIota>> {
413 Ok(self
414 .get_move_objects(owner, MoveObjectType::timelocked_staked_iota())
415 .await?)
416 }
417
418 fn get_system_state(&self) -> StateReadResult<IotaSystemState> {
419 Ok(self
420 .get_cache_reader()
421 .get_iota_system_state_object_unsafe()?)
422 }
423
424 fn get_or_latest_committee(&self, epoch: Option<BigInt<u64>>) -> StateReadResult<Committee> {
425 Ok(self
426 .committee_store()
427 .get_or_latest_committee(epoch.map(|e| *e))?)
428 }
429
430 fn get_bridge(&self) -> StateReadResult<Bridge> {
431 self.get_cache_reader()
432 .get_bridge_object_unsafe()
433 .map_err(|err| err.into())
434 }
435
436 fn find_publish_txn_digest(&self, package_id: ObjectID) -> StateReadResult<TransactionDigest> {
437 Ok(self.find_publish_txn_digest(package_id)?)
438 }
439 fn get_owned_coins(
440 &self,
441 owner: IotaAddress,
442 cursor: (String, ObjectID),
443 limit: usize,
444 one_coin_type_only: bool,
445 ) -> StateReadResult<Vec<IotaCoin>> {
446 Ok(self
447 .get_owned_coins_iterator_with_cursor(owner, cursor, limit, one_coin_type_only)?
448 .map(|(coin_type, coin_object_id, coin)| IotaCoin {
449 coin_type,
450 coin_object_id,
451 version: coin.version,
452 digest: coin.digest,
453 balance: coin.balance,
454 previous_transaction: coin.previous_transaction,
455 })
456 .collect::<Vec<_>>())
457 }
458
459 async fn get_executed_transaction_and_effects(
460 &self,
461 digest: TransactionDigest,
462 kv_store: Arc<TransactionKeyValueStore>,
463 ) -> StateReadResult<(Transaction, TransactionEffects)> {
464 Ok(self
465 .get_executed_transaction_and_effects(digest, kv_store)
466 .await?)
467 }
468
469 async fn get_balance(
470 &self,
471 owner: IotaAddress,
472 coin_type: TypeTag,
473 ) -> StateReadResult<TotalBalance> {
474 Ok(self
475 .indexes
476 .as_ref()
477 .ok_or(IotaError::IndexStoreNotAvailable)?
478 .get_balance(owner, coin_type)
479 .await?)
480 }
481
482 async fn get_all_balance(
483 &self,
484 owner: IotaAddress,
485 ) -> StateReadResult<Arc<HashMap<TypeTag, TotalBalance>>> {
486 Ok(self
487 .indexes
488 .as_ref()
489 .ok_or(IotaError::IndexStoreNotAvailable)?
490 .get_all_balance(owner)
491 .await?)
492 }
493
494 fn get_verified_checkpoint_by_sequence_number(
495 &self,
496 sequence_number: CheckpointSequenceNumber,
497 ) -> StateReadResult<VerifiedCheckpoint> {
498 Ok(self.get_verified_checkpoint_by_sequence_number(sequence_number)?)
499 }
500
501 fn get_checkpoint_contents(
502 &self,
503 digest: CheckpointContentsDigest,
504 ) -> StateReadResult<CheckpointContents> {
505 Ok(self.get_checkpoint_contents(digest)?)
506 }
507
508 fn get_verified_checkpoint_summary_by_digest(
509 &self,
510 digest: CheckpointDigest,
511 ) -> StateReadResult<VerifiedCheckpoint> {
512 Ok(self.get_verified_checkpoint_summary_by_digest(digest)?)
513 }
514
515 fn multi_get_transactions_perpetual_checkpoints(
516 &self,
517 digests: &[TransactionDigest],
518 ) -> StateReadResult<Vec<Option<(EpochId, CheckpointSequenceNumber)>>> {
519 Ok(self
520 .get_checkpoint_cache()
521 .multi_get_transactions_perpetual_checkpoints(digests)?)
522 }
523
524 fn get_transaction_perpetual_checkpoint(
525 &self,
526 digest: &TransactionDigest,
527 ) -> StateReadResult<Option<(EpochId, CheckpointSequenceNumber)>> {
528 Ok(self
529 .get_checkpoint_cache()
530 .get_transaction_perpetual_checkpoint(digest)?)
531 }
532
533 fn multi_get_checkpoint_by_sequence_number(
534 &self,
535 sequence_numbers: &[CheckpointSequenceNumber],
536 ) -> StateReadResult<Vec<Option<VerifiedCheckpoint>>> {
537 Ok(self.multi_get_checkpoint_by_sequence_number(sequence_numbers)?)
538 }
539
540 fn get_total_transaction_blocks(&self) -> StateReadResult<u64> {
541 Ok(self.get_total_transaction_blocks()?)
542 }
543
544 fn get_checkpoint_by_sequence_number(
545 &self,
546 sequence_number: CheckpointSequenceNumber,
547 ) -> StateReadResult<Option<VerifiedCheckpoint>> {
548 Ok(self.get_checkpoint_by_sequence_number(sequence_number)?)
549 }
550
551 fn get_latest_checkpoint_sequence_number(&self) -> StateReadResult<CheckpointSequenceNumber> {
552 Ok(self.get_latest_checkpoint_sequence_number()?)
553 }
554
555 fn get_chain_identifier(&self) -> StateReadResult<ChainIdentifier> {
556 Ok(self
557 .get_chain_identifier()
558 .ok_or(anyhow!("Chain identifier not found"))?)
559 }
560}
561
562#[async_trait]
566impl<S: ?Sized + StateRead> ObjectProvider for Arc<S> {
567 type Error = StateReadError;
568
569 async fn get_object(
570 &self,
571 id: &ObjectID,
572 version: &SequenceNumber,
573 ) -> Result<Object, Self::Error> {
574 Ok(self.get_past_object_read(id, *version)?.into_object()?)
575 }
576
577 async fn find_object_lt_or_eq_version(
578 &self,
579 id: &ObjectID,
580 version: &SequenceNumber,
581 ) -> Result<Option<Object>, Self::Error> {
582 Ok(self
583 .get_cache_reader()
584 .find_object_lt_or_eq_version(*id, *version)?)
585 }
586}
587
588#[async_trait]
589impl<S: ?Sized + StateRead> ObjectProvider for (Arc<S>, Arc<TransactionKeyValueStore>) {
590 type Error = StateReadError;
591
592 async fn get_object(
593 &self,
594 id: &ObjectID,
595 version: &SequenceNumber,
596 ) -> Result<Object, Self::Error> {
597 let object_read = self.0.get_past_object_read(id, *version)?;
598 match object_read {
599 PastObjectRead::ObjectNotExists(_) | PastObjectRead::VersionNotFound(..) => {
600 match self.1.get_object(*id, *version).await? {
601 Some(object) => Ok(object),
602 None => Ok(PastObjectRead::VersionNotFound(*id, *version).into_object()?),
603 }
604 }
605 _ => Ok(object_read.into_object()?),
606 }
607 }
608
609 async fn find_object_lt_or_eq_version(
610 &self,
611 id: &ObjectID,
612 version: &SequenceNumber,
613 ) -> Result<Option<Object>, Self::Error> {
614 Ok(self
615 .0
616 .get_cache_reader()
617 .find_object_lt_or_eq_version(*id, *version)?)
618 }
619}
620
621#[derive(Debug, Error)]
622pub enum StateReadInternalError {
623 #[error(transparent)]
624 Iota(#[from] IotaError),
625 #[error(transparent)]
626 Join(#[from] JoinError),
627 #[error(transparent)]
628 Anyhow(#[from] anyhow::Error),
629}
630
631#[derive(Debug, Error)]
632pub enum StateReadClientError {
633 #[error(transparent)]
634 Iota(#[from] IotaError),
635 #[error(transparent)]
636 UserInput(#[from] UserInputError),
637}
638
639#[derive(Debug, Error)]
645pub enum StateReadError {
646 #[error(transparent)]
648 Internal(#[from] StateReadInternalError),
649
650 #[error(transparent)]
652 Client(#[from] StateReadClientError),
653}
654
655impl From<IotaError> for StateReadError {
656 fn from(e: IotaError) -> Self {
657 match e {
658 IotaError::IndexStoreNotAvailable
659 | IotaError::TransactionNotFound { .. }
660 | IotaError::UnsupportedFeature { .. }
661 | IotaError::UserInput { .. }
662 | IotaError::WrongMessageVersion { .. } => StateReadError::Client(e.into()),
663 _ => StateReadError::Internal(e.into()),
664 }
665 }
666}
667
668impl From<UserInputError> for StateReadError {
669 fn from(e: UserInputError) -> Self {
670 StateReadError::Client(e.into())
671 }
672}
673
674impl From<JoinError> for StateReadError {
675 fn from(e: JoinError) -> Self {
676 StateReadError::Internal(e.into())
677 }
678}
679
680impl From<anyhow::Error> for StateReadError {
681 fn from(e: anyhow::Error) -> Self {
682 StateReadError::Internal(e.into())
683 }
684}