iota_json_rpc/
indexer_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, sync::Arc, time::Duration};
6
7use anyhow::{anyhow, bail};
8use async_trait::async_trait;
9use futures::{Stream, StreamExt};
10use iota_core::authority::AuthorityState;
11use iota_json::IotaJsonValue;
12use iota_json_rpc_api::{
13    IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
14    cap_page_limit, validate_limit,
15};
16use iota_json_rpc_types::{
17    DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectDataFilter,
18    IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseError, IotaObjectResponseQuery,
19    IotaTransactionBlockResponse, IotaTransactionBlockResponseQuery,
20    IotaTransactionBlockResponseQueryV2, ObjectsPage, Page, TransactionBlocksPage,
21    TransactionFilter,
22};
23use iota_metrics::spawn_monitored_task;
24use iota_names::{
25    IotaNamesNft, NameRegistration, config::IotaNamesConfig, error::IotaNamesError, name::Name,
26    registry::NameRecord,
27};
28use iota_open_rpc::Module;
29use iota_storage::key_value_store::TransactionKeyValueStore;
30use iota_types::{
31    base_types::{IotaAddress, ObjectID, TypeTag},
32    digests::TransactionDigest,
33    dynamic_field::{DynamicFieldName, Field},
34    error::UserInputError,
35    event::EventID,
36    iota_sdk_types_conversions::type_tag_sdk_to_core,
37};
38use jsonrpsee::{
39    PendingSubscriptionSink, RpcModule, SendTimeoutError, SubscriptionMessage,
40    core::{RpcResult, SubscriptionResult},
41};
42use move_bytecode_utils::layout::TypeLayoutBuilder;
43use serde::Serialize;
44use tokio::sync::{OwnedSemaphorePermit, Semaphore};
45use tracing::{debug, instrument};
46
47use crate::{
48    IotaRpcModule,
49    authority_state::{StateRead, StateReadResult},
50    error::{Error, IotaRpcInputError},
51    logger::FutureWithTracing as _,
52};
53
54async fn pipe_from_stream<T: Serialize>(
55    pending: PendingSubscriptionSink,
56    mut stream: impl Stream<Item = T> + Unpin,
57) -> Result<(), anyhow::Error> {
58    let sink = pending.accept().await?;
59
60    loop {
61        tokio::select! {
62            _ = sink.closed() => break Ok(()),
63            maybe_item = stream.next() => {
64                let Some(item) = maybe_item else {
65                    break Ok(());
66                };
67
68                let msg = SubscriptionMessage::from_json(&item)?;
69
70                if let Err(e) = sink.send_timeout(msg, Duration::from_secs(60)).await {
71                    match e {
72                        // The subscription or connection was closed.
73                        SendTimeoutError::Closed(_) => break Ok(()),
74                        // The subscription send timeout expired
75                        // the message is returned and you could save that message
76                        // and retry again later.
77                        SendTimeoutError::Timeout(_) => break Err(anyhow::anyhow!("Subscription timeout expired")),
78                    }
79                }
80            }
81        }
82    }
83}
84
85pub fn spawn_subscription<S, T>(
86    pending: PendingSubscriptionSink,
87    rx: S,
88    permit: Option<OwnedSemaphorePermit>,
89) where
90    S: Stream<Item = T> + Unpin + Send + 'static,
91    T: Serialize + Send,
92{
93    spawn_monitored_task!(async move {
94        let _permit = permit;
95        match pipe_from_stream(pending, rx).await {
96            Ok(_) => {
97                debug!("Subscription completed.");
98            }
99            Err(err) => {
100                debug!("Subscription failed: {err:?}");
101            }
102        }
103    });
104}
/// Number of subscription permits used when no explicit maximum is given to
/// `IndexerApi::new`.
const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
106
107pub struct IndexerApi<R> {
108    state: Arc<dyn StateRead>,
109    read_api: R,
110    transaction_kv_store: Arc<TransactionKeyValueStore>,
111    iota_names_config: IotaNamesConfig,
112    pub metrics: Arc<JsonRpcMetrics>,
113    subscription_semaphore: Arc<Semaphore>,
114}
115
116impl<R: ReadApiServer> IndexerApi<R> {
117    pub fn new(
118        state: Arc<AuthorityState>,
119        read_api: R,
120        transaction_kv_store: Arc<TransactionKeyValueStore>,
121        metrics: Arc<JsonRpcMetrics>,
122        iota_names_config: IotaNamesConfig,
123        max_subscriptions: Option<usize>,
124    ) -> Self {
125        let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
126        Self {
127            state,
128            transaction_kv_store,
129            read_api,
130            metrics,
131            iota_names_config,
132            subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
133        }
134    }
135
136    fn extract_values_from_dynamic_field_name(
137        &self,
138        name: DynamicFieldName,
139    ) -> Result<(TypeTag, Vec<u8>), IotaRpcInputError> {
140        let DynamicFieldName {
141            type_: name_type,
142            value,
143        } = name;
144        let epoch_store = self.state.load_epoch_store_one_call_per_task();
145        let layout = TypeLayoutBuilder::build_with_types(
146            &type_tag_sdk_to_core(&name_type),
147            epoch_store.module_cache(),
148        )?;
149        let iota_json_value = IotaJsonValue::new(value)?;
150        let name_bcs_value = iota_json_value.to_bcs_bytes(&layout)?;
151        Ok((name_type, name_bcs_value))
152    }
153
154    fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
155        match self.subscription_semaphore.clone().try_acquire_owned() {
156            Ok(p) => Ok(p),
157            Err(_) => bail!("Resources exhausted"),
158        }
159    }
160
161    async fn get_dynamic_field_object(
162        &self,
163        parent_object_id: ObjectID,
164        name: DynamicFieldName,
165        options: Option<IotaObjectDataOptions>,
166    ) -> RpcResult<IotaObjectResponse> {
167        async move {
168            let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
169
170            let id = self
171                .state
172                .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
173                .map_err(Error::from)?;
174
175            if let Some(id) = id {
176                self.read_api
177                    .get_object(id, options)
178                    .await
179                    .map_err(|e| Error::Internal(anyhow!(e)))
180            } else {
181                Ok(IotaObjectResponse::new_with_error(
182                    IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
183                ))
184            }
185        }
186        .trace()
187        .await
188    }
189
190    fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
191        let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
192
193        let checkpoint = self
194            .state
195            .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
196
197        Ok(checkpoint.timestamp_ms)
198    }
199}
200
201#[async_trait]
202impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
203    #[instrument(skip(self, address), fields(address = %address))]
204    async fn get_owned_objects(
205        &self,
206        address: IotaAddress,
207        query: Option<IotaObjectResponseQuery>,
208        cursor: Option<ObjectID>,
209        limit: Option<usize>,
210    ) -> RpcResult<ObjectsPage> {
211        async move {
212            let limit =
213                validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(IotaRpcInputError::from)?;
214            self.metrics.get_owned_objects_limit.observe(limit as f64);
215            let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
216            let options = options.unwrap_or_default();
217            let mut objects =
218                self.state
219                    .get_owner_objects_with_limit(address, cursor, limit + 1, filter)?;
220
221            // objects here are of size (limit + 1), where the last one is the cursor for
222            // the next page
223            let has_next_page = objects.len() > limit && limit > 0;
224            objects.truncate(limit);
225            let next_cursor = (has_next_page).then_some(
226                objects
227                    .last()
228                    .map(|obj| obj.object_id)
229                    .unwrap_or(ObjectID::ZERO),
230            );
231
232            let data = match options.is_not_in_object_info() {
233                true => {
234                    let object_ids = objects.iter().map(|obj| obj.object_id).collect();
235                    self.read_api
236                        .multi_get_objects(object_ids, Some(options))
237                        .await
238                        .map_err(|e| Error::Internal(anyhow!(e)))?
239                }
240                false => objects
241                    .into_iter()
242                    .map(|o_info| IotaObjectResponse::try_from((o_info, options.clone())))
243                    .collect::<Result<Vec<IotaObjectResponse>, _>>()?,
244            };
245
246            self.metrics
247                .get_owned_objects_result_size
248                .observe(data.len() as f64);
249            self.metrics
250                .get_owned_objects_result_size_total
251                .inc_by(data.len() as u64);
252            Ok(Page {
253                data,
254                next_cursor,
255                has_next_page,
256            })
257        }
258        .trace()
259        .await
260    }
261
262    #[instrument(skip(self))]
263    async fn query_transaction_blocks(
264        &self,
265        query: IotaTransactionBlockResponseQuery,
266        // If `Some`, the query will start from the next item after the specified cursor
267        cursor: Option<TransactionDigest>,
268        limit: Option<usize>,
269        descending_order: Option<bool>,
270    ) -> RpcResult<TransactionBlocksPage> {
271        async move {
272            let limit = cap_page_limit(limit);
273            self.metrics.query_tx_blocks_limit.observe(limit as f64);
274            let descending = descending_order.unwrap_or_default();
275            let opts = query.options.unwrap_or_default();
276
277            // Retrieve 1 extra item for next cursor
278            let mut digests = self
279                .state
280                .get_transactions(
281                    &self.transaction_kv_store,
282                    query.filter,
283                    cursor,
284                    Some(limit + 1),
285                    descending,
286                )
287                .await
288                .map_err(Error::from)?;
289            // De-dup digests, duplicate digests are possible, for example,
290            // when get_transactions_by_move_function with module or function being None.
291            let mut seen = HashSet::new();
292            digests.retain(|digest| seen.insert(*digest));
293
294            // extract next cursor
295            let has_next_page = digests.len() > limit;
296            digests.truncate(limit);
297            let next_cursor = digests.last().cloned().map_or(cursor, Some);
298
299            let data: Vec<IotaTransactionBlockResponse> = if opts.only_digest() {
300                digests
301                    .into_iter()
302                    .map(IotaTransactionBlockResponse::new)
303                    .collect()
304            } else {
305                self.read_api
306                    .multi_get_transaction_blocks(digests, Some(opts))
307                    .await
308                    .map_err(|e| Error::Internal(anyhow!(e)))?
309            };
310
311            self.metrics
312                .query_tx_blocks_result_size
313                .observe(data.len() as f64);
314            self.metrics
315                .query_tx_blocks_result_size_total
316                .inc_by(data.len() as u64);
317            Ok(Page {
318                data,
319                next_cursor,
320                has_next_page,
321            })
322        }
323        .trace()
324        .await
325    }
326
327    #[instrument(skip(self))]
328    async fn query_transaction_blocks_v2(
329        &self,
330        query: IotaTransactionBlockResponseQueryV2,
331        // If `Some`, the query will start from the next item after the specified cursor
332        cursor: Option<TransactionDigest>,
333        limit: Option<usize>,
334        descending_order: Option<bool>,
335    ) -> RpcResult<TransactionBlocksPage> {
336        let v1_filter = query
337            .filter
338            .map(|f| {
339                f.as_v1().ok_or_else(|| {
340                    Error::UserInput(UserInputError::Unsupported(
341                        "transaction filter is not supported".to_string(),
342                    ))
343                })
344            })
345            .transpose()?;
346
347        let v1_query = IotaTransactionBlockResponseQuery {
348            filter: v1_filter,
349            options: query.options,
350        };
351        self.query_transaction_blocks(v1_query, cursor, limit, descending_order)
352            .await
353    }
354
355    #[instrument(skip(self))]
356    async fn query_events(
357        &self,
358        query: EventFilter,
359        // exclusive cursor if `Some`, otherwise start from the beginning
360        cursor: Option<EventID>,
361        limit: Option<usize>,
362        descending_order: Option<bool>,
363    ) -> RpcResult<EventPage> {
364        async move {
365            let descending = descending_order.unwrap_or_default();
366            let limit = cap_page_limit(limit);
367            self.metrics.query_events_limit.observe(limit as f64);
368            // Retrieve 1 extra item for next cursor
369            let mut data = self
370                .state
371                .query_events(
372                    &self.transaction_kv_store,
373                    query,
374                    cursor,
375                    limit + 1,
376                    descending,
377                )
378                .await
379                .map_err(Error::from)?;
380            let has_next_page = data.len() > limit;
381            data.truncate(limit);
382            let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
383            self.metrics
384                .query_events_result_size
385                .observe(data.len() as f64);
386            self.metrics
387                .query_events_result_size_total
388                .inc_by(data.len() as u64);
389            Ok(EventPage {
390                data,
391                next_cursor,
392                has_next_page,
393            })
394        }
395        .trace()
396        .await
397    }
398
399    #[instrument(skip(self))]
400    fn subscribe_event(
401        &self,
402        sink: PendingSubscriptionSink,
403        filter: EventFilter,
404    ) -> SubscriptionResult {
405        let permit = self.acquire_subscribe_permit()?;
406        spawn_subscription(
407            sink,
408            self.state
409                .get_subscription_handler()
410                .subscribe_events(filter),
411            Some(permit),
412        );
413        Ok(())
414    }
415
416    fn subscribe_transaction(
417        &self,
418        sink: PendingSubscriptionSink,
419        filter: TransactionFilter,
420    ) -> SubscriptionResult {
421        // Validate unsupported filters
422        if matches!(filter, TransactionFilter::Checkpoint(_)) {
423            return Err("checkpoint filter is not supported".into());
424        }
425
426        let permit = self.acquire_subscribe_permit()?;
427        spawn_subscription(
428            sink,
429            self.state
430                .get_subscription_handler()
431                .subscribe_transactions(filter),
432            Some(permit),
433        );
434        Ok(())
435    }
436
437    #[instrument(skip(self, parent_object_id), fields(parent_object_id = %parent_object_id))]
438    async fn get_dynamic_fields(
439        &self,
440        parent_object_id: ObjectID,
441        // If `Some`, the query will start from the next item after the specified cursor
442        cursor: Option<ObjectID>,
443        limit: Option<usize>,
444    ) -> RpcResult<DynamicFieldPage> {
445        async move {
446            let limit = cap_page_limit(limit);
447            self.metrics.get_dynamic_fields_limit.observe(limit as f64);
448            let mut data = self
449                .state
450                .get_dynamic_fields(parent_object_id, cursor, limit + 1)
451                .map_err(Error::from)?;
452            let has_next_page = data.len() > limit;
453            data.truncate(limit);
454            let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
455            self.metrics
456                .get_dynamic_fields_result_size
457                .observe(data.len() as f64);
458            self.metrics
459                .get_dynamic_fields_result_size_total
460                .inc_by(data.len() as u64);
461            Ok(DynamicFieldPage {
462                data: data.into_iter().map(|(_, w)| w.into()).collect(),
463                next_cursor,
464                has_next_page,
465            })
466        }
467        .trace()
468        .await
469    }
470
471    #[instrument(skip(self, parent_object_id), fields(parent_object_id = %parent_object_id))]
472    async fn get_dynamic_field_object(
473        &self,
474        parent_object_id: ObjectID,
475        name: DynamicFieldName,
476    ) -> RpcResult<IotaObjectResponse> {
477        self.get_dynamic_field_object(
478            parent_object_id,
479            name,
480            Some(IotaObjectDataOptions::full_content()),
481        )
482        .await
483    }
484
485    #[instrument(skip(self, parent_object_id), fields(parent_object_id = %parent_object_id))]
486    async fn get_dynamic_field_object_v2(
487        &self,
488        parent_object_id: ObjectID,
489        name: DynamicFieldName,
490        options: Option<IotaObjectDataOptions>,
491    ) -> RpcResult<IotaObjectResponse> {
492        self.get_dynamic_field_object(parent_object_id, name, options)
493            .await
494    }
495
496    async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
497        let name = name.parse::<Name>().map_err(Error::from)?;
498
499        // Construct the record id to lookup.
500        let record_id = self.iota_names_config.record_field_id(&name);
501
502        let parent_record_id = name
503            .parent()
504            .map(|parent_name| self.iota_names_config.record_field_id(&parent_name));
505
506        // Keep record IDs alive by declaring both before creating futures
507        let mut requests = vec![self.state.get_object(&record_id)];
508
509        // We only want to fetch both the child and the parent if the name is a
510        // subname.
511        if let Some(ref parent_record_id) = parent_record_id {
512            requests.push(self.state.get_object(parent_record_id));
513        }
514
515        // Couldn't find a `multi_get_object` for this crate (looks like it uses a k,v
516        // db) Always fetching both parent + child at the same time (even for
517        // node subnames), to avoid sequential db reads. We do this because we
518        // do not know if the requested name is a node subname or a leaf
519        // subname, and we can save a trip to the db.
520        let mut results = futures::future::try_join_all(requests)
521            .await
522            .map_err(Error::from)?;
523
524        // Removing without checking vector len, since it is known (== 1 or 2 depending
525        // on whether it is a subname or not).
526        let Some(object) = results.remove(0) else {
527            return Ok(None);
528        };
529
530        let name_record = NameRecord::try_from(object).map_err(Error::from)?;
531
532        let current_timestamp_ms = self
533            .get_latest_checkpoint_timestamp_ms()
534            .map_err(Error::from)?;
535
536        // Handling second-level names & node subnames is the same (we handle them as
537        // `node` records). We check their expiration, and if not expired,
538        // return the target address.
539        if !name_record.is_leaf_record() {
540            return if !name_record.is_node_expired(current_timestamp_ms) {
541                Ok(Some(name_record.into()))
542            } else {
543                Err(Error::from(IotaNamesError::NameExpired).into())
544            };
545        } else {
546            // Handle the `leaf` record case which requires to check the parent for
547            // expiration. We can remove since we know that if we're here, we have a parent
548            // result for the parent request. If the parent result is `None` for the
549            // existing leaf record, we consider it expired.
550            let Some(parent_object) = results.remove(0) else {
551                return Err(Error::from(IotaNamesError::NameExpired).into());
552            };
553
554            let parent_name_record = NameRecord::try_from(parent_object).map_err(Error::from)?;
555
556            // For a leaf record, we check that:
557            // 1. The parent is a valid parent for that leaf record
558            // 2. The parent is not expired
559            if parent_name_record.is_valid_leaf_parent(&name_record)
560                && !parent_name_record.is_node_expired(current_timestamp_ms)
561            {
562                Ok(Some(name_record.into()))
563            } else {
564                Err(Error::from(IotaNamesError::NameExpired).into())
565            }
566        }
567    }
568
569    #[instrument(skip(self, address), fields(address = %address))]
570    async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
571        let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
572
573        let Some(field_reverse_record_object) = self
574            .state
575            .get_object(&reverse_record_id)
576            .await
577            .map_err(Error::from)?
578        else {
579            return Ok(None);
580        };
581
582        let name = field_reverse_record_object
583            .to_rust::<Field<IotaAddress, Name>>()
584            .ok_or_else(|| Error::Unexpected(format!("malformed Object {reverse_record_id}")))?
585            .value;
586
587        let name = name.to_string();
588
589        let resolved_record = self.iota_names_lookup(&name).await?;
590
591        // If looking up the name returns an empty result, we return an empty result.
592        if resolved_record.is_none() {
593            return Ok(None);
594        }
595
596        Ok(Some(name))
597    }
598
599    #[instrument(skip(self, address), fields(address = %address))]
600    async fn iota_names_find_all_registration_nfts(
601        &self,
602        address: IotaAddress,
603        cursor: Option<ObjectID>,
604        limit: Option<usize>,
605        options: Option<IotaObjectDataOptions>,
606    ) -> RpcResult<ObjectsPage> {
607        let query = IotaObjectResponseQuery {
608            filter: Some(IotaObjectDataFilter::StructType(NameRegistration::type_(
609                self.iota_names_config.package_address,
610            ))),
611            options,
612        };
613
614        let owned_objects = self
615            .get_owned_objects(address, Some(query), cursor, limit)
616            .await?;
617
618        Ok(owned_objects)
619    }
620}
621
622impl<R: ReadApiServer> IotaRpcModule for IndexerApi<R> {
623    fn rpc(self) -> RpcModule<Self> {
624        self.into_rpc()
625    }
626
627    fn rpc_doc_module() -> Module {
628        IndexerApiOpenRpc::module_doc()
629    }
630}