1use std::{collections::HashMap, sync::Arc, time::Duration};
6
7use anyhow::anyhow;
8use async_trait::async_trait;
9use backoff::{ExponentialBackoff, future::retry};
10use futures::future::join_all;
11use indexmap::map::IndexMap;
12use iota_core::authority::AuthorityState;
13use iota_json_rpc_api::{
14 JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS, ReadApiOpenRpc,
15 ReadApiServer, validate_limit,
16};
17use iota_json_rpc_types::{
18 BalanceChange, Checkpoint, CheckpointId, CheckpointPage, DisplayFieldsResponse, EventFilter,
19 IotaEvent, IotaGetPastObjectRequest, IotaMoveStruct, IotaMoveValue, IotaMoveVariant,
20 IotaObjectData, IotaObjectDataOptions, IotaObjectResponse, IotaPastObjectResponse,
21 IotaTransactionBlock, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
22 IotaTransactionBlockResponseOptions, ObjectChange, ProtocolConfigResponse,
23};
24use iota_metrics::{add_server_timing, spawn_monitored_task};
25use iota_open_rpc::Module;
26use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
27use iota_storage::key_value_store::TransactionKeyValueStore;
28use iota_types::{
29 base_types::{ObjectID, SequenceNumber, TransactionDigest},
30 collection_types::VecMap,
31 crypto::AggregateAuthoritySignature,
32 display::DisplayVersionUpdatedEvent,
33 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
34 error::{IotaError, IotaObjectResponseError},
35 iota_serde::BigInt,
36 messages_checkpoint::{
37 CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp,
38 },
39 object::{Object, ObjectRead, PastObjectRead},
40 transaction::{Transaction, TransactionDataAPI},
41};
42use itertools::Itertools;
43use jsonrpsee::{RpcModule, core::RpcResult};
44use move_bytecode_utils::module_cache::GetModule;
45use move_core_types::{
46 annotated_value::{MoveStruct, MoveStructLayout, MoveValue},
47 language_storage::StructTag,
48};
49use tap::TapFallible;
50use tracing::{debug, error, instrument, trace, warn};
51
52use crate::{
53 IotaRpcModule, ObjectProvider, ObjectProviderCache,
54 authority_state::{StateRead, StateReadError, StateReadResult},
55 error::{Error, IotaRpcInputError, RpcInterimResult},
56 get_balance_changes_from_effect, get_object_changes,
57 logger::FutureWithTracing as _,
58};
59
// NOTE(review): not referenced within the visible part of this file; judging by
// the name it presumably caps how many nested levels are followed while
// rendering display fields — confirm at the use site.
const MAX_DISPLAY_NESTED_LEVEL: usize = 10;
61
/// Handler behind the JSON-RPC read endpoints (it implements `ReadApiServer`).
///
/// All fields are `Arc`s, so cloning the handler only bumps reference counts.
#[derive(Clone)]
pub struct ReadApi {
    /// Node state used for object reads, event queries and epoch-store access.
    pub state: Arc<dyn StateRead>,
    /// Key-value store used to look up transactions, effects, events and
    /// checkpoints.
    pub transaction_kv_store: Arc<TransactionKeyValueStore>,
    /// Metrics updated by the handlers (request limits and result sizes).
    pub metrics: Arc<JsonRpcMetrics>,
}
70
/// Scratch record that collects the pieces of one transaction block response
/// while they are being fetched, before the final conversion to the RPC
/// response type.
///
/// Every optional field starts as `None` (via `Default`) and is filled in only
/// when the corresponding data has been requested and found.
#[derive(Default)]
struct IntermediateTransactionResponse {
    /// Digest of the transaction this record describes.
    digest: TransactionDigest,
    /// The transaction itself, when input data was requested.
    transaction: Option<Transaction>,
    /// Execution effects, when effects were requested.
    effects: Option<TransactionEffects>,
    /// Events emitted by the transaction, already converted to the RPC type.
    events: Option<IotaTransactionBlockEvents>,
    /// Sequence number of the checkpoint that contains the transaction, if any.
    checkpoint_seq: Option<CheckpointSequenceNumber>,
    /// Balance changes derived from the effects.
    balance_changes: Option<Vec<BalanceChange>>,
    /// Object changes derived from the effects.
    object_changes: Option<Vec<ObjectChange>>,
    /// Timestamp of the containing checkpoint.
    timestamp: Option<CheckpointTimestamp>,
    /// Non-fatal problems encountered while assembling the response.
    errors: Vec<String>,
}
86
87impl IntermediateTransactionResponse {
88 pub fn new(digest: TransactionDigest) -> Self {
89 Self {
90 digest,
91 ..Default::default()
92 }
93 }
94
95 pub fn transaction(&self) -> &Option<Transaction> {
96 &self.transaction
97 }
98}
99
100impl ReadApi {
101 pub fn new(
102 state: Arc<AuthorityState>,
103 transaction_kv_store: Arc<TransactionKeyValueStore>,
104 metrics: Arc<JsonRpcMetrics>,
105 ) -> Self {
106 Self {
107 state,
108 transaction_kv_store,
109 metrics,
110 }
111 }
112
113 async fn get_checkpoint_internal(&self, id: CheckpointId) -> Result<Checkpoint, Error> {
114 Ok(match id {
115 CheckpointId::SequenceNumber(seq) => {
116 let verified_summary = self
117 .transaction_kv_store
118 .get_checkpoint_summary(seq)
119 .await?;
120 let content = self
121 .transaction_kv_store
122 .get_checkpoint_contents(verified_summary.sequence_number)
123 .await?;
124 let signature = verified_summary.auth_sig().signature.clone();
125 (verified_summary.into_data(), content, signature).into()
126 }
127 CheckpointId::Digest(digest) => {
128 let verified_summary = self
129 .transaction_kv_store
130 .get_checkpoint_summary_by_digest(digest)
131 .await?;
132 let content = self
133 .transaction_kv_store
134 .get_checkpoint_contents(verified_summary.sequence_number)
135 .await?;
136 let signature = verified_summary.auth_sig().signature.clone();
137 (verified_summary.into_data(), content, signature).into()
138 }
139 })
140 }
141
142 pub async fn get_checkpoints_internal(
143 state: Arc<dyn StateRead>,
144 transaction_kv_store: Arc<TransactionKeyValueStore>,
145 cursor: Option<CheckpointSequenceNumber>,
147 limit: u64,
148 descending_order: bool,
149 ) -> StateReadResult<Vec<Checkpoint>> {
150 let max_checkpoint = state.get_latest_checkpoint_sequence_number()?;
151 let checkpoint_numbers =
152 calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
153
154 let verified_checkpoints = transaction_kv_store
155 .multi_get_checkpoints_summaries(&checkpoint_numbers)
156 .await?;
157
158 let checkpoint_summaries_and_signatures: Vec<(
159 CheckpointSummary,
160 AggregateAuthoritySignature,
161 )> = verified_checkpoints
162 .into_iter()
163 .flatten()
164 .map(|check| {
165 (
166 check.clone().into_summary_and_sequence().1,
167 check.get_validator_signature(),
168 )
169 })
170 .collect();
171
172 let checkpoint_contents = transaction_kv_store
173 .multi_get_checkpoints_contents(&checkpoint_numbers)
174 .await?;
175 let contents: Vec<CheckpointContents> = checkpoint_contents.into_iter().flatten().collect();
176
177 let mut checkpoints: Vec<Checkpoint> = vec![];
178
179 for (summary_and_sig, content) in checkpoint_summaries_and_signatures
180 .into_iter()
181 .zip(contents.into_iter())
182 {
183 checkpoints.push(Checkpoint::from((
184 summary_and_sig.0,
185 content,
186 summary_and_sig.1,
187 )));
188 }
189
190 Ok(checkpoints)
191 }
192
193 async fn multi_get_transaction_blocks_internal(
194 &self,
195 digests: Vec<TransactionDigest>,
196 opts: Option<IotaTransactionBlockResponseOptions>,
197 ) -> Result<Vec<IotaTransactionBlockResponse>, Error> {
198 trace!("start");
199
200 let num_digests = digests.len();
201 if num_digests > *QUERY_MAX_RESULT_LIMIT {
202 Err(IotaRpcInputError::SizeLimitExceeded(
203 QUERY_MAX_RESULT_LIMIT.to_string(),
204 ))?
205 }
206 self.metrics
207 .get_tx_blocks_limit
208 .observe(digests.len() as f64);
209
210 let opts = opts.unwrap_or_default();
211
212 let mut temp_response: IndexMap<&TransactionDigest, IntermediateTransactionResponse> =
214 IndexMap::from_iter(
215 digests
216 .iter()
217 .map(|k| (k, IntermediateTransactionResponse::new(*k))),
218 );
219 if temp_response.len() < num_digests {
220 Err(IotaRpcInputError::ContainsDuplicates)?
221 }
222
223 if opts.require_input() {
224 trace!("getting input");
225 let digests_clone = digests.clone();
226 let transactions =
227 self.transaction_kv_store.multi_get_tx(&digests_clone).await.tap_err(
228 |err| debug!(digests=?digests_clone, "Failed to multi get transactions: {:?}", err),
229 )?;
230
231 for ((_digest, cache_entry), txn) in
232 temp_response.iter_mut().zip(transactions.into_iter())
233 {
234 cache_entry.transaction = txn;
235 }
236 }
237
238 if opts.require_effects() {
240 trace!("getting effects");
241 let digests_clone = digests.clone();
242 let effects_list = self.transaction_kv_store
243 .multi_get_fx_by_tx_digest(&digests_clone)
244 .await
245 .tap_err(
246 |err| debug!(digests=?digests_clone, "Failed to multi get effects for transactions: {:?}", err),
247 )?;
248 for ((_digest, cache_entry), e) in
249 temp_response.iter_mut().zip(effects_list.into_iter())
250 {
251 cache_entry.effects = e;
252 }
253 }
254
255 trace!("getting checkpoint sequence numbers");
256 let checkpoint_seq_list = self
257 .transaction_kv_store
258 .multi_get_transactions_perpetual_checkpoints(&digests)
259 .await
260 .tap_err(
261 |err| debug!(digests=?digests, "Failed to multi get checkpoint sequence number: {:?}", err))?;
262 for ((_digest, cache_entry), seq) in temp_response
263 .iter_mut()
264 .zip(checkpoint_seq_list.into_iter())
265 {
266 cache_entry.checkpoint_seq = seq;
267 }
268
269 let unique_checkpoint_numbers = temp_response
270 .values()
271 .filter_map(|cache_entry| cache_entry.checkpoint_seq)
272 .unique()
275 .collect::<Vec<CheckpointSequenceNumber>>();
276
277 trace!("getting checkpoint summaries");
279 let timestamps = self
280 .transaction_kv_store
281 .multi_get_checkpoints_summaries(&unique_checkpoint_numbers)
282 .await
283 .map_err(|e| {
284 Error::Unexpected(format!("Failed to fetch checkpoint summaries by these checkpoint ids: {unique_checkpoint_numbers:?} with error: {e:?}"))
285 })?
286 .into_iter()
287 .map(|c| c.map(|checkpoint| checkpoint.timestamp_ms));
288
289 let checkpoint_to_timestamp = unique_checkpoint_numbers
291 .into_iter()
292 .zip(timestamps)
293 .collect::<HashMap<_, _>>();
294
295 for (_, cache_entry) in temp_response.iter_mut() {
297 if cache_entry.checkpoint_seq.is_some() {
298 cache_entry.timestamp = *checkpoint_to_timestamp
300 .get(cache_entry.checkpoint_seq.as_ref().unwrap())
301 .unwrap();
304 }
305 }
306
307 if opts.show_events {
308 trace!("getting events");
309 let mut non_empty_digests = vec![];
310 for cache_entry in temp_response.values() {
311 if let Some(effects) = &cache_entry.effects {
312 if effects.events_digest().is_some() {
313 non_empty_digests.push(cache_entry.digest);
314 }
315 }
316 }
317 let backoff = ExponentialBackoff {
319 max_elapsed_time: Some(Duration::from_secs(3)),
320 multiplier: 1.0,
321 ..ExponentialBackoff::default()
322 };
323 let mut events = retry(backoff, || async {
324 match self
325 .transaction_kv_store
326 .multi_get_events_by_tx_digests(&non_empty_digests)
327 .await
328 {
329 Ok(events) if !events.contains(&None) => Ok(events),
332 Ok(_) => Err(backoff::Error::transient(Error::Unexpected(
333 "events not found, transaction execution may be incomplete.".into(),
334 ))),
335 Err(e) => Err(backoff::Error::permanent(Error::Unexpected(format!(
336 "failed to call multi_get_events: {e:?}"
337 )))),
338 }
339 })
340 .await
341 .map_err(|e| {
342 Error::Unexpected(format!(
343 "retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
344 ))
345 })?
346 .into_iter();
347
348 for (_, cache_entry) in temp_response.iter_mut() {
350 let transaction_digest = cache_entry.digest;
351 if let Some(events_digest) =
352 cache_entry.effects.as_ref().and_then(|e| e.events_digest())
353 {
354 match events.next() {
355 Some(Some(ev)) => {
356 cache_entry.events =
357 Some(to_iota_transaction_events(self, cache_entry.digest, ev)?)
358 }
359 None | Some(None) => {
360 error!(
361 "failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"
362 );
363 cache_entry.errors.push(format!(
364 "failed to fetch events with event digest {events_digest:?}",
365 ))
366 }
367 }
368 } else {
369 cache_entry.events = Some(IotaTransactionBlockEvents::default());
372 }
373 }
374 }
375
376 let object_cache =
377 ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
378 if opts.show_balance_changes {
379 trace!("getting balance changes");
380
381 let mut results = vec![];
382 for resp in temp_response.values() {
383 let input_objects = if let Some(tx) = resp.transaction() {
384 tx.data()
385 .inner()
386 .intent_message
387 .value
388 .input_objects()
389 .unwrap_or_default()
390 } else {
391 Vec::new()
393 };
394 results.push(get_balance_changes_from_effect(
395 &object_cache,
396 resp.effects.as_ref().ok_or_else(|| {
397 IotaRpcInputError::GenericNotFound(
398 "unable to derive balance changes because effect is empty".to_string(),
399 )
400 })?,
401 input_objects,
402 None,
403 ));
404 }
405 let results = join_all(results).await;
406 for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
407 match result {
408 Ok(balance_changes) => entry.1.balance_changes = Some(balance_changes),
409 Err(e) => entry
410 .1
411 .errors
412 .push(format!("Failed to fetch balance changes {e:?}")),
413 }
414 }
415 }
416
417 if opts.show_object_changes {
418 trace!("getting object changes");
419
420 let mut results = vec![];
421 for resp in temp_response.values() {
422 let effects = resp.effects.as_ref().ok_or_else(|| {
423 IotaRpcInputError::GenericNotFound(
424 "unable to derive object changes because effect is empty".to_string(),
425 )
426 })?;
427
428 results.push(get_object_changes(
429 &object_cache,
430 resp.transaction
431 .as_ref()
432 .ok_or_else(|| {
433 IotaRpcInputError::GenericNotFound(
434 "unable to derive object changes because transaction is empty"
435 .to_string(),
436 )
437 })?
438 .data()
439 .intent_message()
440 .value
441 .sender(),
442 effects.modified_at_versions(),
443 effects.all_changed_objects(),
444 effects.all_removed_objects(),
445 ));
446 }
447 let results = join_all(results).await;
448 for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
449 match result {
450 Ok(object_changes) => entry.1.object_changes = Some(object_changes),
451 Err(e) => entry
452 .1
453 .errors
454 .push(format!("Failed to fetch object changes {e:?}")),
455 }
456 }
457 }
458
459 let epoch_store = self.state.load_epoch_store_one_call_per_task();
460
461 let converted_tx_block_resps = temp_response
462 .into_iter()
463 .map(|c| convert_to_response(c.1, &opts, epoch_store.module_cache()))
464 .collect::<Result<Vec<_>, _>>()?;
465
466 self.metrics
467 .get_tx_blocks_result_size
468 .observe(converted_tx_block_resps.len() as f64);
469 self.metrics
470 .get_tx_blocks_result_size_total
471 .inc_by(converted_tx_block_resps.len() as u64);
472
473 trace!("done");
474
475 Ok(converted_tx_block_resps)
476 }
477}
478
479#[async_trait]
480impl ReadApiServer for ReadApi {
481 #[instrument(skip(self))]
482 async fn get_object(
483 &self,
484 object_id: ObjectID,
485 options: Option<IotaObjectDataOptions>,
486 ) -> RpcResult<IotaObjectResponse> {
487 async move {
488 let state = self.state.clone();
489 let object_read = spawn_monitored_task!(async move {
490 state.get_object_read(&object_id).map_err(|e| {
491 warn!(?object_id, "failed to get object: {:?}", e);
492 Error::from(e)
493 })
494 })
495 .await
496 .map_err(Error::from)??;
497 let options = options.unwrap_or_default();
498
499 match object_read {
500 ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
501 IotaObjectResponseError::NotExists { object_id: id },
502 )),
503 ObjectRead::Exists(object_ref, o, layout) => {
504 let mut display_fields = None;
505 if options.show_display {
506 match get_display_fields(self, &self.transaction_kv_store, &o, &layout)
507 .await
508 {
509 Ok(rendered_fields) => display_fields = Some(rendered_fields),
510 Err(e) => {
511 return Ok(IotaObjectResponse::new(
512 Some(IotaObjectData::new(
513 object_ref, o, layout, options, None,
514 )?),
515 Some(IotaObjectResponseError::Display {
516 error: e.to_string(),
517 }),
518 ));
519 }
520 }
521 }
522 Ok(IotaObjectResponse::new_with_data(IotaObjectData::new(
523 object_ref,
524 o,
525 layout,
526 options,
527 display_fields,
528 )?))
529 }
530 ObjectRead::Deleted((object_id, version, digest)) => Ok(
531 IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
532 object_id,
533 version,
534 digest,
535 }),
536 ),
537 }
538 }
539 .trace()
540 .await
541 }
542
543 #[instrument(skip(self))]
544 async fn multi_get_objects(
545 &self,
546 object_ids: Vec<ObjectID>,
547 options: Option<IotaObjectDataOptions>,
548 ) -> RpcResult<Vec<IotaObjectResponse>> {
549 async move {
550 if object_ids.len() <= *QUERY_MAX_RESULT_LIMIT {
551 self.metrics
552 .get_objects_limit
553 .observe(object_ids.len() as f64);
554 let mut futures = vec![];
555 for object_id in object_ids {
556 futures.push(self.get_object(object_id, options.clone()));
557 }
558 let results = join_all(futures).await;
559
560 let objects_result: Result<Vec<IotaObjectResponse>, String> = results
561 .into_iter()
562 .map(|result| match result {
563 Ok(response) => Ok(response),
564 Err(error) => {
565 error!("failed to fetch object with error: {error:?}");
566 Err(format!("Error: {error}"))
567 }
568 })
569 .collect();
570
571 let objects = objects_result.map_err(|err| {
572 Error::Unexpected(format!("Failed to fetch objects with error: {err}"))
573 })?;
574
575 self.metrics
576 .get_objects_result_size
577 .observe(objects.len() as f64);
578 self.metrics
579 .get_objects_result_size_total
580 .inc_by(objects.len() as u64);
581 Ok(objects)
582 } else {
583 Err(IotaRpcInputError::SizeLimitExceeded(
584 QUERY_MAX_RESULT_LIMIT.to_string(),
585 ))?
586 }
587 }
588 .trace()
589 .await
590 }
591
592 #[instrument(skip(self))]
593 async fn try_get_past_object(
594 &self,
595 object_id: ObjectID,
596 version: SequenceNumber,
597 options: Option<IotaObjectDataOptions>,
598 ) -> RpcResult<IotaPastObjectResponse> {
599 async move {
600 let state = self.state.clone();
601 let past_read = spawn_monitored_task!(async move {
602 state.get_past_object_read(&object_id, version)
603 .map_err(|e| {
604 error!("failed to call try_get_past_object for object: {object_id:?} version: {version:?} with error: {e:?}");
605 Error::from(e)
606 })}).await.map_err(Error::from)??;
607 let options = options.unwrap_or_default();
608 match past_read {
609 PastObjectRead::ObjectNotExists(id) => {
610 Ok(IotaPastObjectResponse::ObjectNotExists(id))
611 }
612 PastObjectRead::VersionFound(object_ref, o, layout) => {
613 let display_fields = if options.show_display {
614 Some(
616 get_display_fields(self, &self.transaction_kv_store, &o, &layout)
617 .await
618 .map_err(|e| {
619 Error::Unexpected(format!(
620 "Unable to render object at version {version}: {e}"
621 ))
622 })?,
623 )
624 } else {
625 None
626 };
627 Ok(IotaPastObjectResponse::VersionFound(
628 IotaObjectData::new(object_ref, o, layout, options, display_fields)?,
629 ))
630 }
631 PastObjectRead::ObjectDeleted(oref) => {
632 Ok(IotaPastObjectResponse::ObjectDeleted(oref.into()))
633 }
634 PastObjectRead::VersionNotFound(id, seq_num) => {
635 Ok(IotaPastObjectResponse::VersionNotFound(id, seq_num))
636 }
637 PastObjectRead::VersionTooHigh {
638 object_id,
639 asked_version,
640 latest_version,
641 } => Ok(IotaPastObjectResponse::VersionTooHigh {
642 object_id,
643 asked_version,
644 latest_version,
645 }),
646 }
647 }
648 .trace()
649 .await
650 }
651
652 #[instrument(skip(self))]
653 async fn try_get_object_before_version(
654 &self,
655 object_id: ObjectID,
656 version: SequenceNumber,
657 ) -> RpcResult<IotaPastObjectResponse> {
658 let version = self
659 .state
660 .find_object_lt_or_eq_version(&object_id, &version)
661 .await
662 .map_err(Error::from)?
663 .map(|obj| obj.version())
664 .unwrap_or_default();
665 self.try_get_past_object(
666 object_id,
667 version,
668 Some(IotaObjectDataOptions::bcs_lossless()),
669 )
670 .await
671 }
672
673 #[instrument(skip(self))]
674 async fn try_multi_get_past_objects(
675 &self,
676 past_objects: Vec<IotaGetPastObjectRequest>,
677 options: Option<IotaObjectDataOptions>,
678 ) -> RpcResult<Vec<IotaPastObjectResponse>> {
679 async move {
680 if past_objects.len() <= *QUERY_MAX_RESULT_LIMIT {
681 let mut futures = vec![];
682 for past_object in past_objects {
683 futures.push(self.try_get_past_object(
684 past_object.object_id,
685 past_object.version,
686 options.clone(),
687 ));
688 }
689 let results = join_all(futures).await;
690
691 let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
692 let success = oks.into_iter().filter_map(Result::ok).collect();
693 let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
694 if !errors.is_empty() {
695 let error_string = errors
696 .iter()
697 .map(|e| e.to_string())
698 .collect::<Vec<String>>()
699 .join("; ");
700 Err(anyhow!("{error_string}").into()) } else {
705 Ok(success)
706 }
707 } else {
708 Err(IotaRpcInputError::SizeLimitExceeded(
709 QUERY_MAX_RESULT_LIMIT.to_string(),
710 ))?
711 }
712 }
713 .trace()
714 .await
715 }
716
717 #[instrument(skip(self))]
718 async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
719 async move {
720 Ok(self
721 .state
722 .get_total_transaction_blocks()
723 .map_err(Error::from)?
724 .into()) }
726 .trace()
727 .await
728 }
729
730 #[instrument(skip(self))]
731 async fn is_transaction_indexed_on_node(&self, digest: TransactionDigest) -> RpcResult<bool> {
732 let transaction = async move {
733 let transaction_kv_store = self.transaction_kv_store.clone();
734 let mut transactions = spawn_monitored_task!(async move {
735 let ret = transaction_kv_store
736 .multi_get_tx(&[digest])
737 .await
738 .map_err(|err| {
739 debug!(tx_digest=?digest, "Failed to get transaction: {:?}", err);
740 Error::from(err)
741 });
742 add_server_timing("tx_kv_lookup");
743 ret
744 })
745 .await??;
746 Ok(transactions
747 .pop()
748 .expect("there should be one tx lookup response"))
749 }
750 .trace()
751 .await?;
752 Ok(transaction.map(|tx| *tx.digest()) == Some(digest))
753 }
754
755 #[instrument(skip(self))]
756 async fn get_transaction_block(
757 &self,
758 digest: TransactionDigest,
759 opts: Option<IotaTransactionBlockResponseOptions>,
760 ) -> RpcResult<IotaTransactionBlockResponse> {
761 async move {
762 let opts = opts.unwrap_or_default();
763 let mut temp_response = IntermediateTransactionResponse::new(digest);
764
765 let transaction_kv_store = self.transaction_kv_store.clone();
767 let transaction = spawn_monitored_task!(async move {
768 let ret = transaction_kv_store.get_tx(digest).await.map_err(|err| {
769 debug!(tx_digest=?digest, "Failed to get transaction: {:?}", err);
770 Error::from(err)
771 });
772 add_server_timing("tx_kv_lookup");
773 ret
774 })
775 .await
776 .map_err(Error::from)??;
777 let input_objects = transaction
778 .data()
779 .inner()
780 .intent_message
781 .value
782 .input_objects()
783 .unwrap_or_default();
784
785 if opts.require_input() {
787 temp_response.transaction = Some(transaction);
788 }
789
790 if opts.require_effects() {
792 let transaction_kv_store = self.transaction_kv_store.clone();
793 temp_response.effects = Some(
794 spawn_monitored_task!(async move {
795 transaction_kv_store
796 .get_fx_by_tx_digest(digest)
797 .await
798 .map_err(|err| {
799 debug!(tx_digest=?digest, "Failed to get effects: {:?}", err);
800 Error::from(err)
801 })
802 })
803 .await
804 .map_err(Error::from)??,
805 );
806 }
807
808 temp_response.checkpoint_seq = self
814 .transaction_kv_store
815 .get_transaction_perpetual_checkpoint(digest)
816 .await
817 .map_err(|e| {
818 error!("failed to retrieve checkpoint sequence for transaction {digest:?} with error: {e:?}");
819 Error::from(e)
820 })?;
821
822 if let Some(checkpoint_seq) = &temp_response.checkpoint_seq {
823 let kv_store = self.transaction_kv_store.clone();
824 let checkpoint_seq = *checkpoint_seq;
825 let checkpoint = spawn_monitored_task!(async move {
826 kv_store
827 .get_checkpoint_summary(checkpoint_seq)
829 .await
830 .map_err(|e| {
831 error!("failed to get checkpoint by sequence number: {checkpoint_seq:?} with error: {e:?}");
832 Error::from(e)
833 })
834 }).await.map_err(Error::from)??;
835 temp_response.timestamp = Some(checkpoint.timestamp_ms);
837 }
838
839 if opts.show_events && temp_response.effects.is_some() {
840 let transaction_kv_store = self.transaction_kv_store.clone();
841 let events = spawn_monitored_task!(async move {
842 transaction_kv_store
843 .multi_get_events_by_tx_digests(&[digest])
844 .await
845 .map_err(|e| {
846 error!("failed to call get transaction events for transaction: {digest:?} with error {e:?}");
847 Error::from(e)
848 })
849 })
850 .await
851 .map_err(Error::from)??
852 .pop()
853 .flatten();
854 match events {
855 None => temp_response.events = Some(IotaTransactionBlockEvents::default()),
856 Some(events) => match to_iota_transaction_events(self, digest, events) {
857 Ok(e) => temp_response.events = Some(e),
858 Err(e) => temp_response.errors.push(e.to_string()),
859 },
860 }
861 }
862
863 let object_cache =
864 ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
865 if opts.show_balance_changes {
866 if let Some(effects) = &temp_response.effects {
867 let balance_changes = get_balance_changes_from_effect(
868 &object_cache,
869 effects,
870 input_objects,
871 None,
872 )
873 .await;
874
875 if let Ok(balance_changes) = balance_changes {
876 temp_response.balance_changes = Some(balance_changes);
877 } else {
878 temp_response.errors.push(format!(
879 "Cannot retrieve balance changes: {}",
880 balance_changes.unwrap_err()
881 ));
882 }
883 }
884 }
885
886 if opts.show_object_changes {
887 if let (Some(effects), Some(input)) =
888 (&temp_response.effects, &temp_response.transaction)
889 {
890 let sender = input.data().intent_message().value.sender();
891 let object_changes = get_object_changes(
892 &object_cache,
893 sender,
894 effects.modified_at_versions(),
895 effects.all_changed_objects(),
896 effects.all_removed_objects(),
897 )
898 .await;
899
900 if let Ok(object_changes) = object_changes {
901 temp_response.object_changes = Some(object_changes);
902 } else {
903 temp_response.errors.push(format!(
904 "Cannot retrieve object changes: {}",
905 object_changes.unwrap_err()
906 ));
907 }
908 }
909 }
910 let epoch_store = self.state.load_epoch_store_one_call_per_task();
911
912 convert_to_response(temp_response, &opts, epoch_store.module_cache())
913 }
914 .trace()
915 .await
916 }
917
918 #[instrument(skip(self))]
919 async fn multi_get_transaction_blocks(
920 &self,
921 digests: Vec<TransactionDigest>,
922 opts: Option<IotaTransactionBlockResponseOptions>,
923 ) -> RpcResult<Vec<IotaTransactionBlockResponse>> {
924 async move {
925 let cloned_self = self.clone();
926 spawn_monitored_task!(async move {
927 cloned_self
928 .multi_get_transaction_blocks_internal(digests, opts)
929 .await
930 })
931 .await
932 .map_err(Error::from)?
933 }
934 .trace()
935 .await
936 }
937
938 #[instrument(skip(self))]
939 async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<IotaEvent>> {
940 async move {
941 let state = self.state.clone();
942 let transaction_kv_store = self.transaction_kv_store.clone();
943 spawn_monitored_task!(async move{
944 let store = state.load_epoch_store_one_call_per_task();
945 let events = transaction_kv_store
946 .multi_get_events_by_tx_digests(&[transaction_digest])
947 .await
948 .map_err(
949 |e| {
950 error!("failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
951 Error::StateRead(e.into())
952 })?
953 .pop()
954 .flatten();
955 Ok(match events {
956 Some(events) => events
957 .data
958 .into_iter()
959 .enumerate()
960 .map(|(seq, e)| {
961 let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
962 IotaEvent::try_from(e, transaction_digest, seq as u64, None, layout)
963 })
964 .collect::<Result<Vec<_>, _>>()
965 .map_err(Error::Iota)?,
966 None => vec![],
967 })
968 })
969 .await
970 .map_err(Error::from)?
971 }
972 .trace()
973 .await
974 }
975
976 #[instrument(skip(self))]
977 async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
978 async move {
979 Ok(self
980 .state
981 .get_latest_checkpoint_sequence_number()
982 .map_err(|e| {
983 IotaRpcInputError::GenericNotFound(format!(
984 "Latest checkpoint sequence number was not found with error :{e}"
985 ))
986 })?
987 .into())
988 }
989 .trace()
990 .await
991 }
992
993 #[instrument(skip(self))]
994 async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
995 self.get_checkpoint_internal(id).trace().await
996 }
997
998 #[instrument(skip(self))]
999 async fn get_checkpoints(
1000 &self,
1001 cursor: Option<BigInt<u64>>,
1003 limit: Option<usize>,
1004 descending_order: bool,
1005 ) -> RpcResult<CheckpointPage> {
1006 async move {
1007 let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)
1008 .map_err(IotaRpcInputError::from)?;
1009
1010 let state = self.state.clone();
1011 let kv_store = self.transaction_kv_store.clone();
1012
1013 self.metrics.get_checkpoints_limit.observe(limit as f64);
1014
1015 let mut data = spawn_monitored_task!(Self::get_checkpoints_internal(
1016 state,
1017 kv_store,
1018 cursor.map(|s| *s),
1019 limit as u64 + 1,
1020 descending_order,
1021 ))
1022 .await
1023 .map_err(Error::from)?
1024 .map_err(Error::from)?;
1025
1026 let has_next_page = data.len() > limit;
1027 data.truncate(limit);
1028
1029 let next_cursor = if has_next_page {
1030 data.last().cloned().map(|d| d.sequence_number.into())
1031 } else {
1032 None
1033 };
1034
1035 self.metrics
1036 .get_checkpoints_result_size
1037 .observe(data.len() as f64);
1038 self.metrics
1039 .get_checkpoints_result_size_total
1040 .inc_by(data.len() as u64);
1041
1042 Ok(CheckpointPage {
1043 data,
1044 next_cursor,
1045 has_next_page,
1046 })
1047 }
1048 .trace()
1049 .await
1050 }
1051
1052 #[instrument(skip(self))]
1053 async fn get_protocol_config(
1054 &self,
1055 version: Option<BigInt<u64>>,
1056 ) -> RpcResult<ProtocolConfigResponse> {
1057 async move {
1058 version
1059 .map(|v| {
1060 ProtocolConfig::get_for_version_if_supported(
1061 (*v).into(),
1062 self.state.get_chain_identifier()?.chain(),
1063 )
1064 .ok_or(IotaRpcInputError::ProtocolVersionUnsupported(
1065 ProtocolVersion::MIN.as_u64(),
1066 ProtocolVersion::MAX.as_u64(),
1067 ))
1068 .map_err(Error::from)
1069 })
1070 .unwrap_or(Ok(self
1071 .state
1072 .load_epoch_store_one_call_per_task()
1073 .protocol_config()
1074 .clone()))
1075 .map(ProtocolConfigResponse::from)
1076 }
1077 .trace()
1078 .await
1079 }
1080
1081 #[instrument(skip(self))]
1082 async fn get_chain_identifier(&self) -> RpcResult<String> {
1083 async move {
1084 let ci = self.state.get_chain_identifier()?;
1085 Ok(ci.to_string())
1086 }
1087 .trace()
1088 .await
1089 }
1090}
1091
/// Exposes `ReadApi` as an RPC module together with its OpenRPC description.
impl IotaRpcModule for ReadApi {
    /// Consumes the handler and converts it into an `RpcModule` via `into_rpc`.
    fn rpc(self) -> RpcModule<Self> {
        self.into_rpc()
    }

