1use std::{collections::HashMap, sync::Arc};
6
7use async_trait::async_trait;
8use iota_json_rpc::{IotaRpcModule, error::IotaRpcInputError};
9use iota_json_rpc_api::{QUERY_MAX_RESULT_LIMIT, ReadApiServer, internal_error};
10use iota_json_rpc_types::{
11 Checkpoint, CheckpointId, CheckpointPage, IotaEvent, IotaGetPastObjectRequest, IotaObjectData,
12 IotaObjectDataOptions, IotaObjectResponse, IotaPastObjectResponse,
13 IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions, ProtocolConfigResponse,
14};
15use iota_open_rpc::Module;
16use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
17use iota_types::{
18 base_types::{ObjectID, SequenceNumber},
19 digests::{ChainIdentifier, TransactionDigest},
20 error::IotaObjectResponseError,
21 iota_serde::BigInt,
22 object::{ObjectRead, PastObjectRead},
23};
24use jsonrpsee::{RpcModule, core::RpcResult};
25
26use crate::{errors::IndexerError, indexer_reader::IndexerReader, models::objects::StoredObject};
27
28#[derive(Clone)]
29pub(crate) struct ReadApi {
30 inner: IndexerReader,
31}
32
33impl ReadApi {
34 pub fn new(inner: IndexerReader) -> Self {
35 Self { inner }
36 }
37
38 async fn get_checkpoint(&self, id: CheckpointId) -> Result<Checkpoint, IndexerError> {
39 match self
40 .inner
41 .spawn_blocking(move |this| this.get_checkpoint(id))
42 .await
43 {
44 Ok(Some(epoch_info)) => Ok(epoch_info),
45 Ok(None) => Err(IndexerError::InvalidArgument(format!(
46 "Checkpoint {id:?} not found"
47 ))),
48 Err(e) => Err(e),
49 }
50 }
51
52 async fn get_latest_checkpoint(&self) -> Result<Checkpoint, IndexerError> {
53 self.inner
54 .spawn_blocking(|this| this.get_latest_checkpoint())
55 .await
56 }
57
58 async fn get_chain_identifier(&self) -> RpcResult<ChainIdentifier> {
59 Ok(self.inner.get_chain_identifier_in_blocking_task().await?)
60 }
61
62 async fn object_read_to_object_response(
63 &self,
64 object_read: ObjectRead,
65 options: IotaObjectDataOptions,
66 ) -> RpcResult<IotaObjectResponse> {
67 match object_read {
68 ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
69 IotaObjectResponseError::NotExists { object_id: id },
70 )),
71 ObjectRead::Exists(object_ref, o, layout) => {
72 let mut display_fields = None;
73 if options.show_display {
74 match self.inner.get_display_fields(&o, &layout).await {
75 Ok(rendered_fields) => display_fields = Some(rendered_fields),
76 Err(e) => {
77 return Ok(IotaObjectResponse::new(
78 Some(
79 IotaObjectData::new(object_ref, o, layout, options, None)
80 .map_err(internal_error)?,
81 ),
82 Some(IotaObjectResponseError::Display {
83 error: e.to_string(),
84 }),
85 ));
86 }
87 }
88 }
89 Ok(IotaObjectResponse::new_with_data(
90 IotaObjectData::new(object_ref, o, layout, options, display_fields)
91 .map_err(internal_error)?,
92 ))
93 }
94 ObjectRead::Deleted((object_id, version, digest)) => Ok(
95 IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
96 object_id,
97 version,
98 digest,
99 }),
100 ),
101 }
102 }
103
104 async fn past_object_read_to_response(
105 &self,
106 options: Option<IotaObjectDataOptions>,
107 past_object_read: PastObjectRead,
108 ) -> RpcResult<IotaPastObjectResponse> {
109 let options = options.unwrap_or_default();
110
111 match past_object_read {
112 PastObjectRead::ObjectNotExists(id) => Ok(IotaPastObjectResponse::ObjectNotExists(id)),
113
114 PastObjectRead::ObjectDeleted(object_ref) => {
115 Ok(IotaPastObjectResponse::ObjectDeleted(object_ref.into()))
116 }
117
118 PastObjectRead::VersionFound(object_ref, object, layout) => {
119 let display_fields = if options.show_display {
120 let rendered_fields = self
121 .inner
122 .get_display_fields(&object, &layout)
123 .await
124 .map_err(internal_error)?;
125
126 Some(rendered_fields)
127 } else {
128 None
129 };
130
131 Ok(IotaPastObjectResponse::VersionFound(
132 IotaObjectData::new(object_ref, object, layout, options, display_fields)
133 .map_err(internal_error)?,
134 ))
135 }
136
137 PastObjectRead::VersionNotFound(object_id, version) => {
138 Ok(IotaPastObjectResponse::VersionNotFound(object_id, version))
139 }
140
141 PastObjectRead::VersionTooHigh {
142 object_id,
143 asked_version,
144 latest_version,
145 } => Ok(IotaPastObjectResponse::VersionTooHigh {
146 object_id,
147 asked_version,
148 latest_version,
149 }),
150 }
151 }
152}
153
154#[async_trait]
155impl ReadApiServer for ReadApi {
156 async fn get_object(
157 &self,
158 object_id: ObjectID,
159 options: Option<IotaObjectDataOptions>,
160 ) -> RpcResult<IotaObjectResponse> {
161 let object_read = self
162 .inner
163 .get_object_read_in_blocking_task(object_id)
164 .await?;
165 self.object_read_to_object_response(object_read, options.unwrap_or_default())
166 .await
167 }
168
169 async fn multi_get_objects(
170 &self,
171 object_ids: Vec<ObjectID>,
172 options: Option<IotaObjectDataOptions>,
173 ) -> RpcResult<Vec<IotaObjectResponse>> {
174 if object_ids.len() > *QUERY_MAX_RESULT_LIMIT {
175 return Err(
176 IotaRpcInputError::SizeLimitExceeded(QUERY_MAX_RESULT_LIMIT.to_string()).into(),
177 );
178 }
179
180 let stored_objects = self
182 .inner
183 .multi_get_objects_in_blocking_task(object_ids.clone())
184 .await?;
185
186 let object_map: Arc<HashMap<ObjectID, StoredObject>> = Arc::new(
188 stored_objects
189 .into_iter()
190 .map(|obj| {
191 let object_id = ObjectID::try_from(obj.object_id.clone()).map_err(|_| {
192 IndexerError::PersistentStorageDataCorruption(format!(
193 "failed to parse ObjectID: {:?}",
194 obj.object_id
195 ))
196 })?;
197 Ok::<(ObjectID, StoredObject), IndexerError>((object_id, obj))
198 })
199 .collect::<Result<_, IndexerError>>()?,
200 );
201
202 let options = options.unwrap_or_default();
203 let resolver = self.inner.package_resolver();
204
205 let futures = object_ids.into_iter().map(|object_id| {
207 let options = options.clone();
208 let resolver = resolver.clone();
209 let maybe_stored = object_map.get(&object_id).cloned();
210 async move {
211 match maybe_stored {
212 Some(stored) => {
213 let object_read = stored.try_into_object_read(resolver).await?;
214 self.object_read_to_object_response(object_read, options)
215 .await
216 }
217 None => {
218 self.object_read_to_object_response(
219 ObjectRead::NotExists(object_id),
220 options,
221 )
222 .await
223 }
224 }
225 }
226 });
227
228 futures::future::try_join_all(futures).await
229 }
230
231 async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
232 let checkpoint = self.get_latest_checkpoint().await?;
233 Ok(BigInt::from(checkpoint.network_total_transactions))
234 }
235
236 async fn get_transaction_block(
237 &self,
238 digest: TransactionDigest,
239 options: Option<IotaTransactionBlockResponseOptions>,
240 ) -> RpcResult<IotaTransactionBlockResponse> {
241 let mut txn = self
242 .multi_get_transaction_blocks(vec![digest], options)
243 .await?;
244
245 let txn = txn.pop().ok_or_else(|| {
246 IndexerError::InvalidArgument(format!("Transaction {digest} not found"))
247 })?;
248
249 Ok(txn)
250 }
251
252 async fn multi_get_transaction_blocks(
253 &self,
254 digests: Vec<TransactionDigest>,
255 options: Option<IotaTransactionBlockResponseOptions>,
256 ) -> RpcResult<Vec<IotaTransactionBlockResponse>> {
257 let num_digests = digests.len();
258 if num_digests > *QUERY_MAX_RESULT_LIMIT {
259 Err(IotaRpcInputError::SizeLimitExceeded(
260 QUERY_MAX_RESULT_LIMIT.to_string(),
261 ))?
262 }
263
264 let options = options.unwrap_or_default();
265 let txns = self
266 .inner
267 .multi_get_transaction_block_response_in_blocking_task(digests, options)
268 .await?;
269
270 Ok(txns)
271 }
272
273 async fn try_get_past_object(
274 &self,
275 object_id: ObjectID,
276 version: SequenceNumber,
277 options: Option<IotaObjectDataOptions>,
278 ) -> RpcResult<IotaPastObjectResponse> {
279 let past_object_read = self
280 .inner
281 .get_past_object_read(object_id, version, false)
282 .await?;
283
284 self.past_object_read_to_response(options, past_object_read)
285 .await
286 }
287
288 async fn try_get_object_before_version(
289 &self,
290 object_id: ObjectID,
291 version: SequenceNumber,
292 ) -> RpcResult<IotaPastObjectResponse> {
293 let past_object_read = self
294 .inner
295 .get_past_object_read(object_id, version, true)
296 .await?;
297
298 self.past_object_read_to_response(None, past_object_read)
299 .await
300 }
301
302 async fn try_multi_get_past_objects(
303 &self,
304 past_objects: Vec<IotaGetPastObjectRequest>,
305 options: Option<IotaObjectDataOptions>,
306 ) -> RpcResult<Vec<IotaPastObjectResponse>> {
307 let mut responses = Vec::with_capacity(past_objects.len());
308
309 for request in past_objects {
310 let past_object_read = self
311 .inner
312 .get_past_object_read(request.object_id, request.version, false)
313 .await?;
314
315 responses.push(
316 self.past_object_read_to_response(options.clone(), past_object_read)
317 .await?,
318 );
319 }
320
321 Ok(responses)
322 }
323
324 async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
325 let checkpoint = self.get_latest_checkpoint().await?;
326 Ok(BigInt::from(checkpoint.sequence_number))
327 }
328
329 async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
330 Ok(self.get_checkpoint(id).await?)
331 }
332
333 async fn get_checkpoints(
334 &self,
335 cursor: Option<BigInt<u64>>,
336 limit: Option<usize>,
337 descending_order: bool,
338 ) -> RpcResult<CheckpointPage> {
339 let cursor = cursor.map(BigInt::into_inner);
340 let limit = iota_json_rpc_api::validate_limit(
341 limit,
342 iota_json_rpc_api::QUERY_MAX_RESULT_LIMIT_CHECKPOINTS,
343 )
344 .map_err(IotaRpcInputError::from)?;
345
346 let mut checkpoints = self
347 .inner
348 .spawn_blocking(move |this| this.get_checkpoints(cursor, limit + 1, descending_order))
349 .await?;
350
351 let has_next_page = checkpoints.len() > limit;
352 checkpoints.truncate(limit);
353
354 let next_cursor = checkpoints.last().map(|d| d.sequence_number.into());
355
356 Ok(CheckpointPage {
357 data: checkpoints,
358 next_cursor,
359 has_next_page,
360 })
361 }
362
363 async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<IotaEvent>> {
364 self.inner
365 .get_transaction_events_in_blocking_task(transaction_digest)
366 .await
367 .map_err(Into::into)
368 }
369
370 async fn get_protocol_config(
371 &self,
372 version: Option<BigInt<u64>>,
373 ) -> RpcResult<ProtocolConfigResponse> {
374 let chain = self.get_chain_identifier().await?.chain();
375 let version = if let Some(version) = version {
376 (*version).into()
377 } else {
378 let latest_epoch = self
379 .inner
380 .spawn_blocking(|this| this.get_latest_epoch_info_from_db())
381 .await?;
382 (latest_epoch.protocol_version as u64).into()
383 };
384
385 ProtocolConfig::get_for_version_if_supported(version, chain)
386 .ok_or(IotaRpcInputError::ProtocolVersionUnsupported(
387 ProtocolVersion::MIN.as_u64(),
388 ProtocolVersion::MAX.as_u64(),
389 ))
390 .map_err(Into::into)
391 .map(ProtocolConfigResponse::from)
392 }
393
394 async fn get_chain_identifier(&self) -> RpcResult<String> {
395 self.get_chain_identifier().await.map(|id| id.to_string())
396 }
397}
398
399impl IotaRpcModule for ReadApi {
400 fn rpc(self) -> RpcModule<Self> {
401 self.into_rpc()
402 }
403
404 fn rpc_doc_module() -> Module {
405 iota_json_rpc_api::ReadApiOpenRpc::module_doc()
406 }
407}