iota_json_rpc/
read_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, sync::Arc, time::Duration};
6
7use anyhow::anyhow;
8use async_trait::async_trait;
9use backoff::{ExponentialBackoff, future::retry};
10use futures::future::join_all;
11use indexmap::map::IndexMap;
12use iota_core::authority::AuthorityState;
13use iota_json_rpc_api::{
14    JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS, ReadApiOpenRpc,
15    ReadApiServer, validate_limit,
16};
17use iota_json_rpc_types::{
18    BalanceChange, Checkpoint, CheckpointId, CheckpointPage, DisplayFieldsResponse, EventFilter,
19    IotaEvent, IotaGetPastObjectRequest, IotaMoveStruct, IotaMoveValue, IotaMoveVariant,
20    IotaObjectData, IotaObjectDataOptions, IotaObjectResponse, IotaPastObjectResponse,
21    IotaTransactionBlock, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
22    IotaTransactionBlockResponseOptions, ObjectChange, ProtocolConfigResponse,
23};
24use iota_metrics::{add_server_timing, spawn_monitored_task};
25use iota_open_rpc::Module;
26use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
27use iota_storage::key_value_store::TransactionKeyValueStore;
28use iota_types::{
29    base_types::{ObjectID, SequenceNumber, TransactionDigest},
30    collection_types::VecMap,
31    crypto::AggregateAuthoritySignature,
32    display::DisplayVersionUpdatedEvent,
33    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
34    error::{IotaError, IotaObjectResponseError},
35    iota_serde::BigInt,
36    messages_checkpoint::{
37        CheckpointContents, CheckpointSequenceNumber, CheckpointSummary, CheckpointTimestamp,
38    },
39    object::{Object, ObjectRead, PastObjectRead},
40    transaction::{Transaction, TransactionDataAPI},
41};
42use itertools::Itertools;
43use jsonrpsee::{RpcModule, core::RpcResult};
44use move_bytecode_utils::module_cache::GetModule;
45use move_core_types::{
46    annotated_value::{MoveStruct, MoveStructLayout, MoveValue},
47    language_storage::StructTag,
48};
49use tap::TapFallible;
50use tracing::{debug, error, instrument, trace, warn};
51
52use crate::{
53    IotaRpcModule, ObjectProvider, ObjectProviderCache,
54    authority_state::{StateRead, StateReadError, StateReadResult},
55    error::{Error, IotaRpcInputError, RpcInterimResult},
56    get_balance_changes_from_effect, get_object_changes,
57    logger::FutureWithTracing as _,
58};
59
// Nesting limit (10 levels) related to display handling. The code that reads this
// constant is outside this view — NOTE(review): confirm the exact semantics at the
// use site.
const MAX_DISPLAY_NESTED_LEVEL: usize = 10;
61
// An implementation of the read portion of the JSON-RPC interface intended for
// use in Fullnodes.
#[derive(Clone)]
pub struct ReadApi {
    // Node state handle; used in this file for object reads, the latest checkpoint
    // sequence number, the total transaction count and epoch-store access.
    pub state: Arc<dyn StateRead>,
    // Key-value store queried for transactions, effects, events and checkpoint
    // summaries/contents.
    pub transaction_kv_store: Arc<TransactionKeyValueStore>,
    // Metrics updated by the read handlers (request sizes and result sizes).
    pub metrics: Arc<JsonRpcMetrics>,
}
70
// Internal data structure to make it easy to work with data returned from
// authority store and also enable code sharing between
// get_transaction_with_options, multi_get_transaction_with_options, etc.
//
// Every optional field starts as `None` (see `Default`) and is filled in only when
// the corresponding piece of data was requested and could be loaded.
#[derive(Default)]
struct IntermediateTransactionResponse {
    // Digest of the transaction this entry describes.
    digest: TransactionDigest,
    // The transaction itself; set when the request options require the input.
    transaction: Option<Transaction>,
    // Execution effects; set when the request options require effects.
    effects: Option<TransactionEffects>,
    // Converted events; set when events were requested and could be resolved.
    events: Option<IotaTransactionBlockEvents>,
    // Sequence number of the checkpoint containing the transaction, if known.
    checkpoint_seq: Option<CheckpointSequenceNumber>,
    // Balance changes derived from the effects, when requested.
    balance_changes: Option<Vec<BalanceChange>>,
    // Object changes derived from the effects, when requested.
    object_changes: Option<Vec<ObjectChange>>,
    // Timestamp taken from the summary of the containing checkpoint.
    timestamp: Option<CheckpointTimestamp>,
    // Non-fatal problems collected while assembling the response.
    errors: Vec<String>,
}
86
87impl IntermediateTransactionResponse {
88    pub fn new(digest: TransactionDigest) -> Self {
89        Self {
90            digest,
91            ..Default::default()
92        }
93    }
94
95    pub fn transaction(&self) -> &Option<Transaction> {
96        &self.transaction
97    }
98}
99
100impl ReadApi {
101    pub fn new(
102        state: Arc<AuthorityState>,
103        transaction_kv_store: Arc<TransactionKeyValueStore>,
104        metrics: Arc<JsonRpcMetrics>,
105    ) -> Self {
106        Self {
107            state,
108            transaction_kv_store,
109            metrics,
110        }
111    }
112
113    async fn get_checkpoint_internal(&self, id: CheckpointId) -> Result<Checkpoint, Error> {
114        Ok(match id {
115            CheckpointId::SequenceNumber(seq) => {
116                let verified_summary = self
117                    .transaction_kv_store
118                    .get_checkpoint_summary(seq)
119                    .await?;
120                let content = self
121                    .transaction_kv_store
122                    .get_checkpoint_contents(verified_summary.sequence_number)
123                    .await?;
124                let signature = verified_summary.auth_sig().signature.clone();
125                (verified_summary.into_data(), content, signature).into()
126            }
127            CheckpointId::Digest(digest) => {
128                let verified_summary = self
129                    .transaction_kv_store
130                    .get_checkpoint_summary_by_digest(digest)
131                    .await?;
132                let content = self
133                    .transaction_kv_store
134                    .get_checkpoint_contents(verified_summary.sequence_number)
135                    .await?;
136                let signature = verified_summary.auth_sig().signature.clone();
137                (verified_summary.into_data(), content, signature).into()
138            }
139        })
140    }
141
142    pub async fn get_checkpoints_internal(
143        state: Arc<dyn StateRead>,
144        transaction_kv_store: Arc<TransactionKeyValueStore>,
145        // If `Some`, the query will start from the next item after the specified cursor
146        cursor: Option<CheckpointSequenceNumber>,
147        limit: u64,
148        descending_order: bool,
149    ) -> StateReadResult<Vec<Checkpoint>> {
150        let max_checkpoint = state.get_latest_checkpoint_sequence_number()?;
151        let checkpoint_numbers =
152            calculate_checkpoint_numbers(cursor, limit, descending_order, max_checkpoint);
153
154        let verified_checkpoints = transaction_kv_store
155            .multi_get_checkpoints_summaries(&checkpoint_numbers)
156            .await?;
157
158        let checkpoint_summaries_and_signatures: Vec<(
159            CheckpointSummary,
160            AggregateAuthoritySignature,
161        )> = verified_checkpoints
162            .into_iter()
163            .flatten()
164            .map(|check| {
165                (
166                    check.clone().into_summary_and_sequence().1,
167                    check.get_validator_signature(),
168                )
169            })
170            .collect();
171
172        let checkpoint_contents = transaction_kv_store
173            .multi_get_checkpoints_contents(&checkpoint_numbers)
174            .await?;
175        let contents: Vec<CheckpointContents> = checkpoint_contents.into_iter().flatten().collect();
176
177        let mut checkpoints: Vec<Checkpoint> = vec![];
178
179        for (summary_and_sig, content) in checkpoint_summaries_and_signatures
180            .into_iter()
181            .zip(contents.into_iter())
182        {
183            checkpoints.push(Checkpoint::from((
184                summary_and_sig.0,
185                content,
186                summary_and_sig.1,
187            )));
188        }
189
190        Ok(checkpoints)
191    }
192
193    async fn multi_get_transaction_blocks_internal(
194        &self,
195        digests: Vec<TransactionDigest>,
196        opts: Option<IotaTransactionBlockResponseOptions>,
197    ) -> Result<Vec<IotaTransactionBlockResponse>, Error> {
198        trace!("start");
199
200        let num_digests = digests.len();
201        if num_digests > *QUERY_MAX_RESULT_LIMIT {
202            Err(IotaRpcInputError::SizeLimitExceeded(
203                QUERY_MAX_RESULT_LIMIT.to_string(),
204            ))?
205        }
206        self.metrics
207            .get_tx_blocks_limit
208            .observe(digests.len() as f64);
209
210        let opts = opts.unwrap_or_default();
211
212        // use LinkedHashMap to dedup and can iterate in insertion order.
213        let mut temp_response: IndexMap<&TransactionDigest, IntermediateTransactionResponse> =
214            IndexMap::from_iter(
215                digests
216                    .iter()
217                    .map(|k| (k, IntermediateTransactionResponse::new(*k))),
218            );
219        if temp_response.len() < num_digests {
220            Err(IotaRpcInputError::ContainsDuplicates)?
221        }
222
223        if opts.require_input() {
224            trace!("getting input");
225            let digests_clone = digests.clone();
226            let transactions =
227                self.transaction_kv_store.multi_get_tx(&digests_clone).await.tap_err(
228                    |err| debug!(digests=?digests_clone, "Failed to multi get transactions: {:?}", err),
229                )?;
230
231            for ((_digest, cache_entry), txn) in
232                temp_response.iter_mut().zip(transactions.into_iter())
233            {
234                cache_entry.transaction = txn;
235            }
236        }
237
238        // Fetch effects when `show_events` is true because events relies on effects
239        if opts.require_effects() {
240            trace!("getting effects");
241            let digests_clone = digests.clone();
242            let effects_list = self.transaction_kv_store
243                .multi_get_fx_by_tx_digest(&digests_clone)
244                .await
245                .tap_err(
246                    |err| debug!(digests=?digests_clone, "Failed to multi get effects for transactions: {:?}", err),
247                )?;
248            for ((_digest, cache_entry), e) in
249                temp_response.iter_mut().zip(effects_list.into_iter())
250            {
251                cache_entry.effects = e;
252            }
253        }
254
255        trace!("getting checkpoint sequence numbers");
256        let checkpoint_seq_list = self
257            .transaction_kv_store
258            .multi_get_transactions_perpetual_checkpoints(&digests)
259            .await
260            .tap_err(
261                |err| debug!(digests=?digests, "Failed to multi get checkpoint sequence number: {:?}", err))?;
262        for ((_digest, cache_entry), seq) in temp_response
263            .iter_mut()
264            .zip(checkpoint_seq_list.into_iter())
265        {
266            cache_entry.checkpoint_seq = seq;
267        }
268
269        let unique_checkpoint_numbers = temp_response
270            .values()
271            .filter_map(|cache_entry| cache_entry.checkpoint_seq)
272            // It's likely that many transactions have the same checkpoint, so we don't
273            // need to over-fetch
274            .unique()
275            .collect::<Vec<CheckpointSequenceNumber>>();
276
277        // fetch timestamp from the DB
278        trace!("getting checkpoint summaries");
279        let timestamps = self
280            .transaction_kv_store
281            .multi_get_checkpoints_summaries(&unique_checkpoint_numbers)
282            .await
283            .map_err(|e| {
284                Error::Unexpected(format!("Failed to fetch checkpoint summaries by these checkpoint ids: {unique_checkpoint_numbers:?} with error: {e:?}"))
285            })?
286            .into_iter()
287            .map(|c| c.map(|checkpoint| checkpoint.timestamp_ms));
288
289        // construct a hashmap of checkpoint -> timestamp for fast lookup
290        let checkpoint_to_timestamp = unique_checkpoint_numbers
291            .into_iter()
292            .zip(timestamps)
293            .collect::<HashMap<_, _>>();
294
295        // fill cache with the timestamp
296        for (_, cache_entry) in temp_response.iter_mut() {
297            if cache_entry.checkpoint_seq.is_some() {
298                // safe to unwrap because is_some is checked
299                cache_entry.timestamp = *checkpoint_to_timestamp
300                    .get(cache_entry.checkpoint_seq.as_ref().unwrap())
301                    // Safe to unwrap because checkpoint_seq is guaranteed to exist in
302                    // checkpoint_to_timestamp
303                    .unwrap();
304            }
305        }
306
307        if opts.show_events {
308            trace!("getting events");
309            let mut non_empty_digests = vec![];
310            for cache_entry in temp_response.values() {
311                if let Some(effects) = &cache_entry.effects {
312                    if effects.events_digest().is_some() {
313                        non_empty_digests.push(cache_entry.digest);
314                    }
315                }
316            }
317            // fetch events from the DB with retry, retry each 0.5s for 3s
318            let backoff = ExponentialBackoff {
319                max_elapsed_time: Some(Duration::from_secs(3)),
320                multiplier: 1.0,
321                ..ExponentialBackoff::default()
322            };
323            let mut events = retry(backoff, || async {
324                match self
325                    .transaction_kv_store
326                    .multi_get_events_by_tx_digests(&non_empty_digests)
327                    .await
328                {
329                    // Only return Ok when all the queried transaction events are found, otherwise
330                    // retry until timeout, then return Err.
331                    Ok(events) if !events.contains(&None) => Ok(events),
332                    Ok(_) => Err(backoff::Error::transient(Error::Unexpected(
333                        "events not found, transaction execution may be incomplete.".into(),
334                    ))),
335                    Err(e) => Err(backoff::Error::permanent(Error::Unexpected(format!(
336                        "failed to call multi_get_events: {e:?}"
337                    )))),
338                }
339            })
340            .await
341            .map_err(|e| {
342                Error::Unexpected(format!(
343                    "retrieving events with retry failed for transaction digests {digests:?}: {e:?}"
344                ))
345            })?
346            .into_iter();
347
348            // fill cache with the events
349            for (_, cache_entry) in temp_response.iter_mut() {
350                let transaction_digest = cache_entry.digest;
351                if let Some(events_digest) =
352                    cache_entry.effects.as_ref().and_then(|e| e.events_digest())
353                {
354                    match events.next() {
355                        Some(Some(ev)) => {
356                            cache_entry.events =
357                                Some(to_iota_transaction_events(self, cache_entry.digest, ev)?)
358                        }
359                        None | Some(None) => {
360                            error!(
361                                "failed to fetch events with event digest {events_digest:?} for txn {transaction_digest}"
362                            );
363                            cache_entry.errors.push(format!(
364                                "failed to fetch events with event digest {events_digest:?}",
365                            ))
366                        }
367                    }
368                } else {
369                    // events field will be Some if and only if `show_events` is true and
370                    // there is no error in converting fetching events
371                    cache_entry.events = Some(IotaTransactionBlockEvents::default());
372                }
373            }
374        }
375
376        let object_cache =
377            ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
378        if opts.show_balance_changes {
379            trace!("getting balance changes");
380
381            let mut results = vec![];
382            for resp in temp_response.values() {
383                let input_objects = if let Some(tx) = resp.transaction() {
384                    tx.data()
385                        .inner()
386                        .intent_message
387                        .value
388                        .input_objects()
389                        .unwrap_or_default()
390                } else {
391                    // don't have the input tx, so not much we can do. perhaps this is an Err?
392                    Vec::new()
393                };
394                results.push(get_balance_changes_from_effect(
395                    &object_cache,
396                    resp.effects.as_ref().ok_or_else(|| {
397                        IotaRpcInputError::GenericNotFound(
398                            "unable to derive balance changes because effect is empty".to_string(),
399                        )
400                    })?,
401                    input_objects,
402                    None,
403                ));
404            }
405            let results = join_all(results).await;
406            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
407                match result {
408                    Ok(balance_changes) => entry.1.balance_changes = Some(balance_changes),
409                    Err(e) => entry
410                        .1
411                        .errors
412                        .push(format!("Failed to fetch balance changes {e:?}")),
413                }
414            }
415        }
416
417        if opts.show_object_changes {
418            trace!("getting object changes");
419
420            let mut results = vec![];
421            for resp in temp_response.values() {
422                let effects = resp.effects.as_ref().ok_or_else(|| {
423                    IotaRpcInputError::GenericNotFound(
424                        "unable to derive object changes because effect is empty".to_string(),
425                    )
426                })?;
427
428                results.push(get_object_changes(
429                    &object_cache,
430                    resp.transaction
431                        .as_ref()
432                        .ok_or_else(|| {
433                            IotaRpcInputError::GenericNotFound(
434                                "unable to derive object changes because transaction is empty"
435                                    .to_string(),
436                            )
437                        })?
438                        .data()
439                        .intent_message()
440                        .value
441                        .sender(),
442                    effects.modified_at_versions(),
443                    effects.all_changed_objects(),
444                    effects.all_removed_objects(),
445                ));
446            }
447            let results = join_all(results).await;
448            for (result, entry) in results.into_iter().zip(temp_response.iter_mut()) {
449                match result {
450                    Ok(object_changes) => entry.1.object_changes = Some(object_changes),
451                    Err(e) => entry
452                        .1
453                        .errors
454                        .push(format!("Failed to fetch object changes {e:?}")),
455                }
456            }
457        }
458
459        let epoch_store = self.state.load_epoch_store_one_call_per_task();
460
461        let converted_tx_block_resps = temp_response
462            .into_iter()
463            .map(|c| convert_to_response(c.1, &opts, epoch_store.module_cache()))
464            .collect::<Result<Vec<_>, _>>()?;
465
466        self.metrics
467            .get_tx_blocks_result_size
468            .observe(converted_tx_block_resps.len() as f64);
469        self.metrics
470            .get_tx_blocks_result_size_total
471            .inc_by(converted_tx_block_resps.len() as u64);
472
473        trace!("done");
474
475        Ok(converted_tx_block_resps)
476    }
477}
478
479#[async_trait]
480impl ReadApiServer for ReadApi {
481    #[instrument(skip(self))]
482    async fn get_object(
483        &self,
484        object_id: ObjectID,
485        options: Option<IotaObjectDataOptions>,
486    ) -> RpcResult<IotaObjectResponse> {
487        async move {
488            let state = self.state.clone();
489            let object_read = spawn_monitored_task!(async move {
490                state.get_object_read(&object_id).map_err(|e| {
491                    warn!(?object_id, "failed to get object: {:?}", e);
492                    Error::from(e)
493                })
494            })
495            .await
496            .map_err(Error::from)??;
497            let options = options.unwrap_or_default();
498
499            match object_read {
500                ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
501                    IotaObjectResponseError::NotExists { object_id: id },
502                )),
503                ObjectRead::Exists(object_ref, o, layout) => {
504                    let mut display_fields = None;
505                    if options.show_display {
506                        match get_display_fields(self, &self.transaction_kv_store, &o, &layout)
507                            .await
508                        {
509                            Ok(rendered_fields) => display_fields = Some(rendered_fields),
510                            Err(e) => {
511                                return Ok(IotaObjectResponse::new(
512                                    Some(IotaObjectData::new(
513                                        object_ref, o, layout, options, None,
514                                    )?),
515                                    Some(IotaObjectResponseError::Display {
516                                        error: e.to_string(),
517                                    }),
518                                ));
519                            }
520                        }
521                    }
522                    Ok(IotaObjectResponse::new_with_data(IotaObjectData::new(
523                        object_ref,
524                        o,
525                        layout,
526                        options,
527                        display_fields,
528                    )?))
529                }
530                ObjectRead::Deleted((object_id, version, digest)) => Ok(
531                    IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
532                        object_id,
533                        version,
534                        digest,
535                    }),
536                ),
537            }
538        }
539        .trace()
540        .await
541    }
542
543    #[instrument(skip(self))]
544    async fn multi_get_objects(
545        &self,
546        object_ids: Vec<ObjectID>,
547        options: Option<IotaObjectDataOptions>,
548    ) -> RpcResult<Vec<IotaObjectResponse>> {
549        async move {
550            if object_ids.len() <= *QUERY_MAX_RESULT_LIMIT {
551                self.metrics
552                    .get_objects_limit
553                    .observe(object_ids.len() as f64);
554                let mut futures = vec![];
555                for object_id in object_ids {
556                    futures.push(self.get_object(object_id, options.clone()));
557                }
558                let results = join_all(futures).await;
559
560                let objects_result: Result<Vec<IotaObjectResponse>, String> = results
561                    .into_iter()
562                    .map(|result| match result {
563                        Ok(response) => Ok(response),
564                        Err(error) => {
565                            error!("failed to fetch object with error: {error:?}");
566                            Err(format!("Error: {error}"))
567                        }
568                    })
569                    .collect();
570
571                let objects = objects_result.map_err(|err| {
572                    Error::Unexpected(format!("Failed to fetch objects with error: {err}"))
573                })?;
574
575                self.metrics
576                    .get_objects_result_size
577                    .observe(objects.len() as f64);
578                self.metrics
579                    .get_objects_result_size_total
580                    .inc_by(objects.len() as u64);
581                Ok(objects)
582            } else {
583                Err(IotaRpcInputError::SizeLimitExceeded(
584                    QUERY_MAX_RESULT_LIMIT.to_string(),
585                ))?
586            }
587        }
588        .trace()
589        .await
590    }
591
592    #[instrument(skip(self))]
593    async fn try_get_past_object(
594        &self,
595        object_id: ObjectID,
596        version: SequenceNumber,
597        options: Option<IotaObjectDataOptions>,
598    ) -> RpcResult<IotaPastObjectResponse> {
599        async move {
600            let state = self.state.clone();
601            let past_read = spawn_monitored_task!(async move {
602            state.get_past_object_read(&object_id, version)
603            .map_err(|e| {
604                error!("failed to call try_get_past_object for object: {object_id:?} version: {version:?} with error: {e:?}");
605                Error::from(e)
606            })}).await.map_err(Error::from)??;
607            let options = options.unwrap_or_default();
608            match past_read {
609                PastObjectRead::ObjectNotExists(id) => {
610                    Ok(IotaPastObjectResponse::ObjectNotExists(id))
611                }
612                PastObjectRead::VersionFound(object_ref, o, layout) => {
613                    let display_fields = if options.show_display {
614                        // TODO (jian): api breaking change to also modify past objects.
615                        Some(
616                            get_display_fields(self, &self.transaction_kv_store, &o, &layout)
617                                .await
618                                .map_err(|e| {
619                                    Error::Unexpected(format!(
620                                        "Unable to render object at version {version}: {e}"
621                                    ))
622                                })?,
623                        )
624                    } else {
625                        None
626                    };
627                    Ok(IotaPastObjectResponse::VersionFound(
628                        IotaObjectData::new(object_ref, o, layout, options, display_fields)?,
629                    ))
630                }
631                PastObjectRead::ObjectDeleted(oref) => {
632                    Ok(IotaPastObjectResponse::ObjectDeleted(oref.into()))
633                }
634                PastObjectRead::VersionNotFound(id, seq_num) => {
635                    Ok(IotaPastObjectResponse::VersionNotFound(id, seq_num))
636                }
637                PastObjectRead::VersionTooHigh {
638                    object_id,
639                    asked_version,
640                    latest_version,
641                } => Ok(IotaPastObjectResponse::VersionTooHigh {
642                    object_id,
643                    asked_version,
644                    latest_version,
645                }),
646            }
647        }
648        .trace()
649        .await
650    }
651
652    #[instrument(skip(self))]
653    async fn try_get_object_before_version(
654        &self,
655        object_id: ObjectID,
656        version: SequenceNumber,
657    ) -> RpcResult<IotaPastObjectResponse> {
658        let version = self
659            .state
660            .find_object_lt_or_eq_version(&object_id, &version)
661            .await
662            .map_err(Error::from)?
663            .map(|obj| obj.version())
664            .unwrap_or_default();
665        self.try_get_past_object(
666            object_id,
667            version,
668            Some(IotaObjectDataOptions::bcs_lossless()),
669        )
670        .await
671    }
672
673    #[instrument(skip(self))]
674    async fn try_multi_get_past_objects(
675        &self,
676        past_objects: Vec<IotaGetPastObjectRequest>,
677        options: Option<IotaObjectDataOptions>,
678    ) -> RpcResult<Vec<IotaPastObjectResponse>> {
679        async move {
680            if past_objects.len() <= *QUERY_MAX_RESULT_LIMIT {
681                let mut futures = vec![];
682                for past_object in past_objects {
683                    futures.push(self.try_get_past_object(
684                        past_object.object_id,
685                        past_object.version,
686                        options.clone(),
687                    ));
688                }
689                let results = join_all(futures).await;
690
691                let (oks, errs): (Vec<_>, Vec<_>) = results.into_iter().partition(Result::is_ok);
692                let success = oks.into_iter().filter_map(Result::ok).collect();
693                let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
694                if !errors.is_empty() {
695                    let error_string = errors
696                        .iter()
697                        .map(|e| e.to_string())
698                        .collect::<Vec<String>>()
699                        .join("; ");
700                    Err(anyhow!("{error_string}").into()) // Collects errors not
701                // related to
702                // IotaPastObjectResponse
703                // variants
704                } else {
705                    Ok(success)
706                }
707            } else {
708                Err(IotaRpcInputError::SizeLimitExceeded(
709                    QUERY_MAX_RESULT_LIMIT.to_string(),
710                ))?
711            }
712        }
713        .trace()
714        .await
715    }
716
717    #[instrument(skip(self))]
718    async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
719        async move {
720            Ok(self
721                .state
722                .get_total_transaction_blocks()
723                .map_err(Error::from)?
724                .into()) // converts into BigInt<u64>
725        }
726        .trace()
727        .await
728    }
729
730    #[instrument(skip(self))]
731    async fn is_transaction_indexed_on_node(&self, digest: TransactionDigest) -> RpcResult<bool> {
732        let transaction = async move {
733            let transaction_kv_store = self.transaction_kv_store.clone();
734            let mut transactions = spawn_monitored_task!(async move {
735                let ret = transaction_kv_store
736                    .multi_get_tx(&[digest])
737                    .await
738                    .map_err(|err| {
739                        debug!(tx_digest=?digest, "Failed to get transaction: {:?}", err);
740                        Error::from(err)
741                    });
742                add_server_timing("tx_kv_lookup");
743                ret
744            })
745            .await??;
746            Ok(transactions
747                .pop()
748                .expect("there should be one tx lookup response"))
749        }
750        .trace()
751        .await?;
752        Ok(transaction.map(|tx| *tx.digest()) == Some(digest))
753    }
754
755    #[instrument(skip(self))]
756    async fn get_transaction_block(
757        &self,
758        digest: TransactionDigest,
759        opts: Option<IotaTransactionBlockResponseOptions>,
760    ) -> RpcResult<IotaTransactionBlockResponse> {
761        async move {
762            let opts = opts.unwrap_or_default();
763            let mut temp_response = IntermediateTransactionResponse::new(digest);
764
765            // Fetch transaction to determine existence
766            let transaction_kv_store = self.transaction_kv_store.clone();
767            let transaction = spawn_monitored_task!(async move {
768                let ret = transaction_kv_store.get_tx(digest).await.map_err(|err| {
769                    debug!(tx_digest=?digest, "Failed to get transaction: {:?}", err);
770                    Error::from(err)
771                });
772                add_server_timing("tx_kv_lookup");
773                ret
774            })
775            .await
776            .map_err(Error::from)??;
777            let input_objects = transaction
778                .data()
779                .inner()
780                .intent_message
781                .value
782                .input_objects()
783                .unwrap_or_default();
784
785            // the input is needed for object_changes to retrieve the sender address.
786            if opts.require_input() {
787                temp_response.transaction = Some(transaction);
788            }
789
790            // Fetch effects when `show_events` is true because events relies on effects
791            if opts.require_effects() {
792                let transaction_kv_store = self.transaction_kv_store.clone();
793                temp_response.effects = Some(
794                    spawn_monitored_task!(async move {
795                        transaction_kv_store
796                            .get_fx_by_tx_digest(digest)
797                            .await
798                            .map_err(|err| {
799                                debug!(tx_digest=?digest, "Failed to get effects: {:?}", err);
800                                Error::from(err)
801                            })
802                    })
803                    .await
804                    .map_err(Error::from)??,
805                );
806            }
807
808            // `AuthorityPerpetualTables::executed_transactions_to_checkpoint`
809            // table and `CheckpointCache` trait exist for the sole purpose
810            // of being able to execute the following call below.
811            // It if gets removed or rewritten then the table and associated
812            // code can be removed as well.
813            temp_response.checkpoint_seq = self
814                .transaction_kv_store
815                .get_transaction_perpetual_checkpoint(digest)
816                .await
817                .map_err(|e| {
818                    error!("failed to retrieve checkpoint sequence for transaction {digest:?} with error: {e:?}");
819                    Error::from(e)
820                })?;
821
822            if let Some(checkpoint_seq) = &temp_response.checkpoint_seq {
823                let kv_store = self.transaction_kv_store.clone();
824                let checkpoint_seq = *checkpoint_seq;
825                let checkpoint = spawn_monitored_task!(async move {
826                    kv_store
827                    // safe to unwrap because we have checked `is_some` above
828                    .get_checkpoint_summary(checkpoint_seq)
829                    .await
830                    .map_err(|e| {
831                        error!("failed to get checkpoint by sequence number: {checkpoint_seq:?} with error: {e:?}");
832                        Error::from(e)
833                    })
834                }).await.map_err(Error::from)??;
835                // TODO(chris): we don't need to fetch the whole checkpoint summary
836                temp_response.timestamp = Some(checkpoint.timestamp_ms);
837            }
838
839            if opts.show_events && temp_response.effects.is_some() {
840                let transaction_kv_store = self.transaction_kv_store.clone();
841                let events = spawn_monitored_task!(async move {
842                    transaction_kv_store
843                        .multi_get_events_by_tx_digests(&[digest])
844                        .await
845                        .map_err(|e| {
846                            error!("failed to call get transaction events for transaction: {digest:?} with error {e:?}");
847                            Error::from(e)
848                        })
849                    })
850                    .await
851                    .map_err(Error::from)??
852                    .pop()
853                    .flatten();
854                match events {
855                    None => temp_response.events = Some(IotaTransactionBlockEvents::default()),
856                    Some(events) => match to_iota_transaction_events(self, digest, events) {
857                        Ok(e) => temp_response.events = Some(e),
858                        Err(e) => temp_response.errors.push(e.to_string()),
859                    },
860                }
861            }
862
863            let object_cache =
864                ObjectProviderCache::new((self.state.clone(), self.transaction_kv_store.clone()));
865            if opts.show_balance_changes {
866                if let Some(effects) = &temp_response.effects {
867                    let balance_changes = get_balance_changes_from_effect(
868                        &object_cache,
869                        effects,
870                        input_objects,
871                        None,
872                    )
873                    .await;
874
875                    if let Ok(balance_changes) = balance_changes {
876                        temp_response.balance_changes = Some(balance_changes);
877                    } else {
878                        temp_response.errors.push(format!(
879                            "Cannot retrieve balance changes: {}",
880                            balance_changes.unwrap_err()
881                        ));
882                    }
883                }
884            }
885
886            if opts.show_object_changes {
887                if let (Some(effects), Some(input)) =
888                    (&temp_response.effects, &temp_response.transaction)
889                {
890                    let sender = input.data().intent_message().value.sender();
891                    let object_changes = get_object_changes(
892                        &object_cache,
893                        sender,
894                        effects.modified_at_versions(),
895                        effects.all_changed_objects(),
896                        effects.all_removed_objects(),
897                    )
898                    .await;
899
900                    if let Ok(object_changes) = object_changes {
901                        temp_response.object_changes = Some(object_changes);
902                    } else {
903                        temp_response.errors.push(format!(
904                            "Cannot retrieve object changes: {}",
905                            object_changes.unwrap_err()
906                        ));
907                    }
908                }
909            }
910            let epoch_store = self.state.load_epoch_store_one_call_per_task();
911
912            convert_to_response(temp_response, &opts, epoch_store.module_cache())
913        }
914        .trace()
915        .await
916    }
917
918    #[instrument(skip(self))]
919    async fn multi_get_transaction_blocks(
920        &self,
921        digests: Vec<TransactionDigest>,
922        opts: Option<IotaTransactionBlockResponseOptions>,
923    ) -> RpcResult<Vec<IotaTransactionBlockResponse>> {
924        async move {
925            let cloned_self = self.clone();
926            spawn_monitored_task!(async move {
927                cloned_self
928                    .multi_get_transaction_blocks_internal(digests, opts)
929                    .await
930            })
931            .await
932            .map_err(Error::from)?
933        }
934        .trace()
935        .await
936    }
937
938    #[instrument(skip(self))]
939    async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<IotaEvent>> {
940        async move {
941            let state = self.state.clone();
942            let transaction_kv_store = self.transaction_kv_store.clone();
943            spawn_monitored_task!(async move{
944                let store = state.load_epoch_store_one_call_per_task();
945                let events = transaction_kv_store
946                    .multi_get_events_by_tx_digests(&[transaction_digest])
947                    .await
948                    .map_err(
949                        |e| {
950                            error!("failed to get transaction events for transaction {transaction_digest:?} with error: {e:?}");
951                            Error::StateRead(e.into())
952                        })?
953                    .pop()
954                    .flatten();
955                Ok(match events {
956                    Some(events) => events
957                        .data
958                        .into_iter()
959                        .enumerate()
960                        .map(|(seq, e)| {
961                            let layout = store.executor().type_layout_resolver(Box::new(&state.get_backing_package_store().as_ref())).get_annotated_layout(&e.type_)?;
962                            IotaEvent::try_from(e, transaction_digest, seq as u64, None, layout)
963                        })
964                        .collect::<Result<Vec<_>, _>>()
965                        .map_err(Error::Iota)?,
966                    None => vec![],
967                })
968            })
969            .await
970            .map_err(Error::from)?
971        }
972        .trace()
973        .await
974    }
975
976    #[instrument(skip(self))]
977    async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
978        async move {
979            Ok(self
980                .state
981                .get_latest_checkpoint_sequence_number()
982                .map_err(|e| {
983                    IotaRpcInputError::GenericNotFound(format!(
984                        "Latest checkpoint sequence number was not found with error :{e}"
985                    ))
986                })?
987                .into())
988        }
989        .trace()
990        .await
991    }
992
993    #[instrument(skip(self))]
994    async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
995        self.get_checkpoint_internal(id).trace().await
996    }
997
998    #[instrument(skip(self))]
999    async fn get_checkpoints(
1000        &self,
1001        // If `Some`, the query will start from the next item after the specified cursor
1002        cursor: Option<BigInt<u64>>,
1003        limit: Option<usize>,
1004        descending_order: bool,
1005    ) -> RpcResult<CheckpointPage> {
1006        async move {
1007            let limit = validate_limit(limit, QUERY_MAX_RESULT_LIMIT_CHECKPOINTS)
1008                .map_err(IotaRpcInputError::from)?;
1009
1010            let state = self.state.clone();
1011            let kv_store = self.transaction_kv_store.clone();
1012
1013            self.metrics.get_checkpoints_limit.observe(limit as f64);
1014
1015            let mut data = spawn_monitored_task!(Self::get_checkpoints_internal(
1016                state,
1017                kv_store,
1018                cursor.map(|s| *s),
1019                limit as u64 + 1,
1020                descending_order,
1021            ))
1022            .await
1023            .map_err(Error::from)?
1024            .map_err(Error::from)?;
1025
1026            let has_next_page = data.len() > limit;
1027            data.truncate(limit);
1028
1029            let next_cursor = if has_next_page {
1030                data.last().cloned().map(|d| d.sequence_number.into())
1031            } else {
1032                None
1033            };
1034
1035            self.metrics
1036                .get_checkpoints_result_size
1037                .observe(data.len() as f64);
1038            self.metrics
1039                .get_checkpoints_result_size_total
1040                .inc_by(data.len() as u64);
1041
1042            Ok(CheckpointPage {
1043                data,
1044                next_cursor,
1045                has_next_page,
1046            })
1047        }
1048        .trace()
1049        .await
1050    }
1051
1052    #[instrument(skip(self))]
1053    async fn get_protocol_config(
1054        &self,
1055        version: Option<BigInt<u64>>,
1056    ) -> RpcResult<ProtocolConfigResponse> {
1057        async move {
1058            version
1059                .map(|v| {
1060                    ProtocolConfig::get_for_version_if_supported(
1061                        (*v).into(),
1062                        self.state.get_chain_identifier()?.chain(),
1063                    )
1064                    .ok_or(IotaRpcInputError::ProtocolVersionUnsupported(
1065                        ProtocolVersion::MIN.as_u64(),
1066                        ProtocolVersion::MAX.as_u64(),
1067                    ))
1068                    .map_err(Error::from)
1069                })
1070                .unwrap_or(Ok(self
1071                    .state
1072                    .load_epoch_store_one_call_per_task()
1073                    .protocol_config()
1074                    .clone()))
1075                .map(ProtocolConfigResponse::from)
1076        }
1077        .trace()
1078        .await
1079    }
1080
1081    #[instrument(skip(self))]
1082    async fn get_chain_identifier(&self) -> RpcResult<String> {
1083        async move {
1084            let ci = self.state.get_chain_identifier()?;
1085            Ok(ci.to_string())
1086        }
1087        .trace()
1088        .await
1089    }
1090}
1091
1092impl IotaRpcModule for ReadApi {
1093    fn rpc(self) -> RpcModule<Self> {
1094        self.into_rpc()
1095    }
1096
1097    fn rpc_doc_module() -> Module {
1098        ReadApiOpenRpc::module_doc()
1099    }
1100}
1101
1102fn to_iota_transaction_events(
1103    fullnode_api: &ReadApi,
1104    tx_digest: TransactionDigest,
1105    events: TransactionEvents,
1106) -> Result<IotaTransactionBlockEvents, Error> {
1107    let epoch_store = fullnode_api.state.load_epoch_store_one_call_per_task();
1108    let backing_package_store = fullnode_api.state.get_backing_package_store();
1109    let mut layout_resolver = epoch_store
1110        .executor()
1111        .type_layout_resolver(Box::new(backing_package_store.as_ref()));
1112    Ok(IotaTransactionBlockEvents::try_from(
1113        events,
1114        tx_digest,
1115        None,
1116        layout_resolver.as_mut(),
1117    )?)
1118}
1119
1120#[derive(Debug, thiserror::Error)]
1121pub enum ObjectDisplayError {
1122    #[error("Not a move struct")]
1123    NotMoveStruct,
1124
1125    #[error("Failed to extract layout")]
1126    Layout,
1127
1128    #[error("Failed to extract Move object")]
1129    MoveObject,
1130
1131    #[error(transparent)]
1132    Deserialization(#[from] IotaError),
1133
1134    #[error("Failed to deserialize 'VersionUpdatedEvent': {0}")]
1135    Bcs(#[from] bcs::Error),
1136
1137    #[error(transparent)]
1138    StateRead(#[from] StateReadError),
1139}
1140
1141async fn get_display_fields(
1142    fullnode_api: &ReadApi,
1143    kv_store: &Arc<TransactionKeyValueStore>,
1144    original_object: &Object,
1145    original_layout: &Option<MoveStructLayout>,
1146) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1147    let Some((object_type, layout)) = get_object_type_and_struct(original_object, original_layout)?
1148    else {
1149        return Ok(DisplayFieldsResponse {
1150            data: None,
1151            error: None,
1152        });
1153    };
1154    if let Some(display_object) =
1155        get_display_object_by_type(kv_store, fullnode_api, &object_type).await?
1156    {
1157        return get_rendered_fields(display_object.fields, &layout);
1158    }
1159    Ok(DisplayFieldsResponse {
1160        data: None,
1161        error: None,
1162    })
1163}
1164
1165async fn get_display_object_by_type(
1166    kv_store: &Arc<TransactionKeyValueStore>,
1167    fullnode_api: &ReadApi,
1168    object_type: &StructTag,
1169    // TODO: add query version support
1170) -> Result<Option<DisplayVersionUpdatedEvent>, ObjectDisplayError> {
1171    let mut events = fullnode_api
1172        .state
1173        .query_events(
1174            kv_store,
1175            EventFilter::MoveEventType(DisplayVersionUpdatedEvent::type_(object_type)),
1176            None,
1177            1,
1178            true,
1179        )
1180        .await?;
1181
1182    // If there's any recent version of Display, give it to the client.
1183    // TODO: add support for version query.
1184    if let Some(event) = events.pop() {
1185        let display: DisplayVersionUpdatedEvent = bcs::from_bytes(&event.bcs.into_bytes())?;
1186        Ok(Some(display))
1187    } else {
1188        Ok(None)
1189    }
1190}
1191
1192pub fn get_object_type_and_struct(
1193    o: &Object,
1194    layout: &Option<MoveStructLayout>,
1195) -> Result<Option<(StructTag, MoveStruct)>, ObjectDisplayError> {
1196    if let Some(object_type) = o.type_() {
1197        let move_struct = get_move_struct(o, layout)?;
1198        Ok(Some((object_type.clone().into(), move_struct)))
1199    } else {
1200        Ok(None)
1201    }
1202}
1203
1204fn get_move_struct(
1205    o: &Object,
1206    layout: &Option<MoveStructLayout>,
1207) -> Result<MoveStruct, ObjectDisplayError> {
1208    let layout = layout.as_ref().ok_or_else(|| ObjectDisplayError::Layout)?;
1209    Ok(o.data
1210        .try_as_move()
1211        .ok_or_else(|| ObjectDisplayError::MoveObject)?
1212        .to_move_struct(layout)?)
1213}
1214
1215pub fn get_rendered_fields(
1216    fields: VecMap<String, String>,
1217    move_struct: &MoveStruct,
1218) -> Result<DisplayFieldsResponse, ObjectDisplayError> {
1219    let iota_move_value: IotaMoveValue = MoveValue::Struct(move_struct.clone()).into();
1220    if let IotaMoveValue::Struct(move_struct) = iota_move_value {
1221        let fields =
1222            fields
1223                .contents
1224                .iter()
1225                .map(|entry| match parse_template(&entry.value, &move_struct) {
1226                    Ok(value) => Ok((entry.key.clone(), value)),
1227                    Err(e) => Err(e),
1228                });
1229        let (oks, errs): (Vec<_>, Vec<_>) = fields.partition(Result::is_ok);
1230        let success = oks.into_iter().filter_map(Result::ok).collect();
1231        let errors: Vec<_> = errs.into_iter().filter_map(Result::err).collect();
1232        let error_string = errors
1233            .iter()
1234            .map(|e| e.to_string())
1235            .collect::<Vec<String>>()
1236            .join("; ");
1237        let error = if !error_string.is_empty() {
1238            Some(IotaObjectResponseError::Display {
1239                error: anyhow!("{error_string}").to_string(),
1240            })
1241        } else {
1242            None
1243        };
1244
1245        return Ok(DisplayFieldsResponse {
1246            data: Some(success),
1247            error,
1248        });
1249    }
1250    Err(ObjectDisplayError::NotMoveStruct)?
1251}
1252
1253fn parse_template(template: &str, move_struct: &IotaMoveStruct) -> Result<String, Error> {
1254    let mut output = template.to_string();
1255    let mut var_name = String::new();
1256    let mut in_braces = false;
1257    let mut escaped = false;
1258
1259    for ch in template.chars() {
1260        match ch {
1261            '\\' => {
1262                escaped = true;
1263                continue;
1264            }
1265            '{' if !escaped => {
1266                in_braces = true;
1267                var_name.clear();
1268            }
1269            '}' if !escaped => {
1270                in_braces = false;
1271                let value = get_value_from_move_struct(move_struct, &var_name)?;
1272                output = output.replace(&format!("{{{var_name}}}"), &value.to_string());
1273            }
1274            _ if !escaped => {
1275                if in_braces {
1276                    var_name.push(ch);
1277                }
1278            }
1279            _ => {}
1280        }
1281        escaped = false;
1282    }
1283
1284    Ok(output.replace('\\', ""))
1285}
1286
1287fn get_value_from_move_struct(
1288    move_struct: &IotaMoveStruct,
1289    var_name: &str,
1290) -> Result<String, Error> {
1291    let parts: Vec<&str> = var_name.split('.').collect();
1292    if parts.is_empty() {
1293        Err(anyhow!("Display template value cannot be empty"))?;
1294    }
1295    if parts.len() > MAX_DISPLAY_NESTED_LEVEL {
1296        Err(anyhow!(
1297            "Display template value nested depth cannot exist {}",
1298            MAX_DISPLAY_NESTED_LEVEL
1299        ))?;
1300    }
1301    let mut current_value = &IotaMoveValue::Struct(move_struct.clone());
1302    // iterate over the parts and try to access the corresponding field
1303    for part in parts {
1304        match current_value {
1305            IotaMoveValue::Struct(move_struct) => {
1306                if let IotaMoveStruct::WithTypes { type_: _, fields }
1307                | IotaMoveStruct::WithFields(fields) = move_struct
1308                {
1309                    if let Some(value) = fields.get(part) {
1310                        current_value = value;
1311                    } else {
1312                        Err(anyhow!("Field value {var_name} cannot be found in struct"))?;
1313                    }
1314                } else {
1315                    Err(Error::Unexpected(format!(
1316                        "Unexpected move struct type for field {var_name}"
1317                    )))?;
1318                }
1319            }
1320            IotaMoveValue::Variant(IotaMoveVariant {
1321                fields, variant, ..
1322            }) => {
1323                if let Some(value) = fields.get(part) {
1324                    current_value = value;
1325                } else {
1326                    Err(anyhow!(
1327                        "Field value {var_name} cannot be found in variant {variant}",
1328                    ))?
1329                }
1330            }
1331            _ => {
1332                Err(Error::Unexpected(format!(
1333                    "Unexpected move value type for field {var_name}"
1334                )))?;
1335            }
1336        }
1337    }
1338
1339    match current_value {
1340        IotaMoveValue::Option(move_option) => match move_option.as_ref() {
1341            Some(move_value) => Ok(move_value.to_string()),
1342            None => Ok("".to_string()),
1343        },
1344        IotaMoveValue::Vector(_) => Err(anyhow!(
1345            "Vector is not supported as a Display value {var_name}"
1346        ))?,
1347
1348        _ => Ok(current_value.to_string()),
1349    }
1350}
1351
1352fn convert_to_response(
1353    cache: IntermediateTransactionResponse,
1354    opts: &IotaTransactionBlockResponseOptions,
1355    module_cache: &impl GetModule,
1356) -> RpcInterimResult<IotaTransactionBlockResponse> {
1357    let mut response = IotaTransactionBlockResponse::new(cache.digest);
1358    response.errors = cache.errors;
1359
1360    if opts.show_raw_input && cache.transaction.is_some() {
1361        let sender_signed_data = cache.transaction.as_ref().unwrap().data();
1362        let raw_tx = bcs::to_bytes(sender_signed_data)
1363            .map_err(|e| anyhow!("Failed to serialize raw transaction with error: {e}"))?; // TODO: is this a client or server error?
1364        response.raw_transaction = raw_tx;
1365    }
1366
1367    if opts.show_input && cache.transaction.is_some() {
1368        let tx_block = IotaTransactionBlock::try_from(
1369            cache.transaction.unwrap().into_data(),
1370            module_cache,
1371            cache.digest,
1372        )?;
1373        response.transaction = Some(tx_block);
1374    }
1375
1376    if opts.show_raw_effects {
1377        let raw_effects = cache
1378            .effects
1379            .as_ref()
1380            .map(bcs::to_bytes)
1381            .transpose()
1382            .map_err(|e| anyhow!("Failed to serialize raw effects with error: {e}"))?
1383            .unwrap_or_default();
1384        response.raw_effects = raw_effects;
1385    }
1386
1387    if opts.show_effects && cache.effects.is_some() {
1388        let effects = cache.effects.unwrap().try_into().map_err(|e| {
1389            anyhow!(
1390                // TODO: is this a client or server error?
1391                "Failed to convert transaction block effects with error: {e}"
1392            )
1393        })?;
1394        response.effects = Some(effects);
1395    }
1396
1397    response.checkpoint = cache.checkpoint_seq;
1398    response.timestamp_ms = cache.timestamp;
1399
1400    if opts.show_events {
1401        response.events = cache.events;
1402    }
1403
1404    if opts.show_balance_changes {
1405        response.balance_changes = cache.balance_changes;
1406    }
1407
1408    if opts.show_object_changes {
1409        response.object_changes = cache.object_changes;
1410    }
1411
1412    Ok(response)
1413}
1414
1415fn calculate_checkpoint_numbers(
1416    // If `Some`, the query will start from the next item after the specified cursor
1417    cursor: Option<CheckpointSequenceNumber>,
1418    limit: u64,
1419    descending_order: bool,
1420    max_checkpoint: CheckpointSequenceNumber,
1421) -> Vec<CheckpointSequenceNumber> {
1422    let (start_index, end_index) = match cursor {
1423        Some(t) => {
1424            if descending_order {
1425                let start = std::cmp::min(t.saturating_sub(1), max_checkpoint);
1426                let end = start.saturating_sub(limit - 1);
1427                (end, start)
1428            } else {
1429                let start =
1430                    std::cmp::min(t.checked_add(1).unwrap_or(max_checkpoint), max_checkpoint);
1431                let end = std::cmp::min(
1432                    start.checked_add(limit - 1).unwrap_or(max_checkpoint),
1433                    max_checkpoint,
1434                );
1435                (start, end)
1436            }
1437        }
1438        None => {
1439            if descending_order {
1440                (max_checkpoint.saturating_sub(limit - 1), max_checkpoint)
1441            } else {
1442                (0, std::cmp::min(limit - 1, max_checkpoint))
1443            }
1444        }
1445    };
1446
1447    if descending_order {
1448        (start_index..=end_index).rev().collect()
1449    } else {
1450        (start_index..=end_index).collect()
1451    }
1452}
1453
#[cfg(test)]
mod tests {
    use super::*;

    /// Highest checkpoint assumed to exist in every scenario below.
    const MAX_CHECKPOINT: u64 = 15;

    /// Descending from a cursor: the page starts right below the cursor.
    #[test]
    fn test_calculate_checkpoint_numbers() {
        let page = calculate_checkpoint_numbers(Some(10), 5, true, MAX_CHECKPOINT);
        assert_eq!(page, vec![9, 8, 7, 6, 5]);
    }

    /// Descending without a cursor: the page starts at the latest checkpoint.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, true, MAX_CHECKPOINT);
        assert_eq!(page, vec![15, 14, 13, 12, 11]);
    }

    /// Ascending without a cursor: the page starts at checkpoint 0.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_no_cursor() {
        let page = calculate_checkpoint_numbers(None, 5, false, MAX_CHECKPOINT);
        assert_eq!(page, vec![0, 1, 2, 3, 4]);
    }

    /// Ascending from a cursor: the page starts right above the cursor.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_with_cursor() {
        let page = calculate_checkpoint_numbers(Some(10), 5, false, MAX_CHECKPOINT);
        assert_eq!(page, vec![11, 12, 13, 14, 15]);
    }

    /// Ascending with a limit larger than the chain: everything is returned.
    #[test]
    fn test_calculate_checkpoint_numbers_ascending_limit_exceeds_max() {
        let page = calculate_checkpoint_numbers(None, 20, false, MAX_CHECKPOINT);
        let expected: Vec<u64> = (0..=15).collect();
        assert_eq!(page, expected);
    }

    /// Descending with a limit larger than the chain: everything, newest first.
    #[test]
    fn test_calculate_checkpoint_numbers_descending_limit_exceeds_max() {
        let page = calculate_checkpoint_numbers(None, 20, true, MAX_CHECKPOINT);
        let expected: Vec<u64> = (0..=15).rev().collect();
        assert_eq!(page, expected);
    }
}