    /// Returns the OpenRPC module documentation provided by `ReadApiOpenRpc`.
    fn rpc_doc_module() -> Module {
        ReadApiOpenRpc::module_doc()
    }
}
1101
1102fn to_iota_transaction_events(
1103 fullnode_api: &ReadApi,
1104 tx_digest: TransactionDigest,
1105 events: TransactionEvents,
1106) -> Result<IotaTransactionBlockEvents, Error> {
1107 let epoch_store = fullnode_api.state.load_epoch_store_one_call_per_task();
1108 let backing_package_store = fullnode_api.state.get_backing_package_store();
1109 let mut layout_resolver = epoch_store
1110 .executor()
1111 .type_layout_resolver(Box::new(backing_package_store.as_ref()));
1112 Ok(IotaTransactionBlockEvents::try_from(
1113 events,
1114 tx_digest,
1115 None,
1116 layout_resolver.as_mut(),
1117 )?)
1118}
1119
/// Errors that can occur while rendering the display fields of an object.
#[derive(Debug, thiserror::Error)]
pub enum ObjectDisplayError {
    // NOTE(review): not constructed in the visible part of this file;
    // presumably raised when a value expected to be a Move struct is not one.
    #[error("Not a move struct")]
    NotMoveStruct,

    /// No struct layout was supplied for the object.
    #[error("Failed to extract layout")]
    Layout,

