iota_indexer/apis/
read_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, sync::Arc};
6
7use async_trait::async_trait;
8use iota_json_rpc::{IotaRpcModule, error::IotaRpcInputError};
9use iota_json_rpc_api::{QUERY_MAX_RESULT_LIMIT, ReadApiServer, internal_error};
10use iota_json_rpc_types::{
11    Checkpoint, CheckpointId, CheckpointPage, IotaEvent, IotaGetPastObjectRequest, IotaObjectData,
12    IotaObjectDataOptions, IotaObjectResponse, IotaPastObjectResponse,
13    IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions, ProtocolConfigResponse,
14};
15use iota_open_rpc::Module;
16use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
17use iota_types::{
18    base_types::{ObjectID, SequenceNumber},
19    digests::{ChainIdentifier, TransactionDigest},
20    error::IotaObjectResponseError,
21    iota_serde::BigInt,
22    object::{ObjectRead, PastObjectRead},
23};
24use jsonrpsee::{RpcModule, core::RpcResult};
25
26use crate::{errors::IndexerError, indexer_reader::IndexerReader, models::objects::StoredObject};
27
28#[derive(Clone)]
29pub(crate) struct ReadApi {
30    inner: IndexerReader,
31}
32
33impl ReadApi {
34    pub fn new(inner: IndexerReader) -> Self {
35        Self { inner }
36    }
37
38    async fn get_checkpoint(&self, id: CheckpointId) -> Result<Checkpoint, IndexerError> {
39        match self
40            .inner
41            .spawn_blocking(move |this| this.get_checkpoint(id))
42            .await
43        {
44            Ok(Some(epoch_info)) => Ok(epoch_info),
45            Ok(None) => Err(IndexerError::InvalidArgument(format!(
46                "Checkpoint {id:?} not found"
47            ))),
48            Err(e) => Err(e),
49        }
50    }
51
52    async fn get_latest_checkpoint(&self) -> Result<Checkpoint, IndexerError> {
53        self.inner
54            .spawn_blocking(|this| this.get_latest_checkpoint())
55            .await
56    }
57
58    async fn get_chain_identifier(&self) -> RpcResult<ChainIdentifier> {
59        let genesis_checkpoint = self.get_checkpoint(CheckpointId::SequenceNumber(0)).await?;
60        Ok(ChainIdentifier::from(genesis_checkpoint.digest))
61    }
62
63    async fn object_read_to_object_response(
64        &self,
65        object_read: ObjectRead,
66        options: IotaObjectDataOptions,
67    ) -> RpcResult<IotaObjectResponse> {
68        match object_read {
69            ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
70                IotaObjectResponseError::NotExists { object_id: id },
71            )),
72            ObjectRead::Exists(object_ref, o, layout) => {
73                let mut display_fields = None;
74                if options.show_display {
75                    match self.inner.get_display_fields(&o, &layout).await {
76                        Ok(rendered_fields) => display_fields = Some(rendered_fields),
77                        Err(e) => {
78                            return Ok(IotaObjectResponse::new(
79                                Some(
80                                    IotaObjectData::new(object_ref, o, layout, options, None)
81                                        .map_err(internal_error)?,
82                                ),
83                                Some(IotaObjectResponseError::Display {
84                                    error: e.to_string(),
85                                }),
86                            ));
87                        }
88                    }
89                }
90                Ok(IotaObjectResponse::new_with_data(
91                    IotaObjectData::new(object_ref, o, layout, options, display_fields)
92                        .map_err(internal_error)?,
93                ))
94            }
95            ObjectRead::Deleted((object_id, version, digest)) => Ok(
96                IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
97                    object_id,
98                    version,
99                    digest,
100                }),
101            ),
102        }
103    }
104
105    async fn past_object_read_to_response(
106        &self,
107        options: Option<IotaObjectDataOptions>,
108        past_object_read: PastObjectRead,
109    ) -> RpcResult<IotaPastObjectResponse> {
110        let options = options.unwrap_or_default();
111
112        match past_object_read {
113            PastObjectRead::ObjectNotExists(id) => Ok(IotaPastObjectResponse::ObjectNotExists(id)),
114
115            PastObjectRead::ObjectDeleted(object_ref) => {
116                Ok(IotaPastObjectResponse::ObjectDeleted(object_ref.into()))
117            }
118
119            PastObjectRead::VersionFound(object_ref, object, layout) => {
120                let display_fields = if options.show_display {
121                    let rendered_fields = self
122                        .inner
123                        .get_display_fields(&object, &layout)
124                        .await
125                        .map_err(internal_error)?;
126
127                    Some(rendered_fields)
128                } else {
129                    None
130                };
131
132                Ok(IotaPastObjectResponse::VersionFound(
133                    IotaObjectData::new(object_ref, object, layout, options, display_fields)
134                        .map_err(internal_error)?,
135                ))
136            }
137
138            PastObjectRead::VersionNotFound(object_id, version) => {
139                Ok(IotaPastObjectResponse::VersionNotFound(object_id, version))
140            }
141
142            PastObjectRead::VersionTooHigh {
143                object_id,
144                asked_version,
145                latest_version,
146            } => Ok(IotaPastObjectResponse::VersionTooHigh {
147                object_id,
148                asked_version,
149                latest_version,
150            }),
151        }
152    }
153}
154
155#[async_trait]
156impl ReadApiServer for ReadApi {
157    async fn get_object(
158        &self,
159        object_id: ObjectID,
160        options: Option<IotaObjectDataOptions>,
161    ) -> RpcResult<IotaObjectResponse> {
162        let object_read = self
163            .inner
164            .get_object_read_in_blocking_task(object_id)
165            .await?;
166        self.object_read_to_object_response(object_read, options.unwrap_or_default())
167            .await
168    }
169
170    async fn multi_get_objects(
171        &self,
172        object_ids: Vec<ObjectID>,
173        options: Option<IotaObjectDataOptions>,
174    ) -> RpcResult<Vec<IotaObjectResponse>> {
175        if object_ids.len() > *QUERY_MAX_RESULT_LIMIT {
176            return Err(
177                IotaRpcInputError::SizeLimitExceeded(QUERY_MAX_RESULT_LIMIT.to_string()).into(),
178            );
179        }
180
181        // Doesn't take care of missing objects.
182        let stored_objects = self
183            .inner
184            .multi_get_objects_in_blocking_task(object_ids.clone())
185            .await?;
186
187        // Map the returned `StoredObject`s to `ObjectID`
188        let object_map: Arc<HashMap<ObjectID, StoredObject>> = Arc::new(
189            stored_objects
190                .into_iter()
191                .map(|obj| {
192                    let object_id = ObjectID::try_from(obj.object_id.clone()).map_err(|_| {
193                        IndexerError::PersistentStorageDataCorruption(format!(
194                            "failed to parse ObjectID: {:?}",
195                            obj.object_id
196                        ))
197                    })?;
198                    Ok::<(ObjectID, StoredObject), IndexerError>((object_id, obj))
199                })
200                .collect::<Result<_, IndexerError>>()?,
201        );
202
203        let options = options.unwrap_or_default();
204        let resolver = self.inner.package_resolver();
205
206        // Create a future for each requested object id
207        let futures = object_ids.into_iter().map(|object_id| {
208            let options = options.clone();
209            let resolver = resolver.clone();
210            let maybe_stored = object_map.get(&object_id).cloned();
211            async move {
212                match maybe_stored {
213                    Some(stored) => {
214                        let object_read = stored.try_into_object_read(resolver).await?;
215                        self.object_read_to_object_response(object_read, options)
216                            .await
217                    }
218                    None => {
219                        self.object_read_to_object_response(
220                            ObjectRead::NotExists(object_id),
221                            options,
222                        )
223                        .await
224                    }
225                }
226            }
227        });
228
229        futures::future::try_join_all(futures).await
230    }
231
232    async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
233        let checkpoint = self.get_latest_checkpoint().await?;
234        Ok(BigInt::from(checkpoint.network_total_transactions))
235    }
236
237    async fn get_transaction_block(
238        &self,
239        digest: TransactionDigest,
240        options: Option<IotaTransactionBlockResponseOptions>,
241    ) -> RpcResult<IotaTransactionBlockResponse> {
242        let mut txn = self
243            .multi_get_transaction_blocks(vec![digest], options)
244            .await?;
245
246        let txn = txn.pop().ok_or_else(|| {
247            IndexerError::InvalidArgument(format!("Transaction {digest} not found"))
248        })?;
249
250        Ok(txn)
251    }
252
253    async fn multi_get_transaction_blocks(
254        &self,
255        digests: Vec<TransactionDigest>,
256        options: Option<IotaTransactionBlockResponseOptions>,
257    ) -> RpcResult<Vec<IotaTransactionBlockResponse>> {
258        let num_digests = digests.len();
259        if num_digests > *QUERY_MAX_RESULT_LIMIT {
260            Err(IotaRpcInputError::SizeLimitExceeded(
261                QUERY_MAX_RESULT_LIMIT.to_string(),
262            ))?
263        }
264
265        let options = options.unwrap_or_default();
266        let txns = self
267            .inner
268            .multi_get_transaction_block_response_in_blocking_task(digests, options)
269            .await?;
270
271        Ok(txns)
272    }
273
274    async fn try_get_past_object(
275        &self,
276        object_id: ObjectID,
277        version: SequenceNumber,
278        options: Option<IotaObjectDataOptions>,
279    ) -> RpcResult<IotaPastObjectResponse> {
280        let past_object_read = self
281            .inner
282            .get_past_object_read(object_id, version, false)
283            .await?;
284
285        self.past_object_read_to_response(options, past_object_read)
286            .await
287    }
288
289    async fn try_get_object_before_version(
290        &self,
291        object_id: ObjectID,
292        version: SequenceNumber,
293    ) -> RpcResult<IotaPastObjectResponse> {
294        let past_object_read = self
295            .inner
296            .get_past_object_read(object_id, version, true)
297            .await?;
298
299        self.past_object_read_to_response(None, past_object_read)
300            .await
301    }
302
303    async fn try_multi_get_past_objects(
304        &self,
305        past_objects: Vec<IotaGetPastObjectRequest>,
306        options: Option<IotaObjectDataOptions>,
307    ) -> RpcResult<Vec<IotaPastObjectResponse>> {
308        let mut responses = Vec::with_capacity(past_objects.len());
309
310        for request in past_objects {
311            let past_object_read = self
312                .inner
313                .get_past_object_read(request.object_id, request.version, false)
314                .await?;
315
316            responses.push(
317                self.past_object_read_to_response(options.clone(), past_object_read)
318                    .await?,
319            );
320        }
321
322        Ok(responses)
323    }
324
325    async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
326        let checkpoint = self.get_latest_checkpoint().await?;
327        Ok(BigInt::from(checkpoint.sequence_number))
328    }
329
330    async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
331        Ok(self.get_checkpoint(id).await?)
332    }
333
334    async fn get_checkpoints(
335        &self,
336        cursor: Option<BigInt<u64>>,
337        limit: Option<usize>,
338        descending_order: bool,
339    ) -> RpcResult<CheckpointPage> {
340        let cursor = cursor.map(BigInt::into_inner);
341        let limit = iota_json_rpc_api::validate_limit(
342            limit,
343            iota_json_rpc_api::QUERY_MAX_RESULT_LIMIT_CHECKPOINTS,
344        )
345        .map_err(IotaRpcInputError::from)?;
346
347        let mut checkpoints = self
348            .inner
349            .spawn_blocking(move |this| this.get_checkpoints(cursor, limit + 1, descending_order))
350            .await?;
351
352        let has_next_page = checkpoints.len() > limit;
353        checkpoints.truncate(limit);
354
355        let next_cursor = checkpoints.last().map(|d| d.sequence_number.into());
356
357        Ok(CheckpointPage {
358            data: checkpoints,
359            next_cursor,
360            has_next_page,
361        })
362    }
363
364    async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<IotaEvent>> {
365        self.inner
366            .get_transaction_events_in_blocking_task(transaction_digest)
367            .await
368            .map_err(Into::into)
369    }
370
371    async fn get_protocol_config(
372        &self,
373        version: Option<BigInt<u64>>,
374    ) -> RpcResult<ProtocolConfigResponse> {
375        let chain = self.get_chain_identifier().await?.chain();
376        let version = if let Some(version) = version {
377            (*version).into()
378        } else {
379            let latest_epoch = self
380                .inner
381                .spawn_blocking(|this| this.get_latest_epoch_info_from_db())
382                .await?;
383            (latest_epoch.protocol_version as u64).into()
384        };
385
386        ProtocolConfig::get_for_version_if_supported(version, chain)
387            .ok_or(IotaRpcInputError::ProtocolVersionUnsupported(
388                ProtocolVersion::MIN.as_u64(),
389                ProtocolVersion::MAX.as_u64(),
390            ))
391            .map_err(Into::into)
392            .map(ProtocolConfigResponse::from)
393    }
394
395    async fn get_chain_identifier(&self) -> RpcResult<String> {
396        self.get_chain_identifier().await.map(|id| id.to_string())
397    }
398}
399
400impl IotaRpcModule for ReadApi {
401    fn rpc(self) -> RpcModule<Self> {
402        self.into_rpc()
403    }
404
405    fn rpc_doc_module() -> Module {
406        iota_json_rpc_api::ReadApiOpenRpc::module_doc()
407    }
408}