1use std::collections::HashMap;
6
7use async_trait::async_trait;
8use iota_json_rpc::IotaRpcModule;
9use iota_json_rpc_api::{IndexerApiServer, cap_page_limit, error_object_from_rpc, internal_error};
10use iota_json_rpc_types::{
11 DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectData, IotaObjectDataFilter,
12 IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
13 IotaTransactionBlockResponseQuery, ObjectsPage, Page, TransactionBlocksPage, TransactionFilter,
14};
15use iota_names::{
16 IotaNamesNft, IotaNamesRegistration, config::IotaNamesConfig, domain::Domain,
17 error::IotaNamesError, registry::NameRecord,
18};
19use iota_open_rpc::Module;
20use iota_types::{
21 TypeTag,
22 base_types::{IotaAddress, ObjectID},
23 digests::TransactionDigest,
24 dynamic_field::{DynamicFieldName, Field},
25 error::IotaObjectResponseError,
26 event::EventID,
27 object::ObjectRead,
28};
29use jsonrpsee::{
30 PendingSubscriptionSink, RpcModule,
31 core::{RpcResult, SubscriptionResult, client::Error as RpcClientError},
32};
33use tap::TapFallible;
34
35use crate::{errors::IndexerError, indexer_reader::IndexerReader};
36
37pub(crate) struct IndexerApi {
38 inner: IndexerReader,
39 iota_names_config: IotaNamesConfig,
40}
41
42impl IndexerApi {
43 pub fn new(inner: IndexerReader, iota_names_config: IotaNamesConfig) -> Self {
44 Self {
45 inner,
46 iota_names_config,
47 }
48 }
49
50 async fn get_owned_objects_internal(
51 &self,
52 address: IotaAddress,
53 query: Option<IotaObjectResponseQuery>,
54 cursor: Option<ObjectID>,
55 limit: usize,
56 ) -> RpcResult<ObjectsPage> {
57 let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
58 let options = options.unwrap_or_default();
59 let objects = self
60 .inner
61 .get_owned_objects_in_blocking_task(address, filter, cursor, limit + 1)
62 .await?;
63
64 let mut object_futures = vec![];
65 for object in objects {
66 object_futures.push(tokio::task::spawn(
67 object.try_into_object_read(self.inner.package_resolver()),
68 ));
69 }
70 let mut objects = futures::future::try_join_all(object_futures)
71 .await
72 .map_err(|e| {
73 tracing::error!("Error joining object read futures.");
74 RpcClientError::Custom(format!("Error joining object read futures. {e}"))
75 })
76 .map_err(error_object_from_rpc)?
77 .into_iter()
78 .collect::<Result<Vec<_>, _>>()
79 .tap_err(|e| tracing::error!("Error converting object to object read: {e}"))?;
80 let has_next_page = objects.len() > limit;
81 objects.truncate(limit);
82
83 let next_cursor = objects.last().map(|o_read| o_read.object_id());
84 let construct_response_tasks = objects.into_iter().map(|object| {
85 tokio::task::spawn(construct_object_response(
86 object,
87 self.inner.clone(),
88 options.clone(),
89 ))
90 });
91 let data = futures::future::try_join_all(construct_response_tasks)
92 .await
93 .map_err(internal_error)?
94 .into_iter()
95 .collect::<Result<Vec<_>, _>>()
96 .map_err(internal_error)?;
97
98 Ok(Page {
99 data,
100 next_cursor,
101 has_next_page,
102 })
103 }
104
105 async fn get_dynamic_field_object(
106 &self,
107 parent_object_id: ObjectID,
108 name: DynamicFieldName,
109 options: Option<IotaObjectDataOptions>,
110 ) -> RpcResult<IotaObjectResponse> {
111 let name_bcs_value = self.inner.bcs_name_from_dynamic_field_name(&name).await?;
112
113 let id = iota_types::dynamic_field::derive_dynamic_field_id(
115 parent_object_id,
116 &name.type_,
117 &name_bcs_value,
118 )
119 .map_err(internal_error)?;
120
121 let options = options.unwrap_or_default();
122
123 match self.inner.get_object_read_in_blocking_task(id).await? {
124 ObjectRead::NotExists(_) | ObjectRead::Deleted(_) => {}
125 ObjectRead::Exists(object_ref, o, layout) => {
126 return Ok(IotaObjectResponse::new_with_data(
127 IotaObjectData::new(object_ref, o, layout, options, None)
128 .map_err(internal_error)?,
129 ));
130 }
131 }
132
133 let dynamic_object_field_struct =
135 iota_types::dynamic_field::DynamicFieldInfo::dynamic_object_field_wrapper(name.type_);
136 let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
137 let dynamic_object_field_id = iota_types::dynamic_field::derive_dynamic_field_id(
138 parent_object_id,
139 &dynamic_object_field_type,
140 &name_bcs_value,
141 )
142 .map_err(internal_error)?;
143
144 match self
145 .inner
146 .get_object_read_in_blocking_task(dynamic_object_field_id)
147 .await?
148 {
149 ObjectRead::NotExists(_) | ObjectRead::Deleted(_) => {}
150 ObjectRead::Exists(object_ref, o, layout) => {
151 return Ok(IotaObjectResponse::new_with_data(
152 IotaObjectData::new(object_ref, o, layout, options, None)
153 .map_err(internal_error)?,
154 ));
155 }
156 }
157
158 Ok(IotaObjectResponse::new_with_error(
159 IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
160 ))
161 }
162}
163
164async fn construct_object_response(
165 obj: ObjectRead,
166 reader: IndexerReader,
167 options: IotaObjectDataOptions,
168) -> anyhow::Result<IotaObjectResponse> {
169 match obj {
170 ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
171 IotaObjectResponseError::NotExists { object_id: id },
172 )),
173 ObjectRead::Exists(object_ref, o, layout) => {
174 if options.show_display {
175 match reader.get_display_fields(&o, &layout).await {
176 Ok(rendered_fields) => Ok(IotaObjectResponse::new_with_data(
177 IotaObjectData::new(object_ref, o, layout, options, rendered_fields)?,
178 )),
179 Err(e) => Ok(IotaObjectResponse::new(
180 Some(IotaObjectData::new(object_ref, o, layout, options, None)?),
181 Some(IotaObjectResponseError::Display {
182 error: e.to_string(),
183 }),
184 )),
185 }
186 } else {
187 Ok(IotaObjectResponse::new_with_data(IotaObjectData::new(
188 object_ref, o, layout, options, None,
189 )?))
190 }
191 }
192 ObjectRead::Deleted((object_id, version, digest)) => Ok(
193 IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
194 object_id,
195 version,
196 digest,
197 }),
198 ),
199 }
200}
201
202#[async_trait]
203impl IndexerApiServer for IndexerApi {
204 async fn get_owned_objects(
205 &self,
206 address: IotaAddress,
207 query: Option<IotaObjectResponseQuery>,
208 cursor: Option<ObjectID>,
209 limit: Option<usize>,
210 ) -> RpcResult<ObjectsPage> {
211 let limit = cap_page_limit(limit);
212 if limit == 0 {
213 return Ok(ObjectsPage::empty());
214 }
215 self.get_owned_objects_internal(address, query, cursor, limit)
216 .await
217 }
218
219 async fn query_transaction_blocks(
220 &self,
221 query: IotaTransactionBlockResponseQuery,
222 cursor: Option<TransactionDigest>,
223 limit: Option<usize>,
224 descending_order: Option<bool>,
225 ) -> RpcResult<TransactionBlocksPage> {
226 let limit = cap_page_limit(limit);
227 if limit == 0 {
228 return Ok(TransactionBlocksPage::empty());
229 }
230 let mut results = self
231 .inner
232 .query_transaction_blocks_in_blocking_task(
233 query.filter,
234 query.options.unwrap_or_default(),
235 cursor,
236 limit + 1,
237 descending_order.unwrap_or(false),
238 )
239 .await?;
240
241 let has_next_page = results.len() > limit;
242 results.truncate(limit);
243 let next_cursor = results.last().map(|o| o.digest);
244 Ok(Page {
245 data: results,
246 next_cursor,
247 has_next_page,
248 })
249 }
250
251 async fn query_events(
252 &self,
253 query: EventFilter,
254 cursor: Option<EventID>,
256 limit: Option<usize>,
257 descending_order: Option<bool>,
258 ) -> RpcResult<EventPage> {
259 let limit = cap_page_limit(limit);
260 if limit == 0 {
261 return Ok(EventPage::empty());
262 }
263 let descending_order = descending_order.unwrap_or(false);
264 let mut results = self
265 .inner
266 .query_events_in_blocking_task(query, cursor, limit + 1, descending_order)
267 .await?;
268
269 let has_next_page = results.len() > limit;
270 results.truncate(limit);
271 let next_cursor = results.last().map(|o| o.id);
272 Ok(Page {
273 data: results,
274 next_cursor,
275 has_next_page,
276 })
277 }
278
279 async fn get_dynamic_fields(
280 &self,
281 parent_object_id: ObjectID,
282 cursor: Option<ObjectID>,
283 limit: Option<usize>,
284 ) -> RpcResult<DynamicFieldPage> {
285 let limit = cap_page_limit(limit);
286 if limit == 0 {
287 return Ok(DynamicFieldPage::empty());
288 }
289 let mut results = self
290 .inner
291 .get_dynamic_fields_in_blocking_task(parent_object_id, cursor, limit + 1)
292 .await?;
293
294 let has_next_page = results.len() > limit;
295 results.truncate(limit);
296 let next_cursor = results.last().map(|o| o.object_id);
297 Ok(Page {
298 data: results.into_iter().map(Into::into).collect(),
299 next_cursor,
300 has_next_page,
301 })
302 }
303
304 async fn get_dynamic_field_object(
305 &self,
306 parent_object_id: ObjectID,
307 name: DynamicFieldName,
308 ) -> RpcResult<IotaObjectResponse> {
309 self.get_dynamic_field_object(
310 parent_object_id,
311 name,
312 Some(IotaObjectDataOptions::full_content()),
313 )
314 .await
315 }
316
317 async fn get_dynamic_field_object_v2(
318 &self,
319 parent_object_id: ObjectID,
320 name: DynamicFieldName,
321 options: Option<IotaObjectDataOptions>,
322 ) -> RpcResult<IotaObjectResponse> {
323 self.get_dynamic_field_object(parent_object_id, name, options)
324 .await
325 }
326
327 fn subscribe_event(
328 &self,
329 _sink: PendingSubscriptionSink,
330 _filter: EventFilter,
331 ) -> SubscriptionResult {
332 Err("empty subscription".into())
333 }
334
335 fn subscribe_transaction(
336 &self,
337 _sink: PendingSubscriptionSink,
338 _filter: TransactionFilter,
339 ) -> SubscriptionResult {
340 Err("empty subscription".into())
341 }
342
343 async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
344 let domain: Domain = name.parse().map_err(IndexerError::IotaNames)?;
345
346 let record_id = self.iota_names_config.record_field_id(&domain);
348
349 let mut requests = vec![record_id];
351
352 let parent_record_id = domain.parent().map(|parent_domain| {
355 let parent_record_id = self.iota_names_config.record_field_id(&parent_domain);
356 requests.push(parent_record_id);
357 parent_record_id
358 });
359
360 let mut domain_object_map = self
363 .inner
364 .multi_get_objects_in_blocking_task(requests)
365 .await?
366 .into_iter()
367 .map(iota_types::object::Object::try_from)
368 .try_fold(HashMap::new(), |mut map, res| {
369 let obj = res?;
370 map.insert(obj.id(), obj.try_into()?);
371 Ok::<HashMap<ObjectID, NameRecord>, IndexerError>(map)
372 })?;
373
374 let Some(name_record) = domain_object_map.remove(&record_id) else {
376 return Ok(None);
377 };
378
379 let current_timestamp = self
381 .inner
382 .get_latest_checkpoint_timestamp_ms_in_blocking_task()
383 .await?;
384
385 if !name_record.is_leaf_record() {
387 return if !name_record.is_node_expired(current_timestamp) {
388 Ok(Some(name_record.into()))
389 } else {
390 Err(IndexerError::IotaNames(IotaNamesError::NameExpired).into())
391 };
392 } else {
393 let parent_record_id = parent_record_id.expect("leaf record should have a parent");
396 let parent_record = domain_object_map
399 .remove(&parent_record_id)
400 .ok_or_else(|| IndexerError::IotaNames(IotaNamesError::NameExpired))?;
401
402 if parent_record.is_valid_leaf_parent(&name_record)
403 && !parent_record.is_node_expired(current_timestamp)
404 {
405 return Ok(Some(name_record.into()));
406 } else {
407 return Err(IndexerError::IotaNames(IotaNamesError::NameExpired).into());
408 }
409 }
410 }
411
412 async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
413 let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
414
415 let Some(field_reverse_record_object) = self
416 .inner
417 .get_object_in_blocking_task(reverse_record_id)
418 .await?
419 else {
420 return Ok(None);
421 };
422
423 let domain = field_reverse_record_object
424 .to_rust::<Field<IotaAddress, Domain>>()
425 .ok_or_else(|| {
426 IndexerError::PersistentStorageDataCorruption(format!(
427 "Malformed Object {reverse_record_id}"
428 ))
429 })?
430 .value;
431
432 let domain_name = domain.to_string();
433
434 let resolved_record = self.iota_names_lookup(&domain_name).await?;
436
437 if resolved_record.is_none() {
440 return Ok(None);
441 }
442
443 Ok(Some(domain_name))
444 }
445
446 async fn iota_names_find_all_registration_nfts(
447 &self,
448 address: IotaAddress,
449 cursor: Option<ObjectID>,
450 limit: Option<usize>,
451 options: Option<IotaObjectDataOptions>,
452 ) -> RpcResult<ObjectsPage> {
453 let query = IotaObjectResponseQuery {
454 filter: Some(IotaObjectDataFilter::StructType(
455 IotaNamesRegistration::type_(self.iota_names_config.package_address.into()),
456 )),
457 options,
458 };
459
460 let owned_objects = self
461 .get_owned_objects(address, Some(query), cursor, limit)
462 .await?;
463
464 Ok(owned_objects)
465 }
466}
467
468impl IotaRpcModule for IndexerApi {
469 fn rpc(self) -> RpcModule<Self> {
470 self.into_rpc()
471 }
472
473 fn rpc_doc_module() -> Module {
474 iota_json_rpc_api::IndexerApiOpenRpc::module_doc()
475 }
476}