1use std::{collections::HashMap, sync::Arc, time::Duration};
6
7use anyhow::anyhow;
8use async_trait::async_trait;
9use backoff::{ExponentialBackoff, future::retry};
10use futures::future::join_all;
11use indexmap::map::IndexMap;
12use iota_core::authority::AuthorityState;
13use iota_json_rpc_api::{
14 JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS, ReadApiOpenRpc,
15 ReadApiServer, validate_limit,
16};
17use iota_json_rpc_types::{
18 BalanceChange, Checkpoint, CheckpointId, CheckpointPage, DisplayFieldsResponse, EventFilter,
19 IotaEvent, IotaGetPastObjectRequest, IotaMoveStruct, IotaMoveValue, IotaMoveVariant,
20 IotaObjectData, IotaObjectDataOptions, IotaObjectResponse, IotaPastObjectResponse,
21 IotaTransactionBlock, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
22 IotaTransactionBlockResponseOptions, ObjectChange, ProtocolConfigResponse,
23};
24use iota_metrics::{add_server_timing, spawn_monitored_task};
25use iota_open_rpc::Module;
26use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
27use iota_storage::key_value_store::TransactionKeyValueStore;
28use iota_types::{
29 base_types::{ObjectID, SequenceNumber, TransactionDigest},
30 collection_types::VecMap,
31 crypto::AggregateAuthoritySignature,
32 display::DisplayVersionUpdatedEvent,
33 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
34 error::{IotaError, IotaObjectResponseError},
35 iota_serde::BigInt,
36 messages_checkpoint::{
37 CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp,
38 },
39 object::{Object, ObjectRead, PastObjectRead},
40 transaction::{Transaction, TransactionDataAPI},
41};
42use itertools::Itertools;
43use jsonrpsee::{RpcModule, core::RpcResult};
44use move_bytecode_utils::module_cache::GetModule;
45use move_core_types::{
46 annotated_value::{MoveStruct, MoveStructLayout, MoveValue},
47 language_storage::StructTag,
48};
49use tap::TapFallible;
50use tracing::{debug, error, instrument, trace, warn};
51
52use crate::{
53 IotaRpcModule, ObjectProvider, ObjectProviderCache,
54 authority_state::{StateRead, StateReadError, StateReadResult},
55 error::{Error, IotaRpcInputError, RpcInterimResult},
56 get_balance_changes_from_effect, get_object_changes,
57 logger::FutureWithTracing as _,
58};
59
/// Maximum nesting level used by the display-rendering code.
// NOTE(review): the use site is outside this section; presumably this caps how deep
// nested field paths in Display templates are resolved - confirm against `parse_template`.
const MAX_DISPLAY_NESTED_LEVEL: usize = 10;
61
/// Handler backing the `ReadApiServer` JSON-RPC methods.
///
/// All fields are `Arc` handles, so cloning a `ReadApi` only copies the handles.
#[derive(Clone)]
pub struct ReadApi {
    /// Node state used for object reads, the latest checkpoint number, the epoch store
    /// and the chain identifier.
    pub state: Arc<dyn StateRead>,
    /// Key-value store queried for transactions, effects, events and checkpoints.
    pub transaction_kv_store: Arc<TransactionKeyValueStore>,
    /// Metrics updated by the request handlers (request limits and result sizes).
    pub metrics: Arc<JsonRpcMetrics>,
}
70
/// Scratch record filled in step by step while a transaction block response is
/// assembled; it is turned into the final response by `convert_to_response`.
#[derive(Default)]
struct IntermediateTransactionResponse {
    /// Digest of the transaction this record describes.
    digest: TransactionDigest,
    /// The transaction itself, when it was loaded.
    transaction: Option<Transaction>,
    /// The transaction effects, when they were loaded.
    effects: Option<TransactionEffects>,
    /// Events converted to their JSON-RPC representation, when requested and available.
    events: Option<IotaTransactionBlockEvents>,
    /// Sequence number of the checkpoint containing the transaction, if known.
    checkpoint_seq: Option<CheckpointSequenceNumber>,
    /// Balance changes derived from the effects, when requested.
    balance_changes: Option<Vec<BalanceChange>>,
    /// Object changes derived from the effects, when requested.
    object_changes: Option<Vec<ObjectChange>>,
    /// `timestamp_ms` of the containing checkpoint's summary, if known.
    timestamp: Option<CheckpointTimestamp>,
    /// Non-fatal error messages collected while filling in the optional parts.
    errors: Vec<String>,
}
86
87impl IntermediateTransactionResponse {
88 pub fn new(digest: TransactionDigest) -> Self {
89 Self {
90 digest,
91 ..Default::default()
92 }
93 }
94
95 pub fn transaction(&self) -> &Option<Transaction> {
96 &self.transaction
97 }
98}
99
100impl ReadApi {
101 pub fn new(
102 state: Arc<AuthorityState>,
103 transaction_kv_store: Arc<TransactionKeyValueStore>,
104 metrics: Arc<JsonRpcMetrics>,
105 ) -> Self {
106 Self {
107 state,
108 transaction_kv_store,
109 metrics,
110 }
111 }
112
113 async fn get_checkpoint_internal(&self, id: CheckpointId) -> Result<Checkpoint, Error> {
114 Ok(match id {
115 CheckpointId::SequenceNumber(seq) => {
116 let verified_summary = self
117 .transaction_kv_store
118 .get_checkpoint_summary(seq)
119 .await?;
120 let content = self
121 .transaction_kv_store
122 .get_checkpoint_contents(verified_summary.sequence_number)
123 .await?;
124 let signature = verified_summary.auth_sig().signature.clone();
125 (verified_summary.into_data(), content, signature).into()
126 }
127 CheckpointId::Digest(digest) => {
128 let verified_summary = self
129 .transaction_kv_store
130 .get_checkpoint_summary_by_digest(digest)
131 .await?;
132 let content = self
133 .transaction_kv_store
134 .get_checkpoint_contents(verified_summary.sequence_number)
135 .await?;
136 let signature = verified_summary.auth_sig().signature.clone();
137 (verified_summary.into_data(), content, signature).into()
138 }
139 })
140 }
141
142 pub async fn get_checkpoints_internal(
143 state: Arc<dyn StateRead>,
144 transaction_kv_store: Arc<TransactionKeyValueStore>,
145 cursor: Option<CheckpointSequenceNumber>,
147 limit: u64,
148 descending_order: bool,
149 ) -> StateReadResult<Vec<Checkpoint>> {
150 let max_checkpoint = state.get_latest_checkpoint_sequence_number()?;
151 let checkpoint_numbers =
152 calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
153
154 let verified_checkpoints = transaction_kv_store
155 .multi_get_checkpoints_summaries(&checkpoint_numbers)
156 .await?;
157
158 let checkpoint_summaries_and_signatures: Vec<(
159 CheckpointSummary,
160 AggregateAuthoritySignature,
161 )> = verified_checkpoints
162 .into_iter()
163 .flatten()
164 .map(|check| {
165 (
166 check.clone().into_summary_and_sequence().1,
167 check.get_validator_signature(),
168 )
169 })
170 .collect();
171
172 let checkpoint_contents = transaction_kv_store
173 .multi_get_checkpoints_contents(&checkpoint_numbers)
174 .await?;
175 let contents: Vec<CheckpointContents> = checkpoint_contents.into_iter().flatten().collect();
176
177 let mut checkpoints: Vec<Checkpoint> = vec![];
178
179 for (summary_and_sig, content) in checkpoint_summaries_and_signatures
180 .into_iter()
181 .zip(contents.into_iter())
182 {
183 checkpoints.push(Checkpoint::from((
184 summary_and_sig.0,
185 content,
186 summary_and_sig.1,
187 )));
188 }
189
190 Ok(checkpoints)
191 }
192
193 async fn multi_get_transaction_blocks_internal(
194 &self,
195 digests: Vec<TransactionDigest>,
196 opts: Option<IotaTransactionBlockResponseOptions>,
197 ) -> Result<Vec<IotaTransactionBlockResponse>, Error> {
198 trace!("start");
199
200 let num_digests = digests.len();
201 if num_digests > *QUERY_MAX_RESULT_LIMIT {
202 Err(IotaRpcInputError::SizeLimitExceeded(
203 QUERY_MAX_RESULT_LIMIT.to_string(),
204 ))?
205 }
206 self.metrics
207 .get_tx_blocks_limit
208 .observe(digests.len() as f64);
209
210 let opts = opts.unwrap_or_default();
211
212 let mut temp_response: IndexMap<&TransactionDigest, IntermediateTransactionResponse> =
214 IndexMap::from_iter(
215 digests
216 .iter()
217 .map(|k| (k, IntermediateTransactionResponse::new(*k))),
218 );
219 if temp_response.len() < num_digests {
220 Err(IotaRpcInputError::ContainsDuplicates)?
221 }
222
223 if opts.require_input() {
224 trace!("getting input");
225 let digests_clone = digests.clone();
226 let transactions =
227 self.transaction_kv_store.multi_get_tx(&digests_clone).await.tap_err(
228 |err| debug!(digests=?digests_clone, "Failed to multi get transactions: {:?}", err),
229 )?;
230
231 for ((_digest, cache_entry), txn) in
232 temp_response.iter_mut().zip(transactions.into_iter())
233 {
234 cache_entry.transaction = txn;
235 }
236 }
237
238 if opts.require_effects() {
240 trace!("getting effects");
241 let digests_clone = digests.clone();
242 let effects_list = self.transaction_kv_store
243 .multi_get_fx_by_tx_digest(&digests_clone)
244 .await
245 .tap_err(
246 |err| debug!(digests=?digests_clone, "Failed to multi get effects for transactions: {:?}", err),
247 )?;
248 for ((_digest, cache_entry), e) in
249 temp_response.iter_mut().zip(effects_list.into_iter())
250 {
251 cache_entry.effects = e;
252 }
253 }
254
255 trace!("getting checkpoint sequence numbers");
256 let checkpoint_seq_list = self
257 .transaction_kv_store
258 .multi_get_transactions_perpetual_checkpoints(&digests)
259 .await
260 .tap_err(
261 |err| debug!(digests=?digests, "Failed to multi get checkpoint sequence number: {:?}", err))?;
262 for ((_digest, cache_entry), seq) in temp_response
263 .iter_mut()
264 .zip(checkpoint_seq_list.into_iter())
265 {
266 cache_entry.checkpoint_seq = seq;
267 }
268
269 let unique_checkpoint_numbers = temp_response
270 .values()
271 .filter_map(|cache_entry| cache_entry.checkpoint_seq)
272 .unique()
275 .collect::<Vec<CheckpointSequenceNumber>>();
276
277 trace!("getting checkpoint summaries");
279 let timestamps = self
280 .transaction_kv_store
281 .multi_get_checkpoints_summaries(&unique_checkpoint_numbers)
282 .await
283 .map_err(|e| {
284 Error::Unexpected(format!("Failed to fetch checkpoint summaries by these checkpoint ids: {unique_checkpoint_numbers:?} with error: {e:?}"))
285 })?
286 .into_iter()
287 .map(|c| c.map(|checkpoint| checkpoint.timestamp_ms));
288
289 let checkpoint_to_timestamp = unique_checkpoint_numbers
291 .into_iter()
292 .zip(timestamps)
293 .collect::<HashMap<_, _>>();
294
295 for (_, cache_entry) in temp_response.iter_mut() {
297 if cache_entry.checkpoint_seq.is_some() {
298 cache_entry.timestamp = *checkpoint_to_timestamp
300 .get(cache_entry.checkpoint_seq.as_ref().unwrap())
301 .unwrap();
304 }
305 }
306
307 if opts.show_events {
308 trace!("getting events");
309 let mut non_empty_digests = vec![];
310 for cache_entry in temp_response.values() {
311 if let Some(effects) = &cache_entry.effects {
312 if effects.events_digest().is_some() {
313 non_empty_digests.push(cache_entry.digest);
314 }
315 }
316 }
317 let backoff = ExponentialBackoff {
319 max_elapsed_time: Some(Duration::from_secs(3)),
320 multiplier: 1.0,
321 ..ExponentialBackoff::default()
322 };
323 let mut events = retry(backoff, || async {
324 match self
325 .transaction_kv_store
326 .multi_get_events_by_tx_digests(&non_empty_digests)
327 .await
328 {
329 Ok(events) if !events.contains(&None) => Ok(events),
332 Ok(_) => Err(backoff::Error::transient(Error::Unexpected(
333 "events not found, transaction execution may be incomplete.".into(),
334 ))),
335 Err(e) => Err(backoff::Error::permanent(Error::Unexpected(format!(
336 "failed to call multi_get_events: {e:?}"
337 )))),
338 }
339 })
340 .await
341 .map_err(|e| {
342 Error::Unexpected(format!(
343 "retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
344 ))
345 })?
346 .into_iter();
347
348 for (_, cache_entry) in temp_response.iter_mut() {
350 let transaction_digest = cache_entry.digest;
351 if let Some(events_digest) =
352 cache_entry.effects.as_ref().and_then(|e| e.events_digest())
353 {
354 match events.next() {
355 Some(Some(ev)) => {
356 cache_entry.events =
357 Some(to_iota_transaction_events(self, cache_entry.digest, ev)?)
358 }
359 None | Some(None) => {
360 error!(
361 "failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"
362 );
363 cache_entry.errors.push(format!(
364 "failed to fetch events with event digest {events_digest:?}",
365 ))
366 }
367 }
368 } else {
369 cache_entry.events = Some(IotaTransactionBlockEvents::default());
372 }
373 }
374 }
375
376 let object_cache =
377 ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
378 if opts.show_balance_changes {
379 trace!("getting balance changes");
380
381 let mut results = vec![];
382 for resp in temp_response.values() {
383 let input_objects = if let Some(tx) = resp.transaction() {
384 tx.data()
385 .inner()
386 .intent_message
387 .value
388 .input_objects()
389 .unwrap_or_default()
390 } else {
391 Vec::new()
393 };
394 results.push(get_balance_changes_from_effect(
395 &object_cache,
396 resp.effects.as_ref().ok_or_else(|| {
397 IotaRpcInputError::GenericNotFound(
398 "unable to derive balance changes because effect is empty".to_string(),
399 )
400 })?,
401 input_objects,
402 None,
403 ));
404 }
405 let results = join_all(results).await;
406 for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
407 match result {
408 Ok(balance_changes) => entry.1.balance_changes = Some(balance_changes),
409 Err(e) => entry
410 .1
411 .errors
412 .push(format!("Failed to fetch balance changes {e:?}")),
413 }
414 }
415 }
416
417 if opts.show_object_changes {
418 trace!("getting object changes");
419
420 let mut results = vec![];
421 for resp in temp_response.values() {
422 let effects = resp.effects.as_ref().ok_or_else(|| {
423 IotaRpcInputError::GenericNotFound(
424 "unable to derive object changes because effect is empty".to_string(),
425 )
426 })?;
427
428 results.push(get_object_changes(
429 &object_cache,
430 resp.transaction
431 .as_ref()
432 .ok_or_else(|| {
433 IotaRpcInputError::GenericNotFound(
434 "unable to derive object changes because transaction is empty"
435 .to_string(),
436 )
437 })?
438 .data()
439 .intent_message()
440 .value
441 .sender(),
442 effects.modified_at_versions(),
443 effects.all_changed_objects(),
444 effects.all_removed_objects(),
445 ));
446 }
447 let results = join_all(results).await;
448 for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
449 match result {
450 Ok(object_changes) => entry.1.object_changes = Some(object_changes),
451 Err(e) => entry
452 .1
453 .errors
454 .push(format!("Failed to fetch object changes {e:?}")),
455 }
456 }
457 }
458
459 let epoch_store = self.state.load_epoch_store_one_call_per_task();
460
461 let converted_tx_block_resps = temp_response
462 .into_iter()
463 .map(|c| convert_to_response(c.1, &opts, epoch_store.module_cache()))
464 .collect::<Result<Vec<_>, _>>()?;
465
466 self.metrics
467 .get_tx_blocks_result_size
468 .observe(converted_tx_block_resps.len() as f64);
469 self.metrics
470 .get_tx_blocks_result_size_total
471 .inc_by(converted_tx_block_resps.len() as u64);
472
473 trace!("done");
474
475 Ok(converted_tx_block_resps)
476 }
477}
478
479#[async_trait]
480impl ReadApiServer for ReadApi {
481 #[instrument(skip(self))]
482 async fn get_object(
483 &self,
484 object_id: ObjectID,
485 options: Option<IotaObjectDataOptions>,
486 ) -> RpcResult<IotaObjectResponse> {
487 async move {
488 let state = self.state.clone();
489 let object_read = spawn_monitored_task!(async move {
490 state.get_object_read(&object_id).map_err(|e| {
491 warn!(?object_id, "Failed to get object: {:?}", e);
492 Error::from(e)
493 })
494 })
495 .await
496 .map_err(Error::from)??;
497 let options = options.unwrap_or_default();
498
499 match object_read {
500 ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
501 IotaObjectResponseError::NotExists { object_id: id },
502 )),
503 ObjectRead::Exists(object_ref, o, layout) => {
504 let mut display_fields = None;
505 if options.show_display {
506 match get_display_fields(self, &self.transaction_kv_store, &o, &layout)
507 .await
508 {
509 Ok(rendered_fields) => display_fields = Some(rendered_fields),
510 Err(e) => {
511 return Ok(IotaObjectResponse::new(
512 Some(IotaObjectData::new(
513 object_ref, o, layout, options, None,
514 )?),
515 Some(IotaObjectResponseError::Display {
516 error: e.to_string(),
517 }),
518 ));
519 }
520 }
521 }
522 Ok(IotaObjectResponse::new_with_data(IotaObjectData::new(
523 object_ref,
524 o,
525 layout,
526 options,
527 display_fields,
528 )?))
529 }
530 ObjectRead::Deleted((object_id, version, digest)) => Ok(
531 IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
532 object_id,
533 version,
534 digest,
535 }),
536 ),
537 }
538 }
539 .trace()
540 .await
541 }
542
543 #[instrument(skip(self))]
544 async fn multi_get_objects(
545 &self,
546 object_ids: Vec<ObjectID>,
547 options: Option<IotaObjectDataOptions>,
548 ) -> RpcResult<Vec<IotaObjectResponse>> {
549 async move {
550 if object_ids.len() <= *QUERY_MAX_RESULT_LIMIT {
551 self.metrics
552 .get_objects_limit
553 .observe(object_ids.len() as f64);
554 let mut futures = vec![];
555 for object_id in object_ids {
556 futures.push(self.get_object(object_id, options.clone()));
557 }
558 let results = join_all(futures).await;
559
560 let objects_result: Result<Vec<IotaObjectResponse>, String> = results
561 .into_iter()
562 .map(|result| match result {
563 Ok(response) => Ok(response),
564 Err(error) => {
565 error!("Failed to fetch object with error: {error:?}");
566 Err(format!("Error: {}", error))
567 }
568 })
569 .collect();
570
571 let objects = objects_result.map_err(|err| {
572 Error::Unexpected(format!("Failed to fetch objects with error: {}", err))
573 })?;
574
575 self.metrics
576 .get_objects_result_size
577 .observe(objects.len() as f64);
578 self.metrics
579 .get_objects_result_size_total
580 .inc_by(objects.len() as u64);
581 Ok(objects)
582 } else {
583 Err(IotaRpcInputError::SizeLimitExceeded(
584 QUERY_MAX_RESULT_LIMIT.to_string(),
585 ))?
586 }
587 }
588 .trace()
589 .await
590 }
591
592 #[instrument(skip(self))]
593 async fn try_get_past_object(
594 &self,
595 object_id: ObjectID,
596 version: SequenceNumber,
597 options: Option<IotaObjectDataOptions>,
598 ) -> RpcResult<IotaPastObjectResponse> {
599 async move {
600 let state = self.state.clone();
601 let past_read = spawn_monitored_task!(async move {
602 state.get_past_object_read(&object_id, version)
603 .map_err(|e| {
604 error!("Failed to call try_get_past_object for object: {object_id:?} version: {version:?} with error: {e:?}");
605 Error::from(e)
606 })}).await.map_err(Error::from)??;
607 let options = options.unwrap_or_default();
608 match past_read {
609 PastObjectRead::ObjectNotExists(id) => {
610 Ok(IotaPastObjectResponse::ObjectNotExists(id))
611 }
612 PastObjectRead::VersionFound(object_ref, o, layout) => {
613 let display_fields = if options.show_display {
614 Some(
616 get_display_fields(self, &self.transaction_kv_store, &o, &layout)
617 .await
618 .map_err(|e| {
619 Error::Unexpected(format!(
620 "Unable to render object at version {version}: {e}"
621 ))
622 })?,
623 )
624 } else {
625 None
626 };
627 Ok(IotaPastObjectResponse::VersionFound(
628 IotaObjectData::new(object_ref, o, layout, options, display_fields)?,
629 ))
630 }
631 PastObjectRead::ObjectDeleted(oref) => {
632 Ok(IotaPastObjectResponse::ObjectDeleted(oref.into()))
633 }
634 PastObjectRead::VersionNotFound(id, seq_num) => {
635 Ok(IotaPastObjectResponse::VersionNotFound(id, seq_num))
636 }
637 PastObjectRead::VersionTooHigh {
638 object_id,
639 asked_version,
640 latest_version,
641 } => Ok(IotaPastObjectResponse::VersionTooHigh {
642 object_id,
643 asked_version,
644 latest_version,
645 }),
646 }
647 }
648 .trace()
649 .await
650 }
651
652 #[instrument(skip(self))]
653 async fn try_get_object_before_version(
654 &self,
655 object_id: ObjectID,
656 version: SequenceNumber,
657 ) -> RpcResult<IotaPastObjectResponse> {
658 let version = self
659 .state
660 .find_object_lt_or_eq_version(&object_id, &version)
661 .await
662 .map_err(Error::from)?
663 .map(|obj| obj.version())
664 .unwrap_or_default();
665 self.try_get_past_object(
666 object_id,
667 version,
668 Some(IotaObjectDataOptions::bcs_lossless()),
669 )
670 .await
671 }
672
673 #[instrument(skip(self))]
674 async fn try_multi_get_past_objects(
675 &self,
676 past_objects: Vec<IotaGetPastObjectRequest>,
677 options: Option<IotaObjectDataOptions>,
678 ) -> RpcResult<Vec<IotaPastObjectResponse>> {
679 async move {
680 if past_objects.len() <= *QUERY_MAX_RESULT_LIMIT {
681 let mut futures = vec![];
682 for past_object in past_objects {
683 futures.push(self.try_get_past_object(
684 past_object.object_id,
685 past_object.version,
686 options.clone(),
687 ));
688 }
689 let results = join_all(futures).await;
690
691 let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
692 let success = oks.into_iter().filter_map(Result::ok).collect();
693 let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
694 if !errors.is_empty() {
695 let error_string = errors
696 .iter()
697 .map(|e| e.to_string())
698 .collect::<Vec<String>>()
699 .join("; ");
700 Err(anyhow!("{error_string}").into()) } else {
705 Ok(success)
706 }
707 } else {
708 Err(IotaRpcInputError::SizeLimitExceeded(
709 QUERY_MAX_RESULT_LIMIT.to_string(),
710 ))?
711 }
712 }
713 .trace()
714 .await
715 }
716
717 #[instrument(skip(self))]
718 async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
719 async move {
720 Ok(self
721 .state
722 .get_total_transaction_blocks()
723 .map_err(Error::from)?
724 .into()) }
726 .trace()
727 .await
728 }
729
730 #[instrument(skip(self))]
731 async fn get_transaction_block(
732 &self,
733 digest: TransactionDigest,
734 opts: Option<IotaTransactionBlockResponseOptions>,
735 ) -> RpcResult<IotaTransactionBlockResponse> {
736 async move {
737 let opts = opts.unwrap_or_default();
738 let mut temp_response = IntermediateTransactionResponse::new(digest);
739
740 let transaction_kv_store = self.transaction_kv_store.clone();
742 let transaction = spawn_monitored_task!(async move {
743 let ret = transaction_kv_store.get_tx(digest).await.map_err(|err| {
744 debug!(tx_digest=?digest, "Failed to get transaction: {:?}", err);
745 Error::from(err)
746 });
747 add_server_timing("tx_kv_lookup");
748 ret
749 })
750 .await
751 .map_err(Error::from)??;
752 let input_objects = transaction
753 .data()
754 .inner()
755 .intent_message
756 .value
757 .input_objects()
758 .unwrap_or_default();
759
760 if opts.require_input() {
762 temp_response.transaction = Some(transaction);
763 }
764
765 if opts.require_effects() {
767 let transaction_kv_store = self.transaction_kv_store.clone();
768 temp_response.effects = Some(
769 spawn_monitored_task!(async move {
770 transaction_kv_store
771 .get_fx_by_tx_digest(digest)
772 .await
773 .map_err(|err| {
774 debug!(tx_digest=?digest, "Failed to get effects: {:?}", err);
775 Error::from(err)
776 })
777 })
778 .await
779 .map_err(Error::from)??,
780 );
781 }
782
783 temp_response.checkpoint_seq = self
789 .transaction_kv_store
790 .get_transaction_perpetual_checkpoint(digest)
791 .await
792 .map_err(|e| {
793 error!("Failed to retrieve checkpoint sequence for transaction {digest:?} with error: {e:?}");
794 Error::from(e)
795 })?;
796
797 if let Some(checkpoint_seq) = &temp_response.checkpoint_seq {
798 let kv_store = self.transaction_kv_store.clone();
799 let checkpoint_seq = *checkpoint_seq;
800 let checkpoint = spawn_monitored_task!(async move {
801 kv_store
802 .get_checkpoint_summary(checkpoint_seq)
804 .await
805 .map_err(|e| {
806 error!("Failed to get checkpoint by sequence number: {checkpoint_seq:?} with error: {e:?}");
807 Error::from(e)
808 })
809 }).await.map_err(Error::from)??;
810 temp_response.timestamp = Some(checkpoint.timestamp_ms);
812 }
813
814 if opts.show_events && temp_response.effects.is_some() {
815 let transaction_kv_store = self.transaction_kv_store.clone();
816 let events = spawn_monitored_task!(async move {
817 transaction_kv_store
818 .multi_get_events_by_tx_digests(&[digest])
819 .await
820 .map_err(|e| {
821 error!("failed to call get transaction events for transaction: {digest:?} with error {e:?}");
822 Error::from(e)
823 })
824 })
825 .await
826 .map_err(Error::from)??
827 .pop()
828 .flatten();
829 match events {
830 None => temp_response.events = Some(IotaTransactionBlockEvents::default()),
831 Some(events) => match to_iota_transaction_events(self, digest, events) {
832 Ok(e) => temp_response.events = Some(e),
833 Err(e) => temp_response.errors.push(e.to_string()),
834 },
835 }
836 }
837
838 let object_cache =
839 ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
840 if opts.show_balance_changes {
841 if let Some(effects) = &temp_response.effects {
842 let balance_changes = get_balance_changes_from_effect(
843 &object_cache,
844 effects,
845 input_objects,
846 None,
847 )
848 .await;
849
850 if let Ok(balance_changes) = balance_changes {
851 temp_response.balance_changes = Some(balance_changes);
852 } else {
853 temp_response.errors.push(format!(
854 "Cannot retrieve balance changes: {}",
855 balance_changes.unwrap_err()
856 ));
857 }
858 }
859 }
860
861 if opts.show_object_changes {
862 if let (Some(effects), Some(input)) =
863 (&temp_response.effects, &temp_response.transaction)
864 {
865 let sender = input.data().intent_message().value.sender();
866 let object_changes = get_object_changes(
867 &object_cache,
868 sender,
869 effects.modified_at_versions(),
870 effects.all_changed_objects(),
871 effects.all_removed_objects(),
872 )
873 .await;
874
875 if let Ok(object_changes) = object_changes {
876 temp_response.object_changes = Some(object_changes);
877 } else {
878 temp_response.errors.push(format!(
879 "Cannot retrieve object changes: {}",
880 object_changes.unwrap_err()
881 ));
882 }
883 }
884 }
885 let epoch_store = self.state.load_epoch_store_one_call_per_task();
886
887 convert_to_response(temp_response, &opts, epoch_store.module_cache())
888 }
889 .trace()
890 .await
891 }
892
893 #[instrument(skip(self))]
894 async fn multi_get_transaction_blocks(
895 &self,
896 digests: Vec<TransactionDigest>,
897 opts: Option<IotaTransactionBlockResponseOptions>,
898 ) -> RpcResult<Vec<IotaTransactionBlockResponse>> {
899 async move {
900 let cloned_self = self.clone();
901 spawn_monitored_task!(async move {
902 cloned_self
903 .multi_get_transaction_blocks_internal(digests, opts)
904 .await
905 })
906 .await
907 .map_err(Error::from)?
908 }
909 .trace()
910 .await
911 }
912
913 #[instrument(skip(self))]
914 async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<IotaEvent>> {
915 async move {
916 let state = self.state.clone();
917 let transaction_kv_store = self.transaction_kv_store.clone();
918 spawn_monitored_task!(async move{
919 let store = state.load_epoch_store_one_call_per_task();
920 let events = transaction_kv_store
921 .multi_get_events_by_tx_digests(&[transaction_digest])
922 .await
923 .map_err(
924 |e| {
925 error!("failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
926 Error::StateRead(e.into())
927 })?
928 .pop()
929 .flatten();
930 Ok(match events {
931 Some(events) => events
932 .data
933 .into_iter()
934 .enumerate()
935 .map(|(seq, e)| {
936 let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
937 IotaEvent::try_from(e, transaction_digest, seq as u64, None, layout)
938 })
939 .collect::<Result<Vec<_>, _>>()
940 .map_err(Error::Iota)?,
941 None => vec![],
942 })
943 })
944 .await
945 .map_err(Error::from)?
946 }
947 .trace()
948 .await
949 }
950
951 #[instrument(skip(self))]
952 async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
953 async move {
954 Ok(self
955 .state
956 .get_latest_checkpoint_sequence_number()
957 .map_err(|e| {
958 IotaRpcInputError::GenericNotFound(format!(
959 "Latest checkpoint sequence number was not found with error :{e}"
960 ))
961 })?
962 .into())
963 }
964 .trace()
965 .await
966 }
967
968 #[instrument(skip(self))]
969 async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
970 self.get_checkpoint_internal(id).trace().await
971 }
972
973 #[instrument(skip(self))]
974 async fn get_checkpoints(
975 &self,
976 cursor: Option<BigInt<u64>>,
978 limit: Option<usize>,
979 descending_order: bool,
980 ) -> RpcResult<CheckpointPage> {
981 async move {
982 let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)
983 .map_err(IotaRpcInputError::from)?;
984
985 let state = self.state.clone();
986 let kv_store = self.transaction_kv_store.clone();
987
988 self.metrics.get_checkpoints_limit.observe(limit as f64);
989
990 let mut data = spawn_monitored_task!(Self::get_checkpoints_internal(
991 state,
992 kv_store,
993 cursor.map(|s| *s),
994 limit as u64 + 1,
995 descending_order,
996 ))
997 .await
998 .map_err(Error::from)?
999 .map_err(Error::from)?;
1000
1001 let has_next_page = data.len() > limit;
1002 data.truncate(limit);
1003
1004 let next_cursor = if has_next_page {
1005 data.last().cloned().map(|d| d.sequence_number.into())
1006 } else {
1007 None
1008 };
1009
1010 self.metrics
1011 .get_checkpoints_result_size
1012 .observe(data.len() as f64);
1013 self.metrics
1014 .get_checkpoints_result_size_total
1015 .inc_by(data.len() as u64);
1016
1017 Ok(CheckpointPage {
1018 data,
1019 next_cursor,
1020 has_next_page,
1021 })
1022 }
1023 .trace()
1024 .await
1025 }
1026
1027 #[instrument(skip(self))]
1028 async fn get_protocol_config(
1029 &self,
1030 version: Option<BigInt<u64>>,
1031 ) -> RpcResult<ProtocolConfigResponse> {
1032 async move {
1033 version
1034 .map(|v| {
1035 ProtocolConfig::get_for_version_if_supported(
1036 (*v).into(),
1037 self.state.get_chain_identifier()?.chain(),
1038 )
1039 .ok_or(IotaRpcInputError::ProtocolVersionUnsupported(
1040 ProtocolVersion::MIN.as_u64(),
1041 ProtocolVersion::MAX.as_u64(),
1042 ))
1043 .map_err(Error::from)
1044 })
1045 .unwrap_or(Ok(self
1046 .state
1047 .load_epoch_store_one_call_per_task()
1048 .protocol_config()
1049 .clone()))
1050 .map(ProtocolConfigResponse::from)
1051 }
1052 .trace()
1053 .await
1054 }
1055
1056 #[instrument(skip(self))]
1057 async fn get_chain_identifier(&self) -> RpcResult<String> {
1058 async move {
1059 let ci = self.state.get_chain_identifier()?;
1060 Ok(ci.to_string())
1061 }
1062 .trace()
1063 .await
1064 }
1065}
1066
1067impl IotaRpcModule for ReadApi {
1068 fn rpc(self) -> RpcModule<Self> {
1069 self.into_rpc()
1070 }
1071
1072 fn rpc_doc_module() -> Module {
1073 ReadApiOpenRpc::module_doc()
1074 }
1075}
1076
1077fn to_iota_transaction_events(
1078 fullnode_api: &ReadApi,
1079 tx_digest: TransactionDigest,
1080 events: TransactionEvents,
1081) -> Result<IotaTransactionBlockEvents, Error> {
1082 let epoch_store = fullnode_api.state.load_epoch_store_one_call_per_task();
1083 let backing_package_store = fullnode_api.state.get_backing_package_store();
1084 let mut layout_resolver = epoch_store
1085 .executor()
1086 .type_layout_resolver(Box::new(backing_package_store.as_ref()));
1087 Ok(IotaTransactionBlockEvents::try_from(
1088 events,
1089 tx_digest,
1090 None,
1091 layout_resolver.as_mut(),
1092 )?)
1093}
1094
/// Errors that can occur while rendering the display fields of an object.
#[derive(Debug, thiserror::Error)]
pub enum ObjectDisplayError {
    /// The converted Move value is not a struct.
    #[error("Not a move struct")]
    NotMoveStruct,

