iota_json_rpc/
read_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, sync::Arc, time::Duration};
6
7use anyhow::anyhow;
8use async_trait::async_trait;
9use backoff::{ExponentialBackoff, future::retry};
10use futures::future::join_all;
11use indexmap::map::IndexMap;
12use iota_core::authority::AuthorityState;
13use iota_json_rpc_api::{
14    JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS, ReadApiOpenRpc,
15    ReadApiServer, validate_limit,
16};
17use iota_json_rpc_types::{
18    BalanceChange, Checkpoint, CheckpointId, CheckpointPage, DisplayFieldsResponse, EventFilter,
19    IotaEvent, IotaGetPastObjectRequest, IotaMoveStruct, IotaMoveValue, IotaMoveVariant,
20    IotaObjectData, IotaObjectDataOptions, IotaObjectResponse, IotaPastObjectResponse,
21    IotaTransactionBlock, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
22    IotaTransactionBlockResponseOptions, ObjectChange, ProtocolConfigResponse,
23};
24use iota_metrics::{add_server_timing, spawn_monitored_task};
25use iota_open_rpc::Module;
26use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
27use iota_storage::key_value_store::TransactionKeyValueStore;
28use iota_types::{
29    base_types::{ObjectID, SequenceNumber, TransactionDigest},
30    collection_types::VecMap,
31    crypto::AggregateAuthoritySignature,
32    display::DisplayVersionUpdatedEvent,
33    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
34    error::{IotaError, IotaObjectResponseError},
35    iota_serde::BigInt,
36    messages_checkpoint::{
37        CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp,
38    },
39    object::{Object, ObjectRead, PastObjectRead},
40    transaction::{Transaction, TransactionDataAPI},
41};
42use itertools::Itertools;
43use jsonrpsee::{RpcModule, core::RpcResult};
44use move_bytecode_utils::module_cache::GetModule;
45use move_core_types::{
46    annotated_value::{MoveStruct, MoveStructLayout, MoveValue},
47    language_storage::StructTag,
48};
49use tap::TapFallible;
50use tracing::{debug, error, instrument, trace, warn};
51
52use crate::{
53    IotaRpcModule, ObjectProvider, ObjectProviderCache,
54    authority_state::{StateRead, StateReadError, StateReadResult},
55    error::{Error, IotaRpcInputError, RpcInterimResult},
56    get_balance_changes_from_effect, get_object_changes,
57    logger::FutureWithTracing as _,
58};
59
60const MAX_DISPLAY_NESTED_LEVEL: usize = 10;
61
62// An implementation of the read portion of the JSON-RPC interface intended for
63// use in Fullnodes.
64#[derive(Clone)]
65pub struct ReadApi {
66    pub state: Arc<dyn StateRead>,
67    pub transaction_kv_store: Arc<TransactionKeyValueStore>,
68    pub metrics: Arc<JsonRpcMetrics>,
69}
70
71// Internal data structure to make it easy to work with data returned from
72// authority store and also enable code sharing between
73// get_transaction_with_options, multi_get_transaction_with_options, etc.
74#[derive(Default)]
75struct IntermediateTransactionResponse {
76    digest: TransactionDigest,
77    transaction: Option<Transaction>,
78    effects: Option<TransactionEffects>,
79    events: Option<IotaTransactionBlockEvents>,
80    checkpoint_seq: Option<CheckpointSequenceNumber>,
81    balance_changes: Option<Vec<BalanceChange>>,
82    object_changes: Option<Vec<ObjectChange>>,
83    timestamp: Option<CheckpointTimestamp>,
84    errors: Vec<String>,
85}
86
87impl IntermediateTransactionResponse {
88    pub fn new(digest: TransactionDigest) -> Self {
89        Self {
90            digest,
91            ..Default::default()
92        }
93    }
94
95    pub fn transaction(&self) -> &Option<Transaction> {
96        &self.transaction
97    }
98}
99
100impl ReadApi {
101    pub fn new(
102        state: Arc<AuthorityState>,
103        transaction_kv_store: Arc<TransactionKeyValueStore>,
104        metrics: Arc<JsonRpcMetrics>,
105    ) -> Self {
106        Self {
107            state,
108            transaction_kv_store,
109            metrics,
110        }
111    }
112
113    async fn get_checkpoint_internal(&self, id: CheckpointId) -> Result<Checkpoint, Error> {
114        Ok(match id {
115            CheckpointId::SequenceNumber(seq) => {
116                let verified_summary = self
117                    .transaction_kv_store
118                    .get_checkpoint_summary(seq)
119                    .await?;
120                let content = self
121                    .transaction_kv_store
122                    .get_checkpoint_contents(verified_summary.sequence_number)
123                    .await?;
124                let signature = verified_summary.auth_sig().signature.clone();
125                (verified_summary.into_data(), content, signature).into()
126            }
127            CheckpointId::Digest(digest) => {
128                let verified_summary = self
129                    .transaction_kv_store
130                    .get_checkpoint_summary_by_digest(digest)
131                    .await?;
132                let content = self
133                    .transaction_kv_store
134                    .get_checkpoint_contents(verified_summary.sequence_number)
135                    .await?;
136                let signature = verified_summary.auth_sig().signature.clone();
137                (verified_summary.into_data(), content, signature).into()
138            }
139        })
140    }
141
142    pub async fn get_checkpoints_internal(
143        state: Arc<dyn StateRead>,
144        transaction_kv_store: Arc<TransactionKeyValueStore>,
145        // If `Some`, the query will start from the next item after the specified cursor
146        cursor: Option<CheckpointSequenceNumber>,
147        limit: u64,
148        descending_order: bool,
149    ) -> StateReadResult<Vec<Checkpoint>> {
150        let max_checkpoint = state.get_latest_checkpoint_sequence_number()?;
151        let checkpoint_numbers =
152            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
153
154        let verified_checkpoints = transaction_kv_store
155            .multi_get_checkpoints_summaries(&checkpoint_numbers)
156            .await?;
157
158        let checkpoint_summaries_and_signatures: Vec<(
159            CheckpointSummary,
160            AggregateAuthoritySignature,
161        )> = verified_checkpoints
162            .into_iter()
163            .flatten()
164            .map(|check| {
165                (
166                    check.clone().into_summary_and_sequence().1,
167                    check.get_validator_signature(),
168                )
169            })
170            .collect();
171
172        let checkpoint_contents = transaction_kv_store
173            .multi_get_checkpoints_contents(&checkpoint_numbers)
174            .await?;
175        let contents: Vec<CheckpointContents> = checkpoint_contents.into_iter().flatten().collect();
176
177        let mut checkpoints: Vec<Checkpoint> = vec![];
178
179        for (summary_and_sig, content) in checkpoint_summaries_and_signatures
180            .into_iter()
181            .zip(contents.into_iter())
182        {
183            checkpoints.push(Checkpoint::from((
184                summary_and_sig.0,
185                content,
186                summary_and_sig.1,
187            )));
188        }
189
190        Ok(checkpoints)
191    }
192
193    async fn multi_get_transaction_blocks_internal(
194        &self,
195        digests: Vec<TransactionDigest>,
196        opts: Option<IotaTransactionBlockResponseOptions>,
197    ) -> Result<Vec<IotaTransactionBlockResponse>, Error> {
198        trace!("start");
199
200        let num_digests = digests.len();
201        if num_digests > *QUERY_MAX_RESULT_LIMIT {
202            Err(IotaRpcInputError::SizeLimitExceeded(
203                QUERY_MAX_RESULT_LIMIT.to_string(),
204            ))?
205        }
206        self.metrics
207            .get_tx_blocks_limit
208            .observe(digests.len() as f64);
209
210        let opts = opts.unwrap_or_default();
211
212        // use LinkedHashMap to dedup and can iterate in insertion order.
213        let mut temp_response: IndexMap<&TransactionDigest, IntermediateTransactionResponse> =
214            IndexMap::from_iter(
215                digests
216                    .iter()
217                    .map(|k| (k, IntermediateTransactionResponse::new(*k))),
218            );
219        if temp_response.len() < num_digests {
220            Err(IotaRpcInputError::ContainsDuplicates)?
221        }
222
223        if opts.require_input() {
224            trace!("getting input");
225            let digests_clone = digests.clone();
226            let transactions =
227                self.transaction_kv_store.multi_get_tx(&digests_clone).await.tap_err(
228                    |err| debug!(digests=?digests_clone, "Failed to multi get transactions: {:?}", err),
229                )?;
230
231            for ((_digest, cache_entry), txn) in
232                temp_response.iter_mut().zip(transactions.into_iter())
233            {
234                cache_entry.transaction = txn;
235            }
236        }
237
238        // Fetch effects when `show_events` is true because events relies on effects
239        if opts.require_effects() {
240            trace!("getting effects");
241            let digests_clone = digests.clone();
242            let effects_list = self.transaction_kv_store
243                .multi_get_fx_by_tx_digest(&digests_clone)
244                .await
245                .tap_err(
246                    |err| debug!(digests=?digests_clone, "Failed to multi get effects for transactions: {:?}", err),
247                )?;
248            for ((_digest, cache_entry), e) in
249                temp_response.iter_mut().zip(effects_list.into_iter())
250            {
251                cache_entry.effects = e;
252            }
253        }
254
255        trace!("getting checkpoint sequence numbers");
256        let checkpoint_seq_list = self
257            .transaction_kv_store
258            .multi_get_transactions_perpetual_checkpoints(&digests)
259            .await
260            .tap_err(
261                |err| debug!(digests=?digests, "Failed to multi get checkpoint sequence number: {:?}", err))?;
262        for ((_digest, cache_entry), seq) in temp_response
263            .iter_mut()
264            .zip(checkpoint_seq_list.into_iter())
265        {
266            cache_entry.checkpoint_seq = seq;
267        }
268
269        let unique_checkpoint_numbers = temp_response
270            .values()
271            .filter_map(|cache_entry| cache_entry.checkpoint_seq)
272            // It's likely that many transactions have the same checkpoint, so we don't
273            // need to over-fetch
274            .unique()
275            .collect::<Vec<CheckpointSequenceNumber>>();
276
277        // fetch timestamp from the DB
278        trace!("getting checkpoint summaries");
279        let timestamps = self
280            .transaction_kv_store
281            .multi_get_checkpoints_summaries(&unique_checkpoint_numbers)
282            .await
283            .map_err(|e| {
284                Error::Unexpected(format!("Failed to fetch checkpoint summaries by these checkpoint ids: {unique_checkpoint_numbers:?} with error: {e:?}"))
285            })?
286            .into_iter()
287            .map(|c| c.map(|checkpoint| checkpoint.timestamp_ms));
288
289        // construct a hashmap of checkpoint -> timestamp for fast lookup
290        let checkpoint_to_timestamp = unique_checkpoint_numbers
291            .into_iter()
292            .zip(timestamps)
293            .collect::<HashMap<_, _>>();
294
295        // fill cache with the timestamp
296        for (_, cache_entry) in temp_response.iter_mut() {
297            if cache_entry.checkpoint_seq.is_some() {
298                // safe to unwrap because is_some is checked
299                cache_entry.timestamp = *checkpoint_to_timestamp
300                    .get(cache_entry.checkpoint_seq.as_ref().unwrap())
301                    // Safe to unwrap because checkpoint_seq is guaranteed to exist in
302                    // checkpoint_to_timestamp
303                    .unwrap();
304            }
305        }
306
307        if opts.show_events {
308            trace!("getting events");
309            let mut non_empty_digests = vec![];
310            for cache_entry in temp_response.values() {
311                if let Some(effects) = &cache_entry.effects {
312                    if effects.events_digest().is_some() {
313                        non_empty_digests.push(cache_entry.digest);
314                    }
315                }
316            }
317            // fetch events from the DB with retry, retry each 0.5s for 3s
318            let backoff = ExponentialBackoff {
319                max_elapsed_time: Some(Duration::from_secs(3)),
320                multiplier: 1.0,
321                ..ExponentialBackoff::default()
322            };
323            let mut events = retry(backoff, || async {
324                match self
325                    .transaction_kv_store
326                    .multi_get_events_by_tx_digests(&non_empty_digests)
327                    .await
328                {
329                    // Only return Ok when all the queried transaction events are found, otherwise
330                    // retry until timeout, then return Err.
331                    Ok(events) if !events.contains(&None) => Ok(events),
332                    Ok(_) => Err(backoff::Error::transient(Error::Unexpected(
333                        "events not found, transaction execution may be incomplete.".into(),
334                    ))),
335                    Err(e) => Err(backoff::Error::permanent(Error::Unexpected(format!(
336                        "failed to call multi_get_events: {e:?}"
337                    )))),
338                }
339            })
340            .await
341            .map_err(|e| {
342                Error::Unexpected(format!(
343                    "retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
344                ))
345            })?
346            .into_iter();
347
348            // fill cache with the events
349            for (_, cache_entry) in temp_response.iter_mut() {
350                let transaction_digest = cache_entry.digest;
351                if let Some(events_digest) =
352                    cache_entry.effects.as_ref().and_then(|e| e.events_digest())
353                {
354                    match events.next() {
355                        Some(Some(ev)) => {
356                            cache_entry.events =
357                                Some(to_iota_transaction_events(self, cache_entry.digest, ev)?)
358                        }
359                        None | Some(None) => {
360                            error!(
361                                "failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"
362                            );
363                            cache_entry.errors.push(format!(
364                                "failed to fetch events with event digest {events_digest:?}",
365                            ))
366                        }
367                    }
368                } else {
369                    // events field will be Some if and only if `show_events` is true and
370                    // there is no error in converting fetching events
371                    cache_entry.events = Some(IotaTransactionBlockEvents::default());
372                }
373            }
374        }
375
376        let object_cache =
377            ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
378        if opts.show_balance_changes {
379            trace!("getting balance changes");
380
381            let mut results = vec![];
382            for resp in temp_response.values() {
383                let input_objects = if let Some(tx) = resp.transaction() {
384                    tx.data()
385                        .inner()
386                        .intent_message
387                        .value
388                        .input_objects()
389                        .unwrap_or_default()
390                } else {
391                    // don't have the input tx, so not much we can do. perhaps this is an Err?
392                    Vec::new()
393                };
394                results.push(get_balance_changes_from_effect(
395                    &object_cache,
396                    resp.effects.as_ref().ok_or_else(|| {
397                        IotaRpcInputError::GenericNotFound(
398                            "unable to derive balance changes because effect is empty".to_string(),
399                        )
400                    })?,
401                    input_objects,
402                    None,
403                ));
404            }
405            let results = join_all(results).await;
406            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
407                match result {
408                    Ok(balance_changes) => entry.1.balance_changes = Some(balance_changes),
409                    Err(e) => entry
410                        .1
411                        .errors
412                        .push(format!("Failed to fetch balance changes {e:?}")),
413                }
414            }
415        }
416
417        if opts.show_object_changes {
418            trace!("getting object changes");
419
420            let mut results = vec![];
421            for resp in temp_response.values() {
422                let effects = resp.effects.as_ref().ok_or_else(|| {
423                    IotaRpcInputError::GenericNotFound(
424                        "unable to derive object changes because effect is empty".to_string(),
425                    )
426                })?;
427
428                results.push(get_object_changes(
429                    &object_cache,
430                    resp.transaction
431                        .as_ref()
432                        .ok_or_else(|| {
433                            IotaRpcInputError::GenericNotFound(
434                                "unable to derive object changes because transaction is empty"
435                                    .to_string(),
436                            )
437                        })?
438                        .data()
439                        .intent_message()
440                        .value
441                        .sender(),
442                    effects.modified_at_versions(),
443                    effects.all_changed_objects(),
444                    effects.all_removed_objects(),
445                ));
446            }
447            let results = join_all(results).await;
448            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
449                match result {
450                    Ok(object_changes) => entry.1.object_changes = Some(object_changes),
451                    Err(e) => entry
452                        .1
453                        .errors
454                        .push(format!("Failed to fetch object changes {e:?}")),
455                }
456            }
457        }
458
459        let epoch_store = self.state.load_epoch_store_one_call_per_task();
460
461        let converted_tx_block_resps = temp_response
462            .into_iter()
463            .map(|c| convert_to_response(c.1, &opts, epoch_store.module_cache()))
464            .collect::<Result<Vec<_>, _>>()?;
465
466        self.metrics
467            .get_tx_blocks_result_size
468            .observe(converted_tx_block_resps.len() as f64);
469        self.metrics
470            .get_tx_blocks_result_size_total
471            .inc_by(converted_tx_block_resps.len() as u64);
472
473        trace!("done");
474
475        Ok(converted_tx_block_resps)
476    }
477}
478
479#[async_trait]
480impl ReadApiServer for ReadApi {
481    #[instrument(skip(self))]
482    async fn get_object(
483        &self,
484        object_id: ObjectID,
485        options: Option<IotaObjectDataOptions>,
486    ) -> RpcResult<IotaObjectResponse> {
487        async move {
488            let state = self.state.clone();
489            let object_read = spawn_monitored_task!(async move {
490                state.get_object_read(&object_id).map_err(|e| {
491                    warn!(?object_id, "Failed to get object: {:?}", e);
492                    Error::from(e)
493                })
494            })
495            .await
496            .map_err(Error::from)??;
497            let options = options.unwrap_or_default();
498
499            match object_read {
500                ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
501                    IotaObjectResponseError::NotExists { object_id: id },
502                )),
503                ObjectRead::Exists(object_ref, o, layout) => {
504                    let mut display_fields = None;
505                    if options.show_display {
506                        match get_display_fields(self, &self.transaction_kv_store, &o, &layout)
507                            .await
508                        {
509                            Ok(rendered_fields) => display_fields = Some(rendered_fields),
510                            Err(e) => {
511                                return Ok(IotaObjectResponse::new(
512                                    Some(IotaObjectData::new(
513                                        object_ref, o, layout, options, None,
514                                    )?),
515                                    Some(IotaObjectResponseError::Display {
516                                        error: e.to_string(),
517                                    }),
518                                ));
519                            }
520                        }
521                    }
522                    Ok(IotaObjectResponse::new_with_data(IotaObjectData::new(
523                        object_ref,
524                        o,
525                        layout,
526                        options,
527                        display_fields,
528                    )?))
529                }
530                ObjectRead::Deleted((object_id, version, digest)) => Ok(
531                    IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
532                        object_id,
533                        version,
534                        digest,
535                    }),
536                ),
537            }
538        }
539        .trace()
540        .await
541    }
542
543    #[instrument(skip(self))]
544    async fn multi_get_objects(
545        &self,
546        object_ids: Vec<ObjectID>,
547        options: Option<IotaObjectDataOptions>,
548    ) -> RpcResult<Vec<IotaObjectResponse>> {
549        async move {
550            if object_ids.len() <= *QUERY_MAX_RESULT_LIMIT {
551                self.metrics
552                    .get_objects_limit
553                    .observe(object_ids.len() as f64);
554                let mut futures = vec![];
555                for object_id in object_ids {
556                    futures.push(self.get_object(object_id, options.clone()));
557                }
558                let results = join_all(futures).await;
559
560                let objects_result: Result<Vec<IotaObjectResponse>, String> = results
561                    .into_iter()
562                    .map(|result| match result {
563                        Ok(response) => Ok(response),
564                        Err(error) => {
565                            error!("Failed to fetch object with error: {error:?}");
566                            Err(format!("Error: {}", error))
567                        }
568                    })
569                    .collect();
570
571                let objects = objects_result.map_err(|err| {
572                    Error::Unexpected(format!("Failed to fetch objects with error: {}", err))
573                })?;
574
575                self.metrics
576                    .get_objects_result_size
577                    .observe(objects.len() as f64);
578                self.metrics
579                    .get_objects_result_size_total
580                    .inc_by(objects.len() as u64);
581                Ok(objects)
582            } else {
583                Err(IotaRpcInputError::SizeLimitExceeded(
584                    QUERY_MAX_RESULT_LIMIT.to_string(),
585                ))?
586            }
587        }
588        .trace()
589        .await
590    }
591
592    #[instrument(skip(self))]
593    async fn try_get_past_object(
594        &self,
595        object_id: ObjectID,
596        version: SequenceNumber,
597        options: Option<IotaObjectDataOptions>,
598    ) -> RpcResult<IotaPastObjectResponse> {
599        async move {
600            let state = self.state.clone();
601            let past_read = spawn_monitored_task!(async move {
602            state.get_past_object_read(&object_id, version)
603            .map_err(|e| {
604                error!("Failed to call try_get_past_object for object: {object_id:?} version: {version:?} with error: {e:?}");
605                Error::from(e)
606            })}).await.map_err(Error::from)??;
607            let options = options.unwrap_or_default();
608            match past_read {
609                PastObjectRead::ObjectNotExists(id) => {
610                    Ok(IotaPastObjectResponse::ObjectNotExists(id))
611                }
612                PastObjectRead::VersionFound(object_ref, o, layout) => {
613                    let display_fields = if options.show_display {
614                        // TODO (jian): api breaking change to also modify past objects.
615                        Some(
616                            get_display_fields(self, &self.transaction_kv_store, &o, &layout)
617                                .await
618                                .map_err(|e| {
619                                    Error::Unexpected(format!(
620                                        "Unable to render object at version {version}: {e}"
621                                    ))
622                                })?,
623                        )
624                    } else {
625                        None
626                    };
627                    Ok(IotaPastObjectResponse::VersionFound(
628                        IotaObjectData::new(object_ref, o, layout, options, display_fields)?,
629                    ))
630                }
631                PastObjectRead::ObjectDeleted(oref) => {
632                    Ok(IotaPastObjectResponse::ObjectDeleted(oref.into()))
633                }
634                PastObjectRead::VersionNotFound(id, seq_num) => {
635                    Ok(IotaPastObjectResponse::VersionNotFound(id, seq_num))
636                }
637                PastObjectRead::VersionTooHigh {
638                    object_id,
639                    asked_version,
640                    latest_version,
641                } => Ok(IotaPastObjectResponse::VersionTooHigh {
642                    object_id,
643                    asked_version,
644                    latest_version,
645                }),
646            }
647        }
648        .trace()
649        .await
650    }
651
652    #[instrument(skip(self))]
653    async fn try_get_object_before_version(
654        &self,
655        object_id: ObjectID,
656        version: SequenceNumber,
657    ) -> RpcResult<IotaPastObjectResponse> {
658        let version = self
659            .state
660            .find_object_lt_or_eq_version(&object_id, &version)
661            .await
662            .map_err(Error::from)?
663            .map(|obj| obj.version())
664            .unwrap_or_default();
665        self.try_get_past_object(
666            object_id,
667            version,
668            Some(IotaObjectDataOptions::bcs_lossless()),
669        )
670        .await
671    }
672
673    #[instrument(skip(self))]
674    async fn try_multi_get_past_objects(
675        &self,
676        past_objects: Vec<IotaGetPastObjectRequest>,
677        options: Option<IotaObjectDataOptions>,
678    ) -> RpcResult<Vec<IotaPastObjectResponse>> {
679        async move {
680            if past_objects.len() <= *QUERY_MAX_RESULT_LIMIT {
681                let mut futures = vec![];
682                for past_object in past_objects {
683                    futures.push(self.try_get_past_object(
684                        past_object.object_id,
685                        past_object.version,
686                        options.clone(),
687                    ));
688                }
689                let results = join_all(futures).await;
690
691                let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
692                let success = oks.into_iter().filter_map(Result::ok).collect();
693                let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
694                if !errors.is_empty() {
695                    let error_string = errors
696                        .iter()
697                        .map(|e| e.to_string())
698                        .collect::<Vec<String>>()
699                        .join("; ");
700                    Err(anyhow!("{error_string}").into()) // Collects errors not
701                // related to
702                // IotaPastObjectResponse
703                // variants
704                } else {
705                    Ok(success)
706                }
707            } else {
708                Err(IotaRpcInputError::SizeLimitExceeded(
709                    QUERY_MAX_RESULT_LIMIT.to_string(),
710                ))?
711            }
712        }
713        .trace()
714        .await
715    }
716
717    #[instrument(skip(self))]
718    async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
719        async move {
720            Ok(self
721                .state
722                .get_total_transaction_blocks()
723                .map_err(Error::from)?
724                .into()) // converts into BigInt<u64>
725        }
726        .trace()
727        .await
728    }
729
730    #[instrument(skip(self))]
731    async fn get_transaction_block(
732        &self,
733        digest: TransactionDigest,
734        opts: Option<IotaTransactionBlockResponseOptions>,
735    ) -> RpcResult<IotaTransactionBlockResponse> {
736        async move {
737            let opts = opts.unwrap_or_default();
738            let mut temp_response = IntermediateTransactionResponse::new(digest);
739
740            // Fetch transaction to determine existence
741            let transaction_kv_store = self.transaction_kv_store.clone();
742            let transaction = spawn_monitored_task!(async move {
743                let ret = transaction_kv_store.get_tx(digest).await.map_err(|err| {
744                    debug!(tx_digest=?digest, "Failed to get transaction: {:?}", err);
745                    Error::from(err)
746                });
747                add_server_timing("tx_kv_lookup");
748                ret
749            })
750            .await
751            .map_err(Error::from)??;
752            let input_objects = transaction
753                .data()
754                .inner()
755                .intent_message
756                .value
757                .input_objects()
758                .unwrap_or_default();
759
760            // the input is needed for object_changes to retrieve the sender address.
761            if opts.require_input() {
762                temp_response.transaction = Some(transaction);
763            }
764
765            // Fetch effects when `show_events` is true because events relies on effects
766            if opts.require_effects() {
767                let transaction_kv_store = self.transaction_kv_store.clone();
768                temp_response.effects = Some(
769                    spawn_monitored_task!(async move {
770                        transaction_kv_store
771                            .get_fx_by_tx_digest(digest)
772                            .await
773                            .map_err(|err| {
774                                debug!(tx_digest=?digest, "Failed to get effects: {:?}", err);
775                                Error::from(err)
776                            })
777                    })
778                    .await
779                    .map_err(Error::from)??,
780                );
781            }
782
783            // `AuthorityPerpetualTables::executed_transactions_to_checkpoint`
784            // table and `CheckpointCache` trait exist for the sole purpose
785            // of being able to execute the following call below.
786            // It if gets removed or rewritten then the table and associated
787            // code can be removed as well.
788            temp_response.checkpoint_seq = self
789                .transaction_kv_store
790                .get_transaction_perpetual_checkpoint(digest)
791                .await
792                .map_err(|e| {
793                    error!("Failed to retrieve checkpoint sequence for transaction {digest:?} with error: {e:?}");
794                    Error::from(e)
795                })?;
796
797            if let Some(checkpoint_seq) = &temp_response.checkpoint_seq {
798                let kv_store = self.transaction_kv_store.clone();
799                let checkpoint_seq = *checkpoint_seq;
800                let checkpoint = spawn_monitored_task!(async move {
801                    kv_store
802                    // safe to unwrap because we have checked `is_some` above
803                    .get_checkpoint_summary(checkpoint_seq)
804                    .await
805                    .map_err(|e| {
806                        error!("Failed to get checkpoint by sequence number: {checkpoint_seq:?} with error: {e:?}");
807                        Error::from(e)
808                    })
809                }).await.map_err(Error::from)??;
810                // TODO(chris): we don't need to fetch the whole checkpoint summary
811                temp_response.timestamp = Some(checkpoint.timestamp_ms);
812            }
813
814            if opts.show_events && temp_response.effects.is_some() {
815                let transaction_kv_store = self.transaction_kv_store.clone();
816                let events = spawn_monitored_task!(async move {
817                    transaction_kv_store
818                        .multi_get_events_by_tx_digests(&[digest])
819                        .await
820                        .map_err(|e| {
821                            error!("failed to call get transaction events for transaction: {digest:?} with error {e:?}");
822                            Error::from(e)
823                        })
824                    })
825                    .await
826                    .map_err(Error::from)??
827                    .pop()
828                    .flatten();
829                match events {
830                    None => temp_response.events = Some(IotaTransactionBlockEvents::default()),
831                    Some(events) => match to_iota_transaction_events(self, digest, events) {
832                        Ok(e) => temp_response.events = Some(e),
833                        Err(e) => temp_response.errors.push(e.to_string()),
834                    },
835                }
836            }
837
838            let object_cache =
839                ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
840            if opts.show_balance_changes {
841                if let Some(effects) = &temp_response.effects {
842                    let balance_changes = get_balance_changes_from_effect(
843                        &object_cache,
844                        effects,
845                        input_objects,
846                        None,
847                    )
848                    .await;
849
850                    if let Ok(balance_changes) = balance_changes {
851                        temp_response.balance_changes = Some(balance_changes);
852                    } else {
853                        temp_response.errors.push(format!(
854                            "Cannot retrieve balance changes: {}",
855                            balance_changes.unwrap_err()
856                        ));
857                    }
858                }
859            }
860
861            if opts.show_object_changes {
862                if let (Some(effects), Some(input)) =
863                    (&temp_response.effects, &temp_response.transaction)
864                {
865                    let sender = input.data().intent_message().value.sender();
866                    let object_changes = get_object_changes(
867                        &object_cache,
868                        sender,
869                        effects.modified_at_versions(),
870                        effects.all_changed_objects(),
871                        effects.all_removed_objects(),
872                    )
873                    .await;
874
875                    if let Ok(object_changes) = object_changes {
876                        temp_response.object_changes = Some(object_changes);
877                    } else {
878                        temp_response.errors.push(format!(
879                            "Cannot retrieve object changes: {}",
880                            object_changes.unwrap_err()
881                        ));
882                    }
883                }
884            }
885            let epoch_store = self.state.load_epoch_store_one_call_per_task();
886
887            convert_to_response(temp_response, &opts, epoch_store.module_cache())
888        }
889        .trace()
890        .await
891    }
892
893    #[instrument(skip(self))]
894    async fn multi_get_transaction_blocks(
895        &self,
896        digests: Vec<TransactionDigest>,
897        opts: Option<IotaTransactionBlockResponseOptions>,
898    ) -> RpcResult<Vec<IotaTransactionBlockResponse>> {
899        async move {
900            let cloned_self = self.clone();
901            spawn_monitored_task!(async move {
902                cloned_self
903                    .multi_get_transaction_blocks_internal(digests, opts)
904                    .await
905            })
906            .await
907            .map_err(Error::from)?
908        }
909        .trace()
910        .await
911    }
912
913    #[instrument(skip(self))]
914    async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<IotaEvent>> {
915        async move {
916            let state = self.state.clone();
917            let transaction_kv_store = self.transaction_kv_store.clone();
918            spawn_monitored_task!(async move{
919                let store = state.load_epoch_store_one_call_per_task();
920                let events = transaction_kv_store
921                    .multi_get_events_by_tx_digests(&[transaction_digest])
922                    .await
923                    .map_err(
924                        |e| {
925                            error!("failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
926                            Error::StateRead(e.into())
927                        })?
928                    .pop()
929                    .flatten();
930                Ok(match events {
931                    Some(events) => events
932                        .data
933                        .into_iter()
934                        .enumerate()
935                        .map(|(seq, e)| {
936                            let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
937                            IotaEvent::try_from(e, transaction_digest, seq as u64, None, layout)
938                        })
939                        .collect::<Result<Vec<_>, _>>()
940                        .map_err(Error::Iota)?,
941                    None => vec![],
942                })
943            })
944            .await
945            .map_err(Error::from)?
946        }
947        .trace()
948        .await
949    }
950
951    #[instrument(skip(self))]
952    async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
953        async move {
954            Ok(self
955                .state
956                .get_latest_checkpoint_sequence_number()
957                .map_err(|e| {
958                    IotaRpcInputError::GenericNotFound(format!(
959                        "Latest checkpoint sequence number was not found with error :{e}"
960                    ))
961                })?
962                .into())
963        }
964        .trace()
965        .await
966    }
967
968    #[instrument(skip(self))]
969    async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
970        self.get_checkpoint_internal(id).trace().await
971    }
972
973    #[instrument(skip(self))]
974    async fn get_checkpoints(
975        &self,
976        // If `Some`, the query will start from the next item after the specified cursor
977        cursor: Option<BigInt<u64>>,
978        limit: Option<usize>,
979        descending_order: bool,
980    ) -> RpcResult<CheckpointPage> {
981        async move {
982            let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)
983                .map_err(IotaRpcInputError::from)?;
984
985            let state = self.state.clone();
986            let kv_store = self.transaction_kv_store.clone();
987
988            self.metrics.get_checkpoints_limit.observe(limit as f64);
989
990            let mut data = spawn_monitored_task!(Self::get_checkpoints_internal(
991                state,
992                kv_store,
993                cursor.map(|s| *s),
994                limit as u64 + 1,
995                descending_order,
996            ))
997            .await
998            .map_err(Error::from)?
999            .map_err(Error::from)?;
1000
1001            let has_next_page = data.len() > limit;
1002            data.truncate(limit);
1003
1004            let next_cursor = if has_next_page {
1005                data.last().cloned().map(|d| d.sequence_number.into())
1006            } else {
1007                None
1008            };
1009
1010            self.metrics
1011                .get_checkpoints_result_size
1012                .observe(data.len() as f64);
1013            self.metrics
1014                .get_checkpoints_result_size_total
1015                .inc_by(data.len() as u64);
1016
1017            Ok(CheckpointPage {
1018                data,
1019                next_cursor,
1020                has_next_page,
1021            })
1022        }
1023        .trace()
1024        .await
1025    }
1026
1027    #[instrument(skip(self))]
1028    async fn get_protocol_config(
1029        &self,
1030        version: Option<BigInt<u64>>,
1031    ) -> RpcResult<ProtocolConfigResponse> {
1032        async move {
1033            version
1034                .map(|v| {
1035                    ProtocolConfig::get_for_version_if_supported(
1036                        (*v).into(),
1037                        self.state.get_chain_identifier()?.chain(),
1038                    )
1039                    .ok_or(IotaRpcInputError::ProtocolVersionUnsupported(
1040                        ProtocolVersion::MIN.as_u64(),
1041                        ProtocolVersion::MAX.as_u64(),
1042                    ))
1043                    .map_err(Error::from)
1044                })
1045                .unwrap_or(Ok(self
1046                    .state
1047                    .load_epoch_store_one_call_per_task()
1048                    .protocol_config()
1049                    .clone()))
1050                .map(ProtocolConfigResponse::from)
1051        }
1052        .trace()
1053        .await
1054    }
1055
1056    #[instrument(skip(self))]
1057    async fn get_chain_identifier(&self) -> RpcResult<String> {
1058        async move {
1059            let ci = self.state.get_chain_identifier()?;
1060            Ok(ci.to_string())
1061        }
1062        .trace()
1063        .await
1064    }
1065}
1066
1067impl IotaRpcModule for ReadApi {
1068    fn rpc(self) -> RpcModule<Self> {
1069        self.into_rpc()
1070    }
1071
1072    fn rpc_doc_module() -> Module {
1073        ReadApiOpenRpc::module_doc()
1074    }
1075}
1076
1077fn to_iota_transaction_events(
1078    fullnode_api: &ReadApi,
1079    tx_digest: TransactionDigest,
1080    events: TransactionEvents,
1081) -> Result<IotaTransactionBlockEvents, Error> {
1082    let epoch_store = fullnode_api.state.load_epoch_store_one_call_per_task();
1083    let backing_package_store = fullnode_api.state.get_backing_package_store();
1084    let mut layout_resolver = epoch_store
1085        .executor()
1086        .type_layout_resolver(Box::new(backing_package_store.as_ref()));
1087    Ok(IotaTransactionBlockEvents::try_from(
1088        events,
1089        tx_digest,
1090        None,
1091        layout_resolver.as_mut(),
1092    )?)
1093}
1094
1095#[derive(Debug, thiserror::Error)]
1096pub enum ObjectDisplayError {
1097    #[error("Not a move struct")]
1098    NotMoveStruct,
1099
1100    #[error("Failed to extract layout")]
1101    Layout,
1102
1103    #[error("Failed to extract Move object")]
1104    MoveObject,
1105
1106    #[error(transparent)]
1107    Deserialization(#[from] IotaError),
1108
1109    #[error("Failed to deserialize 'VersionUpdatedEvent': {0}")]
1110    Bcs(#[from] bcs::Error),
1111
1112    #[error(transparent)]
1113    StateRead(#[from] StateReadError),
1114}
1115
1116async fn get_display_fields(
1117    fullnode_api: &ReadApi,
1118    kv_store: &Arc<TransactionKeyValueStore>,
1119    original_object: &Object,
1120    original_layout: &Option<MoveStructLayout>,
1121) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1122    let Some((object_type, layout)) = get_object_type_and_struct(original_object, original_layout)?
1123    else {
1124        return Ok(DisplayFieldsResponse {
1125            data: None,
1126            error: None,
1127        });
1128    };
1129    if let Some(display_object) =
1130        get_display_object_by_type(kv_store, fullnode_api, &object_type).await?
1131    {
1132        return get_rendered_fields(display_object.fields, &layout);
1133    }
1134    Ok(DisplayFieldsResponse {
1135        data: None,
1136        error: None,
1137    })
1138}
1139
1140async fn get_display_object_by_type(
1141    kv_store: &Arc<TransactionKeyValueStore>,
1142    fullnode_api: &ReadApi,
1143    object_type: &StructTag,
1144    // TODO: add query version support
1145) -> Result<Option<DisplayVersionUpdatedEvent>, ObjectDisplayError> {
1146    let mut events = fullnode_api
1147        .state
1148        .query_events(
1149            kv_store,
1150            EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(object_type)),
1151            None,
1152            1,
1153            true,
1154        )
1155        .await?;
1156
1157    // If there's any recent version of Display, give it to the client.
1158    // TODO: add support for version query.
1159    if let Some(event) = events.pop() {
1160        let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes())?;
1161        Ok(Some(display))
1162    } else {
1163        Ok(None)
1164    }
1165}
1166
1167pub fn get_object_type_and_struct(
1168    o: &Object,
1169    layout: &Option<MoveStructLayout>,
1170) -> Result<Option<(StructTag, MoveStruct)>, ObjectDisplayError> {
1171    if let Some(object_type) = o.type_() {
1172        let move_struct = get_move_struct(o, layout)?;
1173        Ok(Some((object_type.clone().into(), move_struct)))
1174    } else {
1175        Ok(None)
1176    }
1177}
1178
1179fn get_move_struct(
1180    o: &Object,
1181    layout: &Option<MoveStructLayout>,
1182) -> Result<MoveStruct, ObjectDisplayError> {
1183    let layout = layout.as_ref().ok_or_else(|| ObjectDisplayError::Layout)?;
1184    Ok(o.data
1185        .try_as_move()
1186        .ok_or_else(|| ObjectDisplayError::MoveObject)?
1187        .to_move_struct(layout)?)
1188}
1189
1190pub fn get_rendered_fields(
1191    fields: VecMap<String, String>,
1192    move_struct: &MoveStruct,
1193) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1194    let iota_move_value: IotaMoveValue = MoveValue::Struct(move_struct.clone()).into();
1195    if let IotaMoveValue::Struct(move_struct) = iota_move_value {
1196        let fields =
1197            fields
1198                .contents
1199                .iter()
1200                .map(|entry| match parse_template(&entry.value, &move_struct) {
1201                    Ok(value) => Ok((entry.key.clone(), value)),
1202                    Err(e) => Err(e),
1203                });
1204        let (oks, errs): (Vec<_>, Vec<_>) = fields.partition(Result::is_ok);
1205        let success = oks.into_iter().filter_map(Result::ok).collect();
1206        let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
1207        let error_string = errors
1208            .iter()
1209            .map(|e| e.to_string())
1210            .collect::<Vec<String>>()
1211            .join("; ");
1212        let error = if !error_string.is_empty() {
1213            Some(IotaObjectResponseError::Display {
1214                error: anyhow!("{error_string}").to_string(),
1215            })
1216        } else {
1217            None
1218        };
1219
1220        return Ok(DisplayFieldsResponse {
1221            data: Some(success),
1222            error,
1223        });
1224    }
1225    Err(ObjectDisplayError::NotMoveStruct)?
1226}
1227
1228fn parse_template(template: &str, move_struct: &IotaMoveStruct) -> Result<String, Error> {
1229    let mut output = template.to_string();
1230    let mut var_name = String::new();
1231    let mut in_braces = false;
1232    let mut escaped = false;
1233
1234    for ch in template.chars() {
1235        match ch {
1236            '\\' => {
1237                escaped = true;
1238                continue;
1239            }
1240            '{' if !escaped => {
1241                in_braces = true;
1242                var_name.clear();
1243            }
1244            '}' if !escaped => {
1245                in_braces = false;
1246                let value = get_value_from_move_struct(move_struct, &var_name)?;
1247                output = output.replace(&format!("{{{}}}", var_name), &value.to_string());
1248            }
1249            _ if !escaped => {
1250                if in_braces {
1251                    var_name.push(ch);
1252                }
1253            }
1254            _ => {}
1255        }
1256        escaped = false;
1257    }
1258
1259    Ok(output.replace('\\', ""))
1260}
1261
1262fn get_value_from_move_struct(
1263    move_struct: &IotaMoveStruct,
1264    var_name: &str,
1265) -> Result<String, Error> {
1266    let parts: Vec<&str> = var_name.split('.').collect();
1267    if parts.is_empty() {
1268        Err(anyhow!("Display template value cannot be empty"))?;
1269    }
1270    if parts.len() > MAX_DISPLAY_NESTED_LEVEL {
1271        Err(anyhow!(
1272            "Display template value nested depth cannot exist {}",
1273            MAX_DISPLAY_NESTED_LEVEL
1274        ))?;
1275    }
1276    let mut current_value = &IotaMoveValue::Struct(move_struct.clone());
1277    // iterate over the parts and try to access the corresponding field
1278    for part in parts {
1279        match current_value {
1280            IotaMoveValue::Struct(move_struct) => {
1281                if let IotaMoveStruct::WithTypes { type_: _, fields }
1282                | IotaMoveStruct::WithFields(fields) = move_struct
1283                {
1284                    if let Some(value) = fields.get(part) {
1285                        current_value = value;
1286                    } else {
1287                        Err(anyhow!("Field value {var_name} cannot be found in struct"))?;
1288                    }
1289                } else {
1290                    Err(Error::Unexpected(format!(
1291                        "Unexpected move struct type for field {var_name}"
1292                    )))?;
1293                }
1294            }
1295            IotaMoveValue::Variant(IotaMoveVariant {
1296                fields, variant, ..
1297            }) => {
1298                if let Some(value) = fields.get(part) {
1299                    current_value = value;
1300                } else {
1301                    Err(anyhow!(
1302                        "Field value {var_name} cannot be found in variant {variant}",
1303                    ))?
1304                }
1305            }
1306            _ => {
1307                Err(Error::Unexpected(format!(
1308                    "Unexpected move value type for field {var_name}"
1309                )))?;
1310            }
1311        }
1312    }
1313
1314    match current_value {
1315        IotaMoveValue::Option(move_option) => match move_option.as_ref() {
1316            Some(move_value) => Ok(move_value.to_string()),
1317            None => Ok("".to_string()),
1318        },
1319        IotaMoveValue::Vector(_) => Err(anyhow!(
1320            "Vector is not supported as a Display value {var_name}"
1321        ))?,
1322
1323        _ => Ok(current_value.to_string()),
1324    }
1325}
1326
1327fn convert_to_response(
1328    cache: IntermediateTransactionResponse,
1329    opts: &IotaTransactionBlockResponseOptions,
1330    module_cache: &impl GetModule,
1331) -> RpcInterimResult<IotaTransactionBlockResponse> {
1332    let mut response = IotaTransactionBlockResponse::new(cache.digest);
1333    response.errors = cache.errors;
1334
1335    if opts.show_raw_input && cache.transaction.is_some() {
1336        let sender_signed_data = cache.transaction.as_ref().unwrap().data();
1337        let raw_tx = bcs::to_bytes(sender_signed_data)
1338            .map_err(|e| anyhow!("Failed to serialize raw transaction with error: {e}"))?; // TODO: is this a client or server error?
1339        response.raw_transaction = raw_tx;
1340    }
1341
1342    if opts.show_input && cache.transaction.is_some() {
1343        let tx_block = IotaTransactionBlock::try_from(
1344            cache.transaction.unwrap().into_data(),
1345            module_cache,
1346            cache.digest,
1347        )?;
1348        response.transaction = Some(tx_block);
1349    }
1350
1351    if opts.show_raw_effects {
1352        let raw_effects = cache
1353            .effects
1354            .as_ref()
1355            .map(bcs::to_bytes)
1356            .transpose()
1357            .map_err(|e| anyhow!("Failed to serialize raw effects with error: {e}"))?
1358            .unwrap_or_default();
1359        response.raw_effects = raw_effects;
1360    }
1361
1362    if opts.show_effects && cache.effects.is_some() {
1363        let effects = cache.effects.unwrap().try_into().map_err(|e| {
1364            anyhow!(
1365                // TODO: is this a client or server error?
1366                "Failed to convert transaction block effects with error: {e}"
1367            )
1368        })?;
1369        response.effects = Some(effects);
1370    }
1371
1372    response.checkpoint = cache.checkpoint_seq;
1373    response.timestamp_ms = cache.timestamp;
1374
1375    if opts.show_events {
1376        response.events = cache.events;
1377    }
1378
1379    if opts.show_balance_changes {
1380        response.balance_changes = cache.balance_changes;
1381    }
1382
1383    if opts.show_object_changes {
1384        response.object_changes = cache.object_changes;
1385    }
1386
1387    Ok(response)
1388}
1389
1390fn calculate_checkpoint_numbers(
1391    // If `Some`, the query will start from the next item after the specified cursor
1392    cursor: Option<CheckpointSequenceNumber>,
1393    limit: u64,
1394    descending_order: bool,
1395    max_checkpoint: CheckpointSequenceNumber,
1396) -> Vec<CheckpointSequenceNumber> {
1397    let (start_index, end_index) = match cursor {
1398        Some(t) => {
1399            if descending_order {
1400                let start = std::cmp::min(t.saturating_sub(1), max_checkpoint);
1401                let end = start.saturating_sub(limit - 1);
1402                (end, start)
1403            } else {
1404                let start =
1405                    std::cmp::min(t.checked_add(1).unwrap_or(max_checkpoint), max_checkpoint);
1406                let end = std::cmp::min(
1407                    start.checked_add(limit - 1).unwrap_or(max_checkpoint),
1408                    max_checkpoint,
1409                );
1410                (start, end)
1411            }
1412        }
1413        None => {
1414            if descending_order {
1415                (max_checkpoint.saturating_sub(limit - 1), max_checkpoint)
1416            } else {
1417                (0, std::cmp::min(limit - 1, max_checkpoint))
1418            }
1419        }
1420    };
1421
1422    if descending_order {
1423        (start_index..=end_index).rev().collect()
1424    } else {
1425        (start_index..=end_index).collect()
1426    }
1427}
1428
1429#[cfg(test)]
1430mod tests {
1431    use super::*;
1432
1433    #[test]
1434    fn test_calculate_checkpoint_numbers() {
1435        let cursor = Some(10);
1436        let limit = 5;
1437        let descending_order = true;
1438        let max_checkpoint = 15;
1439
1440        let checkpoint_numbers =
1441            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1442
1443        assert_eq!(checkpoint_numbers, vec![9, 8, 7, 6, 5]);
1444    }
1445
1446    #[test]
1447    fn test_calculate_checkpoint_numbers_descending_no_cursor() {
1448        let cursor = None;
1449        let limit = 5;
1450        let descending_order = true;
1451        let max_checkpoint = 15;
1452
1453        let checkpoint_numbers =
1454            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1455
1456        assert_eq!(checkpoint_numbers, vec![15, 14, 13, 12, 11]);
1457    }
1458
1459    #[test]
1460    fn test_calculate_checkpoint_numbers_ascending_no_cursor() {
1461        let cursor = None;
1462        let limit = 5;
1463        let descending_order = false;
1464        let max_checkpoint = 15;
1465
1466        let checkpoint_numbers =
1467            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1468
1469        assert_eq!(checkpoint_numbers, vec![0, 1, 2, 3, 4]);
1470    }
1471
1472    #[test]
1473    fn test_calculate_checkpoint_numbers_ascending_with_cursor() {
1474        let cursor = Some(10);
1475        let limit = 5;
1476        let descending_order = false;
1477        let max_checkpoint = 15;
1478
1479        let checkpoint_numbers =
1480            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1481
1482        assert_eq!(checkpoint_numbers, vec![11, 12, 13, 14, 15]);
1483    }
1484
1485    #[test]
1486    fn test_calculate_checkpoint_numbers_ascending_limit_exceeds_max() {
1487        let cursor = None;
1488        let limit = 20;
1489        let descending_order = false;
1490        let max_checkpoint = 15;
1491
1492        let checkpoint_numbers =
1493            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1494
1495        assert_eq!(checkpoint_numbers, (0..=15).collect::<Vec<_>>());
1496    }
1497
1498    #[test]
1499    fn test_calculate_checkpoint_numbers_descending_limit_exceeds_max() {
1500        let cursor = None;
1501        let limit = 20;
1502        let descending_order = true;
1503        let max_checkpoint = 15;
1504
1505        let checkpoint_numbers =
1506            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
1507
1508        assert_eq!(checkpoint_numbers, (0..=15).rev().collect::<Vec<_>>());
1509    }
1510}