1use std::{collections::HashMap, sync::Arc};
6
7use async_trait::async_trait;
8use iota_json_rpc::{IotaRpcModule, error::IotaRpcInputError};
9use iota_json_rpc_api::{QUERY_MAX_RESULT_LIMIT, ReadApiServer, internal_error};
10use iota_json_rpc_types::{
11 Checkpoint, CheckpointId, CheckpointPage, IotaEvent, IotaGetPastObjectRequest, IotaObjectData,
12 IotaObjectDataOptions, IotaObjectResponse, IotaPastObjectResponse,
13 IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions, ProtocolConfigResponse,
14};
15use iota_open_rpc::Module;
16use iota_protocol_config::{ProtocolConfig, ProtocolVersion};
17use iota_types::{
18 base_types::{ObjectID, SequenceNumber},
19 digests::{ChainIdentifier, TransactionDigest},
20 error::IotaObjectResponseError,
21 iota_serde::BigInt,
22 object::{ObjectRead, PastObjectRead},
23};
24use jsonrpsee::{RpcModule, core::RpcResult};
25
26use crate::{errors::IndexerError, indexer_reader::IndexerReader, models::objects::StoredObject};
27
28#[derive(Clone)]
29pub(crate) struct ReadApi {
30 inner: IndexerReader,
31}
32
33impl ReadApi {
34 pub fn new(inner: IndexerReader) -> Self {
35 Self { inner }
36 }
37
38 async fn get_checkpoint(&self, id: CheckpointId) -> Result<Checkpoint, IndexerError> {
39 match self
40 .inner
41 .spawn_blocking(move |this| this.get_checkpoint(id))
42 .await
43 {
44 Ok(Some(epoch_info)) => Ok(epoch_info),
45 Ok(None) => Err(IndexerError::InvalidArgument(format!(
46 "Checkpoint {id:?} not found"
47 ))),
48 Err(e) => Err(e),
49 }
50 }
51
52 async fn get_latest_checkpoint(&self) -> Result<Checkpoint, IndexerError> {
53 self.inner
54 .spawn_blocking(|this| this.get_latest_checkpoint())
55 .await
56 }
57
58 async fn get_chain_identifier(&self) -> RpcResult<ChainIdentifier> {
59 let genesis_checkpoint = self.get_checkpoint(CheckpointId::SequenceNumber(0)).await?;
60 Ok(ChainIdentifier::from(genesis_checkpoint.digest))
61 }
62
63 async fn object_read_to_object_response(
64 &self,
65 object_read: ObjectRead,
66 options: IotaObjectDataOptions,
67 ) -> RpcResult<IotaObjectResponse> {
68 match object_read {
69 ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
70 IotaObjectResponseError::NotExists { object_id: id },
71 )),
72 ObjectRead::Exists(object_ref, o, layout) => {
73 let mut display_fields = None;
74 if options.show_display {
75 match self.inner.get_display_fields(&o, &layout).await {
76 Ok(rendered_fields) => display_fields = Some(rendered_fields),
77 Err(e) => {
78 return Ok(IotaObjectResponse::new(
79 Some(
80 IotaObjectData::new(object_ref, o, layout, options, None)
81 .map_err(internal_error)?,
82 ),
83 Some(IotaObjectResponseError::Display {
84 error: e.to_string(),
85 }),
86 ));
87 }
88 }
89 }
90 Ok(IotaObjectResponse::new_with_data(
91 IotaObjectData::new(object_ref, o, layout, options, display_fields)
92 .map_err(internal_error)?,
93 ))
94 }
95 ObjectRead::Deleted((object_id, version, digest)) => Ok(
96 IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
97 object_id,
98 version,
99 digest,
100 }),
101 ),
102 }
103 }
104
105 async fn past_object_read_to_response(
106 &self,
107 options: Option<IotaObjectDataOptions>,
108 past_object_read: PastObjectRead,
109 ) -> RpcResult<IotaPastObjectResponse> {
110 let options = options.unwrap_or_default();
111
112 match past_object_read {
113 PastObjectRead::ObjectNotExists(id) => Ok(IotaPastObjectResponse::ObjectNotExists(id)),
114
115 PastObjectRead::ObjectDeleted(object_ref) => {
116 Ok(IotaPastObjectResponse::ObjectDeleted(object_ref.into()))
117 }
118
119 PastObjectRead::VersionFound(object_ref, object, layout) => {
120 let display_fields = if options.show_display {
121 let rendered_fields = self
122 .inner
123 .get_display_fields(&object, &layout)
124 .await
125 .map_err(internal_error)?;
126
127 Some(rendered_fields)
128 } else {
129 None
130 };
131
132 Ok(IotaPastObjectResponse::VersionFound(
133 IotaObjectData::new(object_ref, object, layout, options, display_fields)
134 .map_err(internal_error)?,
135 ))
136 }
137
138 PastObjectRead::VersionNotFound(object_id, version) => {
139 Ok(IotaPastObjectResponse::VersionNotFound(object_id, version))
140 }
141
142 PastObjectRead::VersionTooHigh {
143 object_id,
144 asked_version,
145 latest_version,
146 } => Ok(IotaPastObjectResponse::VersionTooHigh {
147 object_id,
148 asked_version,
149 latest_version,
150 }),
151 }
152 }
153}
154
155#[async_trait]
156impl ReadApiServer for ReadApi {
157 async fn get_object(
158 &self,
159 object_id: ObjectID,
160 options: Option<IotaObjectDataOptions>,
161 ) -> RpcResult<IotaObjectResponse> {
162 let object_read = self
163 .inner
164 .get_object_read_in_blocking_task(object_id)
165 .await?;
166 self.object_read_to_object_response(object_read, options.unwrap_or_default())
167 .await
168 }
169
170 async fn multi_get_objects(
171 &self,
172 object_ids: Vec<ObjectID>,
173 options: Option<IotaObjectDataOptions>,
174 ) -> RpcResult<Vec<IotaObjectResponse>> {
175 if object_ids.len() > *QUERY_MAX_RESULT_LIMIT {
176 return Err(
177 IotaRpcInputError::SizeLimitExceeded(QUERY_MAX_RESULT_LIMIT.to_string()).into(),
178 );
179 }
180
181 let stored_objects = self
183 .inner
184 .multi_get_objects_in_blocking_task(object_ids.clone())
185 .await?;
186
187 let object_map: Arc<HashMap<ObjectID, StoredObject>> = Arc::new(
189 stored_objects
190 .into_iter()
191 .map(|obj| {
192 let object_id = ObjectID::try_from(obj.object_id.clone()).map_err(|_| {
193 IndexerError::PersistentStorageDataCorruption(format!(
194 "failed to parse ObjectID: {:?}",
195 obj.object_id
196 ))
197 })?;
198 Ok::<(ObjectID, StoredObject), IndexerError>((object_id, obj))
199 })
200 .collect::<Result<_, IndexerError>>()?,
201 );
202
203 let options = options.unwrap_or_default();
204 let resolver = self.inner.package_resolver();
205
206 let futures = object_ids.into_iter().map(|object_id| {
208 let options = options.clone();
209 let resolver = resolver.clone();
210 let maybe_stored = object_map.get(&object_id).cloned();
211 async move {
212 match maybe_stored {
213 Some(stored) => {
214 let object_read = stored.try_into_object_read(resolver).await?;
215 self.object_read_to_object_response(object_read, options)
216 .await
217 }
218 None => {
219 self.object_read_to_object_response(
220 ObjectRead::NotExists(object_id),
221 options,
222 )
223 .await
224 }
225 }
226 }
227 });
228
229 futures::future::try_join_all(futures).await
230 }
231
232 async fn get_total_transaction_blocks(&self) -> RpcResult<BigInt<u64>> {
233 let checkpoint = self.get_latest_checkpoint().await?;
234 Ok(BigInt::from(checkpoint.network_total_transactions))
235 }
236
237 async fn get_transaction_block(
238 &self,
239 digest: TransactionDigest,
240 options: Option<IotaTransactionBlockResponseOptions>,
241 ) -> RpcResult<IotaTransactionBlockResponse> {
242 let mut txn = self
243 .multi_get_transaction_blocks(vec![digest], options)
244 .await?;
245
246 let txn = txn.pop().ok_or_else(|| {
247 IndexerError::InvalidArgument(format!("Transaction {digest} not found"))
248 })?;
249
250 Ok(txn)
251 }
252
253 async fn multi_get_transaction_blocks(
254 &self,
255 digests: Vec<TransactionDigest>,
256 options: Option<IotaTransactionBlockResponseOptions>,
257 ) -> RpcResult<Vec<IotaTransactionBlockResponse>> {
258 let num_digests = digests.len();
259 if num_digests > *QUERY_MAX_RESULT_LIMIT {
260 Err(IotaRpcInputError::SizeLimitExceeded(
261 QUERY_MAX_RESULT_LIMIT.to_string(),
262 ))?
263 }
264
265 let options = options.unwrap_or_default();
266 let txns = self
267 .inner
268 .multi_get_transaction_block_response_in_blocking_task(digests, options)
269 .await?;
270
271 Ok(txns)
272 }
273
274 async fn try_get_past_object(
275 &self,
276 object_id: ObjectID,
277 version: SequenceNumber,
278 options: Option<IotaObjectDataOptions>,
279 ) -> RpcResult<IotaPastObjectResponse> {
280 let past_object_read = self
281 .inner
282 .get_past_object_read(object_id, version, false)
283 .await?;
284
285 self.past_object_read_to_response(options, past_object_read)
286 .await
287 }
288
289 async fn try_get_object_before_version(
290 &self,
291 object_id: ObjectID,
292 version: SequenceNumber,
293 ) -> RpcResult<IotaPastObjectResponse> {
294 let past_object_read = self
295 .inner
296 .get_past_object_read(object_id, version, true)
297 .await?;
298
299 self.past_object_read_to_response(None, past_object_read)
300 .await
301 }
302
303 async fn try_multi_get_past_objects(
304 &self,
305 past_objects: Vec<IotaGetPastObjectRequest>,
306 options: Option<IotaObjectDataOptions>,
307 ) -> RpcResult<Vec<IotaPastObjectResponse>> {
308 let mut responses = Vec::with_capacity(past_objects.len());
309
310 for request in past_objects {
311 let past_object_read = self
312 .inner
313 .get_past_object_read(request.object_id, request.version, false)
314 .await?;
315
316 responses.push(
317 self.past_object_read_to_response(options.clone(), past_object_read)
318 .await?,
319 );
320 }
321
322 Ok(responses)
323 }
324
325 async fn get_latest_checkpoint_sequence_number(&self) -> RpcResult<BigInt<u64>> {
326 let checkpoint = self.get_latest_checkpoint().await?;
327 Ok(BigInt::from(checkpoint.sequence_number))
328 }
329
330 async fn get_checkpoint(&self, id: CheckpointId) -> RpcResult<Checkpoint> {
331 Ok(self.get_checkpoint(id).await?)
332 }
333
334 async fn get_checkpoints(
335 &self,
336 cursor: Option<BigInt<u64>>,
337 limit: Option<usize>,
338 descending_order: bool,
339 ) -> RpcResult<CheckpointPage> {
340 let cursor = cursor.map(BigInt::into_inner);
341 let limit = iota_json_rpc_api::validate_limit(
342 limit,
343 iota_json_rpc_api::QUERY_MAX_RESULT_LIMIT_CHECKPOINTS,
344 )
345 .map_err(IotaRpcInputError::from)?;
346
347 let mut checkpoints = self
348 .inner
349 .spawn_blocking(move |this| this.get_checkpoints(cursor, limit + 1, descending_order))
350 .await?;
351
352 let has_next_page = checkpoints.len() > limit;
353 checkpoints.truncate(limit);
354
355 let next_cursor = checkpoints.last().map(|d| d.sequence_number.into());
356
357 Ok(CheckpointPage {
358 data: checkpoints,
359 next_cursor,
360 has_next_page,
361 })
362 }
363
364 async fn get_events(&self, transaction_digest: TransactionDigest) -> RpcResult<Vec<IotaEvent>> {
365 self.inner
366 .get_transaction_events_in_blocking_task(transaction_digest)
367 .await
368 .map_err(Into::into)
369 }
370
371 async fn get_protocol_config(
372 &self,
373 version: Option<BigInt<u64>>,
374 ) -> RpcResult<ProtocolConfigResponse> {
375 let chain = self.get_chain_identifier().await?.chain();
376 let version = if let Some(version) = version {
377 (*version).into()
378 } else {
379 let latest_epoch = self
380 .inner
381 .spawn_blocking(|this| this.get_latest_epoch_info_from_db())
382 .await?;
383 (latest_epoch.protocol_version as u64).into()
384 };
385
386 ProtocolConfig::get_for_version_if_supported(version, chain)
387 .ok_or(IotaRpcInputError::ProtocolVersionUnsupported(
388 ProtocolVersion::MIN.as_u64(),
389 ProtocolVersion::MAX.as_u64(),
390 ))
391 .map_err(Into::into)
392 .map(ProtocolConfigResponse::from)
393 }
394
395 async fn get_chain_identifier(&self) -> RpcResult<String> {
396 self.get_chain_identifier().await.map(|id| id.to_string())
397 }
398}
399
400impl IotaRpcModule for ReadApi {
401 fn rpc(self) -> RpcModule<Self> {
402 self.into_rpc()
403 }
404
405 fn rpc_doc_module() -> Module {
406 iota_json_rpc_api::ReadApiOpenRpc::module_doc()
407 }
408}