    /// The object's data could not be accessed as a Move object.
    #[error("Failed to extract Move object")]
    MoveObject,

    /// An `IotaError` raised while turning the object into a Move struct;
    /// displayed transparently.
    #[error(transparent)]
    Deserialization(#[from] IotaError),

    /// BCS decoding of the display version-updated event failed.
    #[error("Failed to deserialize 'VersionUpdatedEvent': {0}")]
    Bcs(#[from] bcs::Error),

    /// Reading from node state (e.g. querying events) failed; displayed
    /// transparently.
    #[error(transparent)]
    StateRead(#[from] StateReadError),
}
1140
1141async fn get_display_fields(
1142 fullnode_api: &ReadApi,
1143 kv_store: &Arc<TransactionKeyValueStore>,
1144 original_object: &Object,
1145 original_layout: &Option<MoveStructLayout>,
1146) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1147 let Some((object_type, layout)) = get_object_type_and_struct(original_object, original_layout)?
1148 else {
1149 return Ok(DisplayFieldsResponse {
1150 data: None,
1151 error: None,
1152 });
1153 };
1154 if let Some(display_object) =
1155 get_display_object_by_type(kv_store, fullnode_api, &object_type).await?
1156 {
1157 return get_rendered_fields(display_object.fields, &layout);
1158 }
1159 Ok(DisplayFieldsResponse {
1160 data: None,
1161 error: None,
1162 })
1163}
1164
1165async fn get_display_object_by_type(
1166 kv_store: &Arc<TransactionKeyValueStore>,
1167 fullnode_api: &ReadApi,
1168 object_type: &StructTag,
1169 ) -> Result<Option<DisplayVersionUpdatedEvent>, ObjectDisplayError> {
1171 let mut events = fullnode_api
1172 .state
1173 .query_events(
1174 kv_store,
1175 EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(object_type)),
1176 None,
1177 1,
1178 true,
1179 )
1180 .await?;
1181
1182 if let Some(event) = events.pop() {
1185 let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes())?;
1186 Ok(Some(display))
1187 } else {
1188 Ok(None)
1189 }
1190}
1191
1192pub fn get_object_type_and_struct(
1193 o: &Object,
1194 layout: &Option<MoveStructLayout>,
1195) -> Result<Option<(StructTag, MoveStruct)>, ObjectDisplayError> {
1196 if let Some(object_type) = o.type_() {
1197 let move_struct = get_move_struct(o, layout)?;
1198 Ok(Some((object_type.clone().into(), move_struct)))
1199 } else {
1200 Ok(None)
1201 }
1202}
1203
1204fn get_move_struct(
1205 o: &Object,
1206 layout: &Option<MoveStructLayout>,
1207) -> Result<MoveStruct, ObjectDisplayError> {
1208 let layout = layout.as_ref().ok_or_else(|| ObjectDisplayError::Layout)?;
1209 Ok(o.data
1210 .try_as_move()
1211 .ok_or_else(|| ObjectDisplayError::MoveObject)?
1212 .to_move_struct(layout)?)
1213}
1214
1215pub fn get_rendered_fields(
1216 fields: VecMap<String, String>,
1217 move_struct: &MoveStruct,
1218) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1219 let iota_move_value: IotaMoveValue = MoveValue::Struct(move_struct.clone()).into();
1220 if let IotaMoveValue::Struct(move_struct) = iota_move_value {
1221 let fields =
1222 fields
1223 .contents
1224 .iter()
1225 .map(|entry| match parse_template(&entry.value, &move_struct) {
1226 Ok(value) => Ok((entry.key.clone(), value)),
1227 Err(e) => Err(e),
1228 });
1229 let (oks, errs): (Vec<_>, Vec<_>) = fields.partition(Result::is_ok);
1230 let success = oks.into_iter().filter_map(Result::ok).collect();
1231 let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
1232 let error_string = errors
1233 .iter()
1234 .map(|e| e.to_string())
1235 .collect::<Vec<String>>()
1236 .join("; ");
1237 let error = if !error_string.is_empty() {
1238 Some(IotaObjectResponseError::Display {
1239 error: anyhow!("{error_string}").to_string(),
1240 })
1241 } else {
1242 None
1243 };
1244
1245 return Ok(DisplayFieldsResponse {
1246 data: Some(success),
1247 error,
1248 });
1249 }
1250 Err(ObjectDisplayError::NotMoveStruct)?
1251}
1252
1253fn parse_template(template: &str, move_struct: &IotaMoveStruct) -> Result<String, Error> {
1254 let mut output = template.to_string();
1255 let mut var_name = String::new();
1256 let mut in_braces = false;
1257 let mut escaped = false;
1258
1259 for ch in template.chars() {
1260 match ch {
1261 '\\' => {
1262 escaped = true;
1263 continue;
1264 }
1265 '{' if !escaped => {
1266 in_braces = true;
1267 var_name.clear();
1268 }
1269 '}' if !escaped => {
1270 in_braces = false;
1271 let value = get_value_from_move_struct(move_struct, &var_name)?;
1272 output = output.replace(&format!("{{{var_name}}}"), &value.to_string());
1273 }
1274 _ if !escaped => {
1275 if in_braces {
1276 var_name.push(ch);
1277 }
1278 }
1279 _ => {}
1280 }
1281 escaped = false;
1282 }
1283
1284 Ok(output.replace('\\', ""))
1285}
1286
1287fn get_value_from_move_struct(
1288 move_struct: &IotaMoveStruct,
1289 var_name: &str,
1290) -> Result<String, Error> {
1291 let parts: Vec<&str> = var_name.split('.').collect();
1292 if parts.is_empty() {
1293 Err(anyhow!("Display template value cannot be empty"))?;
1294 }
1295 if parts.len() > MAX_DISPLAY_NESTED_LEVEL {
1296 Err(anyhow!(
1297 "Display template value nested depth cannot exist {}",
1298 MAX_DISPLAY_NESTED_LEVEL
1299 ))?;
1300 }
1301 let mut current_value = &IotaMoveValue::Struct(move_struct.clone());
1302 for part in parts {
1304 match current_value {
1305 IotaMoveValue::Struct(move_struct) => {
1306 if let IotaMoveStruct::WithTypes { type_: _, fields }
1307 | IotaMoveStruct::WithFields(fields) = move_struct
1308 {
1309 if let Some(value) = fields.get(part) {
1310 current_value = value;
1311 } else {
1312 Err(anyhow!("Field value {var_name} cannot be found in struct"))?;
1313 }
1314 } else {
1315 Err(Error::Unexpected(format!(
1316 "Unexpected move struct type for field {var_name}"
1317 )))?;
1318 }
1319 }
1320 IotaMoveValue::Variant(IotaMoveVariant {
1321 fields, variant, ..
1322 }) => {
1323 if let Some(value) = fields.get(part) {
1324 current_value = value;
1325 } else {
1326 Err(anyhow!(
1327 "Field value {var_name} cannot be found in variant {variant}",
1328 ))?
1329 }
1330 }
1331 _ => {
1332 Err(Error::Unexpected(format!(
1333 "Unexpected move value type for field {var_name}"
1334 )))?;
1335 }
1336 }
1337 }
1338
1339 match current_value {
1340 IotaMoveValue::Option(move_option) => match move_option.as_ref() {
1341 Some(move_value) => Ok(move_value.to_string()),
1342 None => Ok("".to_string()),
1343 },
1344 IotaMoveValue::Vector(_) => Err(anyhow!(
1345 "Vector is not supported as a Display value {var_name}"
1346 ))?,
1347
1348 _ => Ok(current_value.to_string()),
1349 }
1350}
1351
1352fn convert_to_response(
1353 cache: IntermediateTransactionResponse,
1354 opts: &IotaTransactionBlockResponseOptions,
1355 module_cache: &impl GetModule,
1356) -> RpcInterimResult<IotaTransactionBlockResponse> {
1357 let mut response = IotaTransactionBlockResponse::new(cache.digest);
1358 response.errors = cache.errors;
1359
1360 if opts.show_raw_input && cache.transaction.is_some() {
1361 let sender_signed_data = cache.transaction.as_ref().unwrap().data();
1362 let raw_tx = bcs::to_bytes(sender_signed_data)
1363 .map_err(|e| anyhow!("Failed to serialize raw transaction with error: {e}"))?; response.raw_transaction = raw_tx;
1365 }
1366
1367 if opts.show_input && cache.transaction.is_some() {
1368 let tx_block = IotaTransactionBlock::try_from(
1369 cache.transaction.unwrap().into_data(),
1370 module_cache,
1371 cache.digest,
1372 )?;
1373 response.transaction = Some(tx_block);
1374 }
1375
1376 if opts.show_raw_effects {
1377 let raw_effects = cache
1378 .effects
1379 .as_ref()
1380 .map(bcs::to_bytes)
1381 .transpose()
1382 .map_err(|e| anyhow!("Failed to serialize raw effects with error: {e}"))?
1383 .unwrap_or_default();
1384 response.raw_effects = raw_effects;
1385 }
1386
1387 if opts.show_effects && cache.effects.is_some() {
1388 let effects = cache.effects.unwrap().try_into().map_err(|e| {
1389 anyhow!(
1390 "Failed to convert transaction block effects with error: {e}"
1392 )
1393 })?;
1394 response.effects = Some(effects);
1395 }
1396
1397 response.checkpoint = cache.checkpoint_seq;
1398 response.timestamp_ms = cache.timestamp;
1399
1400 if opts.show_events {
1401 response.events = cache.events;
1402 }
1403
1404 if opts.show_balance_changes {
1405 response.balance_changes = cache.balance_changes;
1406 }
1407
1408 if opts.show_object_changes {
1409 response.object_changes = cache.object_changes;
1410 }
1411
1412 Ok(response)
1413}
1414
1415fn calculate_checkpoint_numbers(
1416 cursor: Option<CheckpointSequenceNumber>,
1418 limit: u64,
1419 descending_order: bool,
1420 max_checkpoint: CheckpointSequenceNumber,
1421) -> Vec<CheckpointSequenceNumber> {
1422 let (start_index, end_index) = match cursor {
1423 Some(t) => {
1424 if descending_order {
1425 let start = std::cmp::min(t.saturating_sub(1), max_checkpoint);
1426 let end = start.saturating_sub(limit - 1);
1427 (end, start)
1428 } else {
1429 let start =
1430 std::cmp::min(t.checked_add(1).unwrap_or(max_checkpoint), max_checkpoint);
1431 let end = std::cmp::min(
1432 start.checked_add(limit - 1).unwrap_or(max_checkpoint),
1433 max_checkpoint,
1434 );
1435 (start, end)
1436 }
1437 }
1438 None => {
1439 if descending_order {
1440 (max_checkpoint.saturating_sub(limit - 1), max_checkpoint)
1441 } else {
1442 (0, std::cmp::min(limit - 1, max_checkpoint))
1443 }
1444 }
1445 };
1446
1447 if descending_order {
1448 (start_index..=end_index).rev().collect()
1449 } else {
1450 (start_index..=end_index).collect()
1451 }
1452}
1453
#[cfg(test)]
mod tests {
    use super::*;

