iota_indexer/apis/
indexer_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::HashMap;
6
7use async_trait::async_trait;
8use iota_json_rpc::IotaRpcModule;
9use iota_json_rpc_api::{IndexerApiServer, cap_page_limit, error_object_from_rpc, internal_error};
10use iota_json_rpc_types::{
11    DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectData, IotaObjectDataFilter,
12    IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
13    IotaTransactionBlockResponseQuery, ObjectsPage, Page, TransactionBlocksPage, TransactionFilter,
14};
15use iota_names::{
16    IotaNamesNft, IotaNamesRegistration, config::IotaNamesConfig, domain::Domain,
17    error::IotaNamesError, registry::NameRecord,
18};
19use iota_open_rpc::Module;
20use iota_types::{
21    TypeTag,
22    base_types::{IotaAddress, ObjectID},
23    digests::TransactionDigest,
24    dynamic_field::{DynamicFieldName, Field},
25    error::IotaObjectResponseError,
26    event::EventID,
27    object::ObjectRead,
28};
29use jsonrpsee::{
30    PendingSubscriptionSink, RpcModule,
31    core::{RpcResult, SubscriptionResult, client::Error as RpcClientError},
32};
33use tap::TapFallible;
34
35use crate::{errors::IndexerError, indexer_reader::IndexerReader};
36
37pub(crate) struct IndexerApi {
38    inner: IndexerReader,
39    iota_names_config: IotaNamesConfig,
40}
41
42impl IndexerApi {
43    pub fn new(inner: IndexerReader, iota_names_config: IotaNamesConfig) -> Self {
44        Self {
45            inner,
46            iota_names_config,
47        }
48    }
49
50    async fn get_owned_objects_internal(
51        &self,
52        address: IotaAddress,
53        query: Option<IotaObjectResponseQuery>,
54        cursor: Option<ObjectID>,
55        limit: usize,
56    ) -> RpcResult<ObjectsPage> {
57        let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
58        let options = options.unwrap_or_default();
59        let objects = self
60            .inner
61            .get_owned_objects_in_blocking_task(address, filter, cursor, limit + 1)
62            .await?;
63
64        let mut object_futures = vec![];
65        for object in objects {
66            object_futures.push(tokio::task::spawn(
67                object.try_into_object_read(self.inner.package_resolver()),
68            ));
69        }
70        let mut objects = futures::future::try_join_all(object_futures)
71            .await
72            .map_err(|e| {
73                tracing::error!("Error joining object read futures.");
74                RpcClientError::Custom(format!("Error joining object read futures. {e}"))
75            })
76            .map_err(error_object_from_rpc)?
77            .into_iter()
78            .collect::<Result<Vec<_>, _>>()
79            .tap_err(|e| tracing::error!("Error converting object to object read: {e}"))?;
80        let has_next_page = objects.len() > limit;
81        objects.truncate(limit);
82
83        let next_cursor = objects.last().map(|o_read| o_read.object_id());
84        let construct_response_tasks = objects.into_iter().map(|object| {
85            tokio::task::spawn(construct_object_response(
86                object,
87                self.inner.clone(),
88                options.clone(),
89            ))
90        });
91        let data = futures::future::try_join_all(construct_response_tasks)
92            .await
93            .map_err(internal_error)?
94            .into_iter()
95            .collect::<Result<Vec<_>, _>>()
96            .map_err(internal_error)?;
97
98        Ok(Page {
99            data,
100            next_cursor,
101            has_next_page,
102        })
103    }
104
105    async fn get_dynamic_field_object(
106        &self,
107        parent_object_id: ObjectID,
108        name: DynamicFieldName,
109        options: Option<IotaObjectDataOptions>,
110    ) -> RpcResult<IotaObjectResponse> {
111        let name_bcs_value = self.inner.bcs_name_from_dynamic_field_name(&name).await?;
112
113        // Try as Dynamic Field
114        let id = iota_types::dynamic_field::derive_dynamic_field_id(
115            parent_object_id,
116            &name.type_,
117            &name_bcs_value,
118        )
119        .map_err(internal_error)?;
120
121        let options = options.unwrap_or_default();
122
123        match self.inner.get_object_read_in_blocking_task(id).await? {
124            ObjectRead::NotExists(_) | ObjectRead::Deleted(_) => {}
125            ObjectRead::Exists(object_ref, o, layout) => {
126                return Ok(IotaObjectResponse::new_with_data(
127                    IotaObjectData::new(object_ref, o, layout, options, None)
128                        .map_err(internal_error)?,
129                ));
130            }
131        }
132
133        // Try as Dynamic Field Object
134        let dynamic_object_field_struct =
135            iota_types::dynamic_field::DynamicFieldInfo::dynamic_object_field_wrapper(name.type_);
136        let dynamic_object_field_type = TypeTag::Struct(Box::new(dynamic_object_field_struct));
137        let dynamic_object_field_id = iota_types::dynamic_field::derive_dynamic_field_id(
138            parent_object_id,
139            &dynamic_object_field_type,
140            &name_bcs_value,
141        )
142        .map_err(internal_error)?;
143
144        match self
145            .inner
146            .get_object_read_in_blocking_task(dynamic_object_field_id)
147            .await?
148        {
149            ObjectRead::NotExists(_) | ObjectRead::Deleted(_) => {}
150            ObjectRead::Exists(object_ref, o, layout) => {
151                return Ok(IotaObjectResponse::new_with_data(
152                    IotaObjectData::new(object_ref, o, layout, options, None)
153                        .map_err(internal_error)?,
154                ));
155            }
156        }
157
158        Ok(IotaObjectResponse::new_with_error(
159            IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
160        ))
161    }
162}
163
164async fn construct_object_response(
165    obj: ObjectRead,
166    reader: IndexerReader,
167    options: IotaObjectDataOptions,
168) -> anyhow::Result<IotaObjectResponse> {
169    match obj {
170        ObjectRead::NotExists(id) => Ok(IotaObjectResponse::new_with_error(
171            IotaObjectResponseError::NotExists { object_id: id },
172        )),
173        ObjectRead::Exists(object_ref, o, layout) => {
174            if options.show_display {
175                match reader.get_display_fields(&o, &layout).await {
176                    Ok(rendered_fields) => Ok(IotaObjectResponse::new_with_data(
177                        IotaObjectData::new(object_ref, o, layout, options, rendered_fields)?,
178                    )),
179                    Err(e) => Ok(IotaObjectResponse::new(
180                        Some(IotaObjectData::new(object_ref, o, layout, options, None)?),
181                        Some(IotaObjectResponseError::Display {
182                            error: e.to_string(),
183                        }),
184                    )),
185                }
186            } else {
187                Ok(IotaObjectResponse::new_with_data(IotaObjectData::new(
188                    object_ref, o, layout, options, None,
189                )?))
190            }
191        }
192        ObjectRead::Deleted((object_id, version, digest)) => Ok(
193            IotaObjectResponse::new_with_error(IotaObjectResponseError::Deleted {
194                object_id,
195                version,
196                digest,
197            }),
198        ),
199    }
200}
201
202#[async_trait]
203impl IndexerApiServer for IndexerApi {
204    async fn get_owned_objects(
205        &self,
206        address: IotaAddress,
207        query: Option<IotaObjectResponseQuery>,
208        cursor: Option<ObjectID>,
209        limit: Option<usize>,
210    ) -> RpcResult<ObjectsPage> {
211        let limit = cap_page_limit(limit);
212        if limit == 0 {
213            return Ok(ObjectsPage::empty());
214        }
215        self.get_owned_objects_internal(address, query, cursor, limit)
216            .await
217    }
218
219    async fn query_transaction_blocks(
220        &self,
221        query: IotaTransactionBlockResponseQuery,
222        cursor: Option<TransactionDigest>,
223        limit: Option<usize>,
224        descending_order: Option<bool>,
225    ) -> RpcResult<TransactionBlocksPage> {
226        let limit = cap_page_limit(limit);
227        if limit == 0 {
228            return Ok(TransactionBlocksPage::empty());
229        }
230        let mut results = self
231            .inner
232            .query_transaction_blocks_in_blocking_task(
233                query.filter,
234                query.options.unwrap_or_default(),
235                cursor,
236                limit + 1,
237                descending_order.unwrap_or(false),
238            )
239            .await?;
240
241        let has_next_page = results.len() > limit;
242        results.truncate(limit);
243        let next_cursor = results.last().map(|o| o.digest);
244        Ok(Page {
245            data: results,
246            next_cursor,
247            has_next_page,
248        })
249    }
250
251    async fn query_events(
252        &self,
253        query: EventFilter,
254        // exclusive cursor if `Some`, otherwise start from the beginning
255        cursor: Option<EventID>,
256        limit: Option<usize>,
257        descending_order: Option<bool>,
258    ) -> RpcResult<EventPage> {
259        let limit = cap_page_limit(limit);
260        if limit == 0 {
261            return Ok(EventPage::empty());
262        }
263        let descending_order = descending_order.unwrap_or(false);
264        let mut results = self
265            .inner
266            .query_events_in_blocking_task(query, cursor, limit + 1, descending_order)
267            .await?;
268
269        let has_next_page = results.len() > limit;
270        results.truncate(limit);
271        let next_cursor = results.last().map(|o| o.id);
272        Ok(Page {
273            data: results,
274            next_cursor,
275            has_next_page,
276        })
277    }
278
279    async fn get_dynamic_fields(
280        &self,
281        parent_object_id: ObjectID,
282        cursor: Option<ObjectID>,
283        limit: Option<usize>,
284    ) -> RpcResult<DynamicFieldPage> {
285        let limit = cap_page_limit(limit);
286        if limit == 0 {
287            return Ok(DynamicFieldPage::empty());
288        }
289        let mut results = self
290            .inner
291            .get_dynamic_fields_in_blocking_task(parent_object_id, cursor, limit + 1)
292            .await?;
293
294        let has_next_page = results.len() > limit;
295        results.truncate(limit);
296        let next_cursor = results.last().map(|o| o.object_id);
297        Ok(Page {
298            data: results.into_iter().map(Into::into).collect(),
299            next_cursor,
300            has_next_page,
301        })
302    }
303
304    async fn get_dynamic_field_object(
305        &self,
306        parent_object_id: ObjectID,
307        name: DynamicFieldName,
308    ) -> RpcResult<IotaObjectResponse> {
309        self.get_dynamic_field_object(
310            parent_object_id,
311            name,
312            Some(IotaObjectDataOptions::full_content()),
313        )
314        .await
315    }
316
317    async fn get_dynamic_field_object_v2(
318        &self,
319        parent_object_id: ObjectID,
320        name: DynamicFieldName,
321        options: Option<IotaObjectDataOptions>,
322    ) -> RpcResult<IotaObjectResponse> {
323        self.get_dynamic_field_object(parent_object_id, name, options)
324            .await
325    }
326
327    fn subscribe_event(
328        &self,
329        _sink: PendingSubscriptionSink,
330        _filter: EventFilter,
331    ) -> SubscriptionResult {
332        Err("empty subscription".into())
333    }
334
335    fn subscribe_transaction(
336        &self,
337        _sink: PendingSubscriptionSink,
338        _filter: TransactionFilter,
339    ) -> SubscriptionResult {
340        Err("empty subscription".into())
341    }
342
343    async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
344        let domain: Domain = name.parse().map_err(IndexerError::IotaNames)?;
345
346        // Construct the record id to lookup.
347        let record_id = self.iota_names_config.record_field_id(&domain);
348
349        // Gather the requests to fetch in the multi_get_objs.
350        let mut requests = vec![record_id];
351
352        // We only want to fetch both the child and the parent if the domain is a
353        // subdomain.
354        let parent_record_id = domain.parent().map(|parent_domain| {
355            let parent_record_id = self.iota_names_config.record_field_id(&parent_domain);
356            requests.push(parent_record_id);
357            parent_record_id
358        });
359
360        // Fetch both parent (if subdomain) and child records in a single get query.
361        // We do this as we do not know if the subdomain is a node or leaf record.
362        let mut domain_object_map = self
363            .inner
364            .multi_get_objects_in_blocking_task(requests)
365            .await?
366            .into_iter()
367            .map(iota_types::object::Object::try_from)
368            .try_fold(HashMap::new(), |mut map, res| {
369                let obj = res?;
370                map.insert(obj.id(), obj.try_into()?);
371                Ok::<HashMap<ObjectID, NameRecord>, IndexerError>(map)
372            })?;
373
374        // Extract the name record for the provided domain
375        let Some(name_record) = domain_object_map.remove(&record_id) else {
376            return Ok(None);
377        };
378
379        // get latest timestamp to check expiration.
380        let current_timestamp = self
381            .inner
382            .get_latest_checkpoint_timestamp_ms_in_blocking_task()
383            .await?;
384
385        // If the provided domain is a `node` record, we can check for expiration
386        if !name_record.is_leaf_record() {
387            return if !name_record.is_node_expired(current_timestamp) {
388                Ok(Some(name_record.into()))
389            } else {
390                Err(IndexerError::IotaNames(IotaNamesError::NameExpired).into())
391            };
392        } else {
393            // Handle the `leaf` record case which requires to check the parent for
394            // expiration.
395            let parent_record_id = parent_record_id.expect("leaf record should have a parent");
396            // If the parent record is not found for the existing leaf, we consider it
397            // expired.
398            let parent_record = domain_object_map
399                .remove(&parent_record_id)
400                .ok_or_else(|| IndexerError::IotaNames(IotaNamesError::NameExpired))?;
401
402            if parent_record.is_valid_leaf_parent(&name_record)
403                && !parent_record.is_node_expired(current_timestamp)
404            {
405                return Ok(Some(name_record.into()));
406            } else {
407                return Err(IndexerError::IotaNames(IotaNamesError::NameExpired).into());
408            }
409        }
410    }
411
412    async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
413        let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
414
415        let Some(field_reverse_record_object) = self
416            .inner
417            .get_object_in_blocking_task(reverse_record_id)
418            .await?
419        else {
420            return Ok(None);
421        };
422
423        let domain = field_reverse_record_object
424            .to_rust::<Field<IotaAddress, Domain>>()
425            .ok_or_else(|| {
426                IndexerError::PersistentStorageDataCorruption(format!(
427                    "Malformed Object {reverse_record_id}"
428                ))
429            })?
430            .value;
431
432        let domain_name = domain.to_string();
433
434        // Tries to resolve the name, to verify it is not expired.
435        let resolved_record = self.iota_names_lookup(&domain_name).await?;
436
437        // If we do not have a resolved address, we do not include the domain in the
438        // result.
439        if resolved_record.is_none() {
440            return Ok(None);
441        }
442
443        Ok(Some(domain_name))
444    }
445
446    async fn iota_names_find_all_registration_nfts(
447        &self,
448        address: IotaAddress,
449        cursor: Option<ObjectID>,
450        limit: Option<usize>,
451        options: Option<IotaObjectDataOptions>,
452    ) -> RpcResult<ObjectsPage> {
453        let query = IotaObjectResponseQuery {
454            filter: Some(IotaObjectDataFilter::StructType(
455                IotaNamesRegistration::type_(self.iota_names_config.package_address.into()),
456            )),
457            options,
458        };
459
460        let owned_objects = self
461            .get_owned_objects(address, Some(query), cursor, limit)
462            .await?;
463
464        Ok(owned_objects)
465    }
466}
467
468impl IotaRpcModule for IndexerApi {
469    fn rpc(self) -> RpcModule<Self> {
470        self.into_rpc()
471    }
472
473    fn rpc_doc_module() -> Module {
474        iota_json_rpc_api::IndexerApiOpenRpc::module_doc()
475    }
476}