iota_indexer/apis/
read_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, sync::Arc};
6
7use async_trait::async_trait;
8use iota_json_rpc::{IotaRpcModule, error::IotaRpcInputError};
9use iota_json_rpc_api::{QUERY_MAX_RESULT_LIMIT, ReadApiServer, internal_error};
10use iota_json_rpc_types::{
11    Checkpoint, CheckpointId, CheckpointPage, IotaEvent, IotaGetPastObjectRequest, IotaObjectData,
12    IotaObjectDataOptions, IotaObjectResponse, IotaPastObjectResponse,
13    IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions, ProtocolConfigResponse,
14};
15use iota_open_rpc::Module;
16use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
17use iota_types::{
18    base_types::{ObjectID, SequenceNumber},
19    digests::{ChainIdentifier, TransactionDigest},
20    error::IotaObjectResponseError,
21    iota_serde::BigInt,
22    object::{ObjectRead, PastObjectRead},
23};
24use jsonrpsee::{RpcModule, core::RpcResult};
25
26use crate::{errors::IndexerError, indexer_reader::IndexerReader, models::objects::StoredObject};
27
28#[derive(Clone)]
29pub(crate) struct ReadApi {
30    inner: IndexerReader,
31}
32
33impl ReadApi {
34    pub fn new(inner: IndexerReader) -> Self {
35        Self { inner }
36    }
37
38    async fn get_checkpoint(&self, id: CheckpointId) -> Result<Checkpoint, IndexerError> {
39        match self
40            .inner
41            .spawn_blocking(move |this| this.get_checkpoint(id))
42            .await
43        {
44            Ok(Some(epoch_info)) => Ok(epoch_info),
45            Ok(None) => Err(IndexerError::InvalidArgument(format!(
46                "Checkpoint {id:?} not found"
47            ))),
48            Err(e) => Err(e),
49        }
50    }
51
52    async fn get_latest_checkpoint(&self) -> Result<Checkpoint, IndexerError> {
53        self.inner
54            .spawn_blocking(|this| this.get_latest_checkpoint())
55            .await
56    }
57
58    async fn get_chain_identifier(&self) -> RpcResult<ChainIdentifier> {
59        Ok(self.inner.get_chain_identifier_in_blocking_task().await?)
60    }
61
62    async fn object_read_to_object_response(
63        &self,
64        object_read: ObjectRead,
65        options: IotaObjectDataOptions,
66    ) -> RpcResult<IotaObjectResponse> {
67        match object_read {
68            ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
69                IotaObjectResponseError::NotExists { object_id: id },
70            )),
71            ObjectRead::Exists(object_ref, o, layout) => {
72                let mut display_fields = None;
73                if options.show_display {
74                    match self.inner.get_display_fields(&o, &layout).await {
75                        Ok(rendered_fields) => display_fields = Some(rendered_fields),
76                        Err(e) => {
77                            return Ok(IotaObjectResponse::new(
78                                Some(
79                                    IotaObjectData::new(object_ref, o, layout, options, None)
80                                        .map_err(internal_error)?,
81                                ),
82                                Some(IotaObjectResponseError::Display {
83                                    error: e.to_string(),
84                                }),
85                            ));
86                        }
87                    }
88                }
89                Ok(IotaObjectResponse::new_with_data(
90                    IotaObjectData::new(object_ref, o, layout, options, display_fields)
91                        .map_err(internal_error)?,
92                ))
93            }
94            ObjectRead::Deleted((object_id, version, digest)) => Ok(
95                IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
96                    object_id,
97                    version,
98                    digest,
99                }),
100            ),
101        }
102    }
103
104    async fn past_object_read_to_response(
105        &self,
106        options: Option<IotaObjectDataOptions>,
107        past_object_read: PastObjectRead,
108    ) -> RpcResult<IotaPastObjectResponse> {
109        let options = options.unwrap_or_default();
110
111        match past_object_read {
112            PastObjectRead::ObjectNotExists(id) => Ok(IotaPastObjectResponse::ObjectNotExists(id)),
113
114            PastObjectRead::ObjectDeleted(object_ref) => {
115                Ok(IotaPastObjectResponse::ObjectDeleted(object_ref.into()))
116            }
117
118            PastObjectRead::VersionFound(object_ref, object, layout) => {
119                let display_fields = if options.show_display {
120                    let rendered_fields = self
121                        .inner
122                        .get_display_fields(&object, &layout)
123                        .await
124                        .map_err(internal_error)?;
125
126                    Some(rendered_fields)
127                } else {
128                    None
129                };
130
131                Ok(IotaPastObjectResponse::VersionFound(
132                    IotaObjectData::new(object_ref, object, layout, options, display_fields)
133                        .map_err(internal_error)?,
134                ))
135            }
136
137            PastObjectRead::VersionNotFound(object_id, version) => {
138                Ok(IotaPastObjectResponse::VersionNotFound(object_id, version))
139            }
140
141            PastObjectRead::VersionTooHigh {
142                object_id,
143                asked_version,
144                latest_version,
145            } => Ok(IotaPastObjectResponse::VersionTooHigh {
146                object_id,
147                asked_version,
148                latest_version,
149            }),
150        }
151    }
152}
153
154#[async_trait]
155impl ReadApiServer for ReadApi {
156    async fn get_object(
157        &self,
158        object_id: ObjectID,
159        options: Option<IotaObjectDataOptions>,
160    ) -> RpcResult<IotaObjectResponse> {
161        let object_read = self
162            .inner
163            .get_object_read_in_blocking_task(object_id)
164            .await?;
165        self.object_read_to_object_response(object_read, options.unwrap_or_default())
166            .await
167    }
168
169    async fn multi_get_objects(
170        &self,
171        object_ids: Vec<ObjectID>,
172        options: Option<IotaObjectDataOptions>,
173    ) -> RpcResult<Vec<IotaObjectResponse>> {
174        if object_ids.len() > *QUERY_MAX_RESULT_LIMIT {
175            return Err(
176                IotaRpcInputError::SizeLimitExceeded(QUERY_MAX_RESULT_LIMIT.to_string()).into(),
177            );
178        }
179
180        // Doesn't take care of missing objects.
181        let stored_objects = self
182            .inner
183            .multi_get_objects_in_blocking_task(object_ids.clone())
184            .await?;
185
186        // Map the returned `StoredObject`s to `ObjectID`
187        let object_map: Arc<HashMap<ObjectID, StoredObject>> = Arc::new(
188            stored_objects
189                .into_iter()
190                .map(|obj| {
191                    let object_id = ObjectID::try_from(obj.object_id.clone()).map_err(|_| {
192                        IndexerError::PersistentStorageDataCorruption(format!(
193                            "failed to parse ObjectID: {:?}",
194                            obj.object_id
195                        ))
196                    })?;
197                    Ok::<(ObjectID, StoredObject), IndexerError>((object_id, obj))
198                })
199                .collect::<Result<_, IndexerError>>()?,
200        );
201
202        let options = options.unwrap_or_default();
203        let resolver = self.inner.package_resolver();
204
205        // Create a future for each requested object id
206        let futures = object_ids.into_iter().map(|object_id| {
207            let options = options.clone();
208            let resolver = resolver.clone();
209            let maybe_stored = object_map.get(&object_id).cloned();
210            async move {
211                match maybe_stored {
212                    Some(stored) => {
213                        let object_read = stored.try_into_object_read(resolver).await?;
214                        self.object_read_to_object_response(object_read, options)
215                            .await
216                    }
217                    None => {
218                        self.object_read_to_object_response(
219                            ObjectRead::NotExists(object_id),
220                            options,
221                        )
222                        .await
223                    }
224                }
225            }
226        });
227
228        futures::future::try_join_all(futures).await
229    }
230
231    async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
232        let checkpoint = self.get_latest_checkpoint().await?;
233        Ok(BigInt::from(checkpoint.network_total_transactions))
234    }
235
236    async fn get_transaction_block(
237        &self,
238        digest: TransactionDigest,
239        options: Option<IotaTransactionBlockResponseOptions>,
240    ) -> RpcResult<IotaTransactionBlockResponse> {
241        let mut txn = self
242            .multi_get_transaction_blocks(vec![digest], options)
243            .await?;
244
245        let txn = txn.pop().ok_or_else(|| {
246            IndexerError::InvalidArgument(format!("Transaction {digest} not found"))
247        })?;
248
249        Ok(txn)
250    }
251
252    async fn multi_get_transaction_blocks(
253        &self,
254        digests: Vec<TransactionDigest>,
255        options: Option<IotaTransactionBlockResponseOptions>,
256    ) -> RpcResult<Vec<IotaTransactionBlockResponse>> {
257        let num_digests = digests.len();
258        if num_digests > *QUERY_MAX_RESULT_LIMIT {
259            Err(IotaRpcInputError::SizeLimitExceeded(
260                QUERY_MAX_RESULT_LIMIT.to_string(),
261            ))?
262        }
263
264        let options = options.unwrap_or_default();
265        let txns = self
266            .inner
267            .multi_get_transaction_block_response_in_blocking_task(digests, options)
268            .await?;
269
270        Ok(txns)
271    }
272
273    async fn try_get_past_object(
274        &self,
275        object_id: ObjectID,
276        version: SequenceNumber,
277        options: Option<IotaObjectDataOptions>,
278    ) -> RpcResult<IotaPastObjectResponse> {
279        let past_object_read = self
280            .inner
281            .get_past_object_read(object_id, version, false)
282            .await?;
283
284        self.past_object_read_to_response(options, past_object_read)
285            .await
286    }
287
288    async fn try_get_object_before_version(
289        &self,
290        object_id: ObjectID,
291        version: SequenceNumber,
292    ) -> RpcResult<IotaPastObjectResponse> {
293        let past_object_read = self
294            .inner
295            .get_past_object_read(object_id, version, true)
296            .await?;
297
298        self.past_object_read_to_response(None, past_object_read)
299            .await
300    }
301
302    async fn try_multi_get_past_objects(
303        &self,
304        past_objects: Vec<IotaGetPastObjectRequest>,
305        options: Option<IotaObjectDataOptions>,
306    ) -> RpcResult<Vec<IotaPastObjectResponse>> {
307        let mut responses = Vec::with_capacity(past_objects.len());
308
309        for request in past_objects {
310            let past_object_read = self
311                .inner
312                .get_past_object_read(request.object_id, request.version, false)
313                .await?;
314
315            responses.push(
316                self.past_object_read_to_response(options.clone(), past_object_read)
317                    .await?,
318            );
319        }
320
321        Ok(responses)
322    }
323
324    async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
325        let checkpoint = self.get_latest_checkpoint().await?;
326        Ok(BigInt::from(checkpoint.sequence_number))
327    }
328
329    async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
330        Ok(self.get_checkpoint(id).await?)
331    }
332
333    async fn get_checkpoints(
334        &self,
335        cursor: Option<BigInt<u64>>,
336        limit: Option<usize>,
337        descending_order: bool,
338    ) -> RpcResult<CheckpointPage> {
339        let cursor = cursor.map(BigInt::into_inner);
340        let limit = iota_json_rpc_api::validate_limit(
341            limit,
342            iota_json_rpc_api::QUERY_MAX_RESULT_LIMIT_CHECKPOINTS,
343        )
344        .map_err(IotaRpcInputError::from)?;
345
346        let mut checkpoints = self
347            .inner
348            .spawn_blocking(move |this| this.get_checkpoints(cursor, limit + 1, descending_order))
349            .await?;
350
351        let has_next_page = checkpoints.len() > limit;
352        checkpoints.truncate(limit);
353
354        let next_cursor = checkpoints.last().map(|d| d.sequence_number.into());
355
356        Ok(CheckpointPage {
357            data: checkpoints,
358            next_cursor,
359            has_next_page,
360        })
361    }
362
363    async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<IotaEvent>> {
364        self.inner
365            .get_transaction_events_in_blocking_task(transaction_digest)
366            .await
367            .map_err(Into::into)
368    }
369
370    async fn get_protocol_config(
371        &self,
372        version: Option<BigInt<u64>>,
373    ) -> RpcResult<ProtocolConfigResponse> {
374        let chain = self.get_chain_identifier().await?.chain();
375        let version = if let Some(version) = version {
376            (*version).into()
377        } else {
378            let latest_epoch = self
379                .inner
380                .spawn_blocking(|this| this.get_latest_epoch_info_from_db())
381                .await?;
382            (latest_epoch.protocol_version as u64).into()
383        };
384
385        ProtocolConfig::get_for_version_if_supported(version, chain)
386            .ok_or(IotaRpcInputError::ProtocolVersionUnsupported(
387                ProtocolVersion::MIN.as_u64(),
388                ProtocolVersion::MAX.as_u64(),
389            ))
390            .map_err(Into::into)
391            .map(ProtocolConfigResponse::from)
392    }
393
394    async fn get_chain_identifier(&self) -> RpcResult<String> {
395        self.get_chain_identifier().await.map(|id| id.to_string())
396    }
397}
398
399impl IotaRpcModule for ReadApi {
400    fn rpc(self) -> RpcModule<Self> {
401        self.into_rpc()
402    }
403
404    fn rpc_doc_module() -> Module {
405        iota_json_rpc_api::ReadApiOpenRpc::module_doc()
406    }
407}