    /// Descending page that starts right below an explicit cursor.
    #[test]
    fn test_calculate_checkpoint_numbers() {
        let expected: Vec<CheckpointSequenceNumber> = vec![9, 8, 7, 6, 5];
        assert_eq!(calculate_checkpoint_numbers(Some(10), 5, true, 15), expected);
    }

    /// Without a cursor, a descending page starts at the latest checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_no_cursor() {
        let expected: Vec<CheckpointSequenceNumber> = vec![15, 14, 13, 12, 11];
        assert_eq!(calculate_checkpoint_numbers(None, 5, true, 15), expected);
    }

    /// Without a cursor, an ascending page starts at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_no_cursor() {
        let expected: Vec<CheckpointSequenceNumber> = vec![0, 1, 2, 3, 4];
        assert_eq!(calculate_checkpoint_numbers(None, 5, false, 15), expected);
    }

    /// Ascending page that starts right above an explicit cursor.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_with_cursor() {
        let expected: Vec<CheckpointSequenceNumber> = vec![11, 12, 13, 14, 15];
        assert_eq!(calculate_checkpoint_numbers(Some(10), 5, false, 15), expected);
    }

    /// An ascending page never runs past the latest checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_limit_exceeds_max() {
        let expected: Vec<CheckpointSequenceNumber> = (0..=15).collect();
        assert_eq!(calculate_checkpoint_numbers(None, 20, false, 15), expected);
    }

    /// A descending page never runs below checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_limit_exceeds_max() {
        let expected: Vec<CheckpointSequenceNumber> = (0..=15).rev().collect();
        assert_eq!(calculate_checkpoint_numbers(None, 20, true, 15), expected);
    }
}