    /// No struct layout is available for the object.
    #[error("Failed to extract layout")]
    Layout,

    /// The object's data is not a Move object.
    #[error("Failed to extract Move object")]
    MoveObject,

    /// An `IotaError` raised while turning the object into a Move struct;
    /// displayed transparently.
    #[error(transparent)]
    Deserialization(#[from] IotaError),

    /// BCS decoding of the display event payload failed.
    #[error("Failed to deserialize 'VersionUpdatedEvent': {0}")]
    Bcs(#[from] bcs::Error),

    /// A state read (such as the event query) failed; displayed transparently.
    #[error(transparent)]
    StateRead(#[from] StateReadError),
}
1115
1116async fn get_display_fields(
1117 fullnode_api: &ReadApi,
1118 kv_store: &Arc<TransactionKeyValueStore>,
1119 original_object: &Object,
1120 original_layout: &Option<MoveStructLayout>,
1121) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1122 let Some((object_type, layout)) = get_object_type_and_struct(original_object, original_layout)?
1123 else {
1124 return Ok(DisplayFieldsResponse {
1125 data: None,
1126 error: None,
1127 });
1128 };
1129 if let Some(display_object) =
1130 get_display_object_by_type(kv_store, fullnode_api, &object_type).await?
1131 {
1132 return get_rendered_fields(display_object.fields, &layout);
1133 }
1134 Ok(DisplayFieldsResponse {
1135 data: None,
1136 error: None,
1137 })
1138}
1139
1140async fn get_display_object_by_type(
1141 kv_store: &Arc<TransactionKeyValueStore>,
1142 fullnode_api: &ReadApi,
1143 object_type: &StructTag,
1144 ) -> Result<Option<DisplayVersionUpdatedEvent>, ObjectDisplayError> {
1146 let mut events = fullnode_api
1147 .state
1148 .query_events(
1149 kv_store,
1150 EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(object_type)),
1151 None,
1152 1,
1153 true,
1154 )
1155 .await?;
1156
1157 if let Some(event) = events.pop() {
1160 let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes())?;
1161 Ok(Some(display))
1162 } else {
1163 Ok(None)
1164 }
1165}
1166
1167pub fn get_object_type_and_struct(
1168 o: &Object,
1169 layout: &Option<MoveStructLayout>,
1170) -> Result<Option<(StructTag, MoveStruct)>, ObjectDisplayError> {
1171 if let Some(object_type) = o.type_() {
1172 let move_struct = get_move_struct(o, layout)?;
1173 Ok(Some((object_type.clone().into(), move_struct)))
1174 } else {
1175 Ok(None)
1176 }
1177}
1178
1179fn get_move_struct(
1180 o: &Object,
1181 layout: &Option<MoveStructLayout>,
1182) -> Result<MoveStruct, ObjectDisplayError> {
1183 let layout = layout.as_ref().ok_or_else(|| ObjectDisplayError::Layout)?;
1184 Ok(o.data
1185 .try_as_move()
1186 .ok_or_else(|| ObjectDisplayError::MoveObject)?
1187 .to_move_struct(layout)?)
1188}
1189
1190pub fn get_rendered_fields(
1191 fields: VecMap<String, String>,
1192 move_struct: &MoveStruct,
1193) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1194 let iota_move_value: IotaMoveValue = MoveValue::Struct(move_struct.clone()).into();
1195 if let IotaMoveValue::Struct(move_struct) = iota_move_value {
1196 let fields =
1197 fields
1198 .contents
1199 .iter()
1200 .map(|entry| match parse_template(&entry.value, &move_struct) {
1201 Ok(value) => Ok((entry.key.clone(), value)),
1202 Err(e) => Err(e),
1203 });
1204 let (oks, errs): (Vec<_>, Vec<_>) = fields.partition(Result::is_ok);
1205 let success = oks.into_iter().filter_map(Result::ok).collect();
1206 let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
1207 let error_string = errors
1208 .iter()
1209 .map(|e| e.to_string())
1210 .collect::<Vec<String>>()
1211 .join("; ");
1212 let error = if !error_string.is_empty() {
1213 Some(IotaObjectResponseError::Display {
1214 error: anyhow!("{error_string}").to_string(),
1215 })
1216 } else {
1217 None
1218 };
1219
1220 return Ok(DisplayFieldsResponse {
1221 data: Some(success),
1222 error,
1223 });
1224 }
1225 Err(ObjectDisplayError::NotMoveStruct)?
1226}
1227
1228fn parse_template(template: &str, move_struct: &IotaMoveStruct) -> Result<String, Error> {
1229 let mut output = template.to_string();
1230 let mut var_name = String::new();
1231 let mut in_braces = false;
1232 let mut escaped = false;
1233
1234 for ch in template.chars() {
1235 match ch {
1236 '\\' => {
1237 escaped = true;
1238 continue;
1239 }
1240 '{' if !escaped => {
1241 in_braces = true;
1242 var_name.clear();
1243 }
1244 '}' if !escaped => {
1245 in_braces = false;
1246 let value = get_value_from_move_struct(move_struct, &var_name)?;
1247 output = output.replace(&format!("{{{}}}", var_name), &value.to_string());
1248 }
1249 _ if !escaped => {
1250 if in_braces {
1251 var_name.push(ch);
1252 }
1253 }
1254 _ => {}
1255 }
1256 escaped = false;
1257 }
1258
1259 Ok(output.replace('\\', ""))
1260}
1261
1262fn get_value_from_move_struct(
1263 move_struct: &IotaMoveStruct,
1264 var_name: &str,
1265) -> Result<String, Error> {
1266 let parts: Vec<&str> = var_name.split('.').collect();
1267 if parts.is_empty() {
1268 Err(anyhow!("Display template value cannot be empty"))?;
1269 }
1270 if parts.len() > MAX_DISPLAY_NESTED_LEVEL {
1271 Err(anyhow!(
1272 "Display template value nested depth cannot exist {}",
1273 MAX_DISPLAY_NESTED_LEVEL
1274 ))?;
1275 }
1276 let mut current_value = &IotaMoveValue::Struct(move_struct.clone());
1277 for part in parts {
1279 match current_value {
1280 IotaMoveValue::Struct(move_struct) => {
1281 if let IotaMoveStruct::WithTypes { type_: _, fields }
1282 | IotaMoveStruct::WithFields(fields) = move_struct
1283 {
1284 if let Some(value) = fields.get(part) {
1285 current_value = value;
1286 } else {
1287 Err(anyhow!("Field value {var_name} cannot be found in struct"))?;
1288 }
1289 } else {
1290 Err(Error::Unexpected(format!(
1291 "Unexpected move struct type for field {var_name}"
1292 )))?;
1293 }
1294 }
1295 IotaMoveValue::Variant(IotaMoveVariant {
1296 fields, variant, ..
1297 }) => {
1298 if let Some(value) = fields.get(part) {
1299 current_value = value;
1300 } else {
1301 Err(anyhow!(
1302 "Field value {var_name} cannot be found in variant {variant}",
1303 ))?
1304 }
1305 }
1306 _ => {
1307 Err(Error::Unexpected(format!(
1308 "Unexpected move value type for field {var_name}"
1309 )))?;
1310 }
1311 }
1312 }
1313
1314 match current_value {
1315 IotaMoveValue::Option(move_option) => match move_option.as_ref() {
1316 Some(move_value) => Ok(move_value.to_string()),
1317 None => Ok("".to_string()),
1318 },
1319 IotaMoveValue::Vector(_) => Err(anyhow!(
1320 "Vector is not supported as a Display value {var_name}"
1321 ))?,
1322
1323 _ => Ok(current_value.to_string()),
1324 }
1325}
1326
1327fn convert_to_response(
1328 cache: IntermediateTransactionResponse,
1329 opts: &IotaTransactionBlockResponseOptions,
1330 module_cache: &impl GetModule,
1331) -> RpcInterimResult<IotaTransactionBlockResponse> {
1332 let mut response = IotaTransactionBlockResponse::new(cache.digest);
1333 response.errors = cache.errors;
1334
1335 if opts.show_raw_input && cache.transaction.is_some() {
1336 let sender_signed_data = cache.transaction.as_ref().unwrap().data();
1337 let raw_tx = bcs::to_bytes(sender_signed_data)
1338 .map_err(|e| anyhow!("Failed to serialize raw transaction with error: {e}"))?; response.raw_transaction = raw_tx;
1340 }
1341
1342 if opts.show_input && cache.transaction.is_some() {
1343 let tx_block = IotaTransactionBlock::try_from(
1344 cache.transaction.unwrap().into_data(),
1345 module_cache,
1346 cache.digest,
1347 )?;
1348 response.transaction = Some(tx_block);
1349 }
1350
1351 if opts.show_raw_effects {
1352 let raw_effects = cache
1353 .effects
1354 .as_ref()
1355 .map(bcs::to_bytes)
1356 .transpose()
1357 .map_err(|e| anyhow!("Failed to serialize raw effects with error: {e}"))?
1358 .unwrap_or_default();
1359 response.raw_effects = raw_effects;
1360 }
1361
1362 if opts.show_effects && cache.effects.is_some() {
1363 let effects = cache.effects.unwrap().try_into().map_err(|e| {
1364 anyhow!(
1365 "Failed to convert transaction block effects with error: {e}"
1367 )
1368 })?;
1369 response.effects = Some(effects);
1370 }
1371
1372 response.checkpoint = cache.checkpoint_seq;
1373 response.timestamp_ms = cache.timestamp;
1374
1375 if opts.show_events {
1376 response.events = cache.events;
1377 }
1378
1379 if opts.show_balance_changes {
1380 response.balance_changes = cache.balance_changes;
1381 }
1382
1383 if opts.show_object_changes {
1384 response.object_changes = cache.object_changes;
1385 }
1386
1387 Ok(response)
1388}
1389
1390fn calculate_checkpoint_numbers(
1391 cursor: Option<CheckpointSequenceNumber>,
1393 limit: u64,
1394 descending_order: bool,
1395 max_checkpoint: CheckpointSequenceNumber,
1396) -> Vec<CheckpointSequenceNumber> {
1397 let (start_index, end_index) = match cursor {
1398 Some(t) => {
1399 if descending_order {
1400 let start = std::cmp::min(t.saturating_sub(1), max_checkpoint);
1401 let end = start.saturating_sub(limit - 1);
1402 (end, start)
1403 } else {
1404 let start =
1405 std::cmp::min(t.checked_add(1).unwrap_or(max_checkpoint), max_checkpoint);
1406 let end = std::cmp::min(
1407 start.checked_add(limit - 1).unwrap_or(max_checkpoint),
1408 max_checkpoint,
1409 );
1410 (start, end)
1411 }
1412 }
1413 None => {
1414 if descending_order {
1415 (max_checkpoint.saturating_sub(limit - 1), max_checkpoint)
1416 } else {
1417 (0, std::cmp::min(limit - 1, max_checkpoint))
1418 }
1419 }
1420 };
1421
1422 if descending_order {
1423 (start_index..=end_index).rev().collect()
1424 } else {
1425 (start_index..=end_index).collect()
1426 }
1427}
1428
#[cfg(test)]
mod tests {
    use super::*;

    // Every call below passes (cursor, limit, descending_order, max_checkpoint).

    /// Descending from a cursor starts just below the cursor.
    #[test]
    fn test_calculate_checkpoint_numbers() {
        let page = calculate_checkpoint_numbers(Some(10), 5, true, 15);
        assert_eq!(page, vec![9, 8, 7, 6, 5]);
    }

    /// Descending without a cursor starts at the maximum checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, true, 15);
        assert_eq!(page, vec![15, 14, 13, 12, 11]);
    }

    /// Ascending without a cursor starts at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, false, 15);
        assert_eq!(page, vec![0, 1, 2, 3, 4]);
    }

    /// Ascending from a cursor starts just above the cursor.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_with_cursor() {
        let page = calculate_checkpoint_numbers(Some(10), 5, false, 15);
        assert_eq!(page, vec![11, 12, 13, 14, 15]);
    }

    /// An ascending page is capped at the maximum checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_limit_exceeds_max() {
        let page = calculate_checkpoint_numbers(None, 20, false, 15);
        assert_eq!(page, (0..=15).collect::<Vec<_>>());
    }

    /// A descending page stops at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_limit_exceeds_max() {
        let page = calculate_checkpoint_numbers(None, 20, true, 15);
        assert_eq!(page, (0..=15).rev().collect::<Vec<_>>());
    }
}