iota_json_rpc/
indexer_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, sync::Arc, time::Duration};
6
7use anyhow::{anyhow, bail};
8use async_trait::async_trait;
9use futures::{Stream, StreamExt};
10use iota_core::authority::AuthorityState;
11use iota_json::IotaJsonValue;
12use iota_json_rpc_api::{
13    IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
14    cap_page_limit, validate_limit,
15};
16use iota_json_rpc_types::{
17    DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectDataFilter,
18    IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
19    IotaTransactionBlockResponse, IotaTransactionBlockResponseQuery, ObjectsPage, Page,
20    TransactionBlocksPage, TransactionFilter,
21};
22use iota_metrics::spawn_monitored_task;
23use iota_names::{
24    IotaNamesNft, IotaNamesRegistration, config::IotaNamesConfig, domain::Domain,
25    error::IotaNamesError, registry::NameRecord,
26};
27use iota_open_rpc::Module;
28use iota_storage::key_value_store::TransactionKeyValueStore;
29use iota_types::{
30    base_types::{IotaAddress, ObjectID},
31    digests::TransactionDigest,
32    dynamic_field::{DynamicFieldName, Field},
33    error::IotaObjectResponseError,
34    event::EventID,
35};
36use jsonrpsee::{
37    PendingSubscriptionSink, RpcModule, SendTimeoutError, SubscriptionMessage,
38    core::{RpcResult, SubscriptionResult},
39};
40use move_bytecode_utils::layout::TypeLayoutBuilder;
41use move_core_types::language_storage::TypeTag;
42use serde::Serialize;
43use tokio::sync::{OwnedSemaphorePermit, Semaphore};
44use tracing::{debug, instrument};
45
46use crate::{
47    IotaRpcModule,
48    authority_state::{StateRead, StateReadResult},
49    error::{Error, IotaRpcInputError},
50    logger::FutureWithTracing as _,
51};
52
53async fn pipe_from_stream<T: Serialize>(
54    pending: PendingSubscriptionSink,
55    mut stream: impl Stream<Item = T> + Unpin,
56) -> Result<(), anyhow::Error> {
57    let sink = pending.accept().await?;
58
59    loop {
60        tokio::select! {
61            _ = sink.closed() => break Ok(()),
62            maybe_item = stream.next() => {
63                let Some(item) = maybe_item else {
64                    break Ok(());
65                };
66
67                let msg = SubscriptionMessage::from_json(&item)?;
68
69                if let Err(e) = sink.send_timeout(msg, Duration::from_secs(60)).await {
70                    match e {
71                        // The subscription or connection was closed.
72                        SendTimeoutError::Closed(_) => break Ok(()),
73                        // The subscription send timeout expired
74                        // the message is returned and you could save that message
75                        // and retry again later.
76                        SendTimeoutError::Timeout(_) => break Err(anyhow::anyhow!("Subscription timeout expired")),
77                    }
78                }
79            }
80        }
81    }
82}
83
84pub fn spawn_subscription<S, T>(
85    pending: PendingSubscriptionSink,
86    rx: S,
87    permit: Option<OwnedSemaphorePermit>,
88) where
89    S: Stream<Item = T> + Unpin + Send + 'static,
90    T: Serialize + Send,
91{
92    spawn_monitored_task!(async move {
93        let _permit = permit;
94        match pipe_from_stream(pending, rx).await {
95            Ok(_) => {
96                debug!("Subscription completed.");
97            }
98            Err(err) => {
99                debug!("Subscription failed: {err:?}");
100            }
101        }
102    });
103}
104const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
105
106pub struct IndexerApi<R> {
107    state: Arc<dyn StateRead>,
108    read_api: R,
109    transaction_kv_store: Arc<TransactionKeyValueStore>,
110    iota_names_config: IotaNamesConfig,
111    pub metrics: Arc<JsonRpcMetrics>,
112    subscription_semaphore: Arc<Semaphore>,
113}
114
115impl<R: ReadApiServer> IndexerApi<R> {
116    pub fn new(
117        state: Arc<AuthorityState>,
118        read_api: R,
119        transaction_kv_store: Arc<TransactionKeyValueStore>,
120        metrics: Arc<JsonRpcMetrics>,
121        iota_names_config: IotaNamesConfig,
122        max_subscriptions: Option<usize>,
123    ) -> Self {
124        let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
125        Self {
126            state,
127            transaction_kv_store,
128            read_api,
129            metrics,
130            iota_names_config,
131            subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
132        }
133    }
134
135    fn extract_values_from_dynamic_field_name(
136        &self,
137        name: DynamicFieldName,
138    ) -> Result<(TypeTag, Vec<u8>), IotaRpcInputError> {
139        let DynamicFieldName {
140            type_: name_type,
141            value,
142        } = name;
143        let epoch_store = self.state.load_epoch_store_one_call_per_task();
144        let layout = TypeLayoutBuilder::build_with_types(&name_type, epoch_store.module_cache())?;
145        let iota_json_value = IotaJsonValue::new(value)?;
146        let name_bcs_value = iota_json_value.to_bcs_bytes(&layout)?;
147        Ok((name_type, name_bcs_value))
148    }
149
150    fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
151        match self.subscription_semaphore.clone().try_acquire_owned() {
152            Ok(p) => Ok(p),
153            Err(_) => bail!("Resources exhausted"),
154        }
155    }
156
157    async fn get_dynamic_field_object(
158        &self,
159        parent_object_id: ObjectID,
160        name: DynamicFieldName,
161        options: Option<IotaObjectDataOptions>,
162    ) -> RpcResult<IotaObjectResponse> {
163        async move {
164            let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
165
166            let id = self
167                .state
168                .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
169                .map_err(Error::from)?;
170
171            if let Some(id) = id {
172                self.read_api
173                    .get_object(id, options)
174                    .await
175                    .map_err(|e| Error::Internal(anyhow!(e)))
176            } else {
177                Ok(IotaObjectResponse::new_with_error(
178                    IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
179                ))
180            }
181        }
182        .trace()
183        .await
184    }
185
186    fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
187        let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
188
189        let checkpoint = self
190            .state
191            .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
192
193        Ok(checkpoint.timestamp_ms)
194    }
195}
196
197#[async_trait]
198impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
199    #[instrument(skip(self))]
200    async fn get_owned_objects(
201        &self,
202        address: IotaAddress,
203        query: Option<IotaObjectResponseQuery>,
204        cursor: Option<ObjectID>,
205        limit: Option<usize>,
206    ) -> RpcResult<ObjectsPage> {
207        async move {
208            let limit =
209                validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(IotaRpcInputError::from)?;
210            self.metrics.get_owned_objects_limit.report(limit as u64);
211            let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
212            let options = options.unwrap_or_default();
213            let mut objects =
214                self.state
215                    .get_owner_objects_with_limit(address, cursor, limit + 1, filter)?;
216
217            // objects here are of size (limit + 1), where the last one is the cursor for
218            // the next page
219            let has_next_page = objects.len() > limit;
220            objects.truncate(limit);
221            let next_cursor = objects
222                .last()
223                .cloned()
224                .map_or(cursor, |o_info| Some(o_info.object_id));
225
226            let data = match options.is_not_in_object_info() {
227                true => {
228                    let object_ids = objects.iter().map(|obj| obj.object_id).collect();
229                    self.read_api
230                        .multi_get_objects(object_ids, Some(options))
231                        .await
232                        .map_err(|e| Error::Internal(anyhow!(e)))?
233                }
234                false => objects
235                    .into_iter()
236                    .map(|o_info| IotaObjectResponse::try_from((o_info, options.clone())))
237                    .collect::<Result<Vec<IotaObjectResponse>, _>>()?,
238            };
239
240            self.metrics
241                .get_owned_objects_result_size
242                .report(data.len() as u64);
243            self.metrics
244                .get_owned_objects_result_size_total
245                .inc_by(data.len() as u64);
246            Ok(Page {
247                data,
248                next_cursor,
249                has_next_page,
250            })
251        }
252        .trace()
253        .await
254    }
255
256    #[instrument(skip(self))]
257    async fn query_transaction_blocks(
258        &self,
259        query: IotaTransactionBlockResponseQuery,
260        // If `Some`, the query will start from the next item after the specified cursor
261        cursor: Option<TransactionDigest>,
262        limit: Option<usize>,
263        descending_order: Option<bool>,
264    ) -> RpcResult<TransactionBlocksPage> {
265        async move {
266            let limit = cap_page_limit(limit);
267            self.metrics.query_tx_blocks_limit.report(limit as u64);
268            let descending = descending_order.unwrap_or_default();
269            let opts = query.options.unwrap_or_default();
270
271            // Retrieve 1 extra item for next cursor
272            let mut digests = self
273                .state
274                .get_transactions(
275                    &self.transaction_kv_store,
276                    query.filter,
277                    cursor,
278                    Some(limit + 1),
279                    descending,
280                )
281                .await
282                .map_err(Error::from)?;
283            // De-dup digests, duplicate digests are possible, for example,
284            // when get_transactions_by_move_function with module or function being None.
285            let mut seen = HashSet::new();
286            digests.retain(|digest| seen.insert(*digest));
287
288            // extract next cursor
289            let has_next_page = digests.len() > limit;
290            digests.truncate(limit);
291            let next_cursor = digests.last().cloned().map_or(cursor, Some);
292
293            let data: Vec<IotaTransactionBlockResponse> = if opts.only_digest() {
294                digests
295                    .into_iter()
296                    .map(IotaTransactionBlockResponse::new)
297                    .collect()
298            } else {
299                self.read_api
300                    .multi_get_transaction_blocks(digests, Some(opts))
301                    .await
302                    .map_err(|e| Error::Internal(anyhow!(e)))?
303            };
304
305            self.metrics
306                .query_tx_blocks_result_size
307                .report(data.len() as u64);
308            self.metrics
309                .query_tx_blocks_result_size_total
310                .inc_by(data.len() as u64);
311            Ok(Page {
312                data,
313                next_cursor,
314                has_next_page,
315            })
316        }
317        .trace()
318        .await
319    }
320    #[instrument(skip(self))]
321    async fn query_events(
322        &self,
323        query: EventFilter,
324        // exclusive cursor if `Some`, otherwise start from the beginning
325        cursor: Option<EventID>,
326        limit: Option<usize>,
327        descending_order: Option<bool>,
328    ) -> RpcResult<EventPage> {
329        async move {
330            let descending = descending_order.unwrap_or_default();
331            let limit = cap_page_limit(limit);
332            self.metrics.query_events_limit.report(limit as u64);
333            // Retrieve 1 extra item for next cursor
334            let mut data = self
335                .state
336                .query_events(
337                    &self.transaction_kv_store,
338                    query,
339                    cursor,
340                    limit + 1,
341                    descending,
342                )
343                .await
344                .map_err(Error::from)?;
345            let has_next_page = data.len() > limit;
346            data.truncate(limit);
347            let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
348            self.metrics
349                .query_events_result_size
350                .report(data.len() as u64);
351            self.metrics
352                .query_events_result_size_total
353                .inc_by(data.len() as u64);
354            Ok(EventPage {
355                data,
356                next_cursor,
357                has_next_page,
358            })
359        }
360        .trace()
361        .await
362    }
363
364    #[instrument(skip(self))]
365    fn subscribe_event(
366        &self,
367        sink: PendingSubscriptionSink,
368        filter: EventFilter,
369    ) -> SubscriptionResult {
370        let permit = self.acquire_subscribe_permit()?;
371        spawn_subscription(
372            sink,
373            self.state
374                .get_subscription_handler()
375                .subscribe_events(filter),
376            Some(permit),
377        );
378        Ok(())
379    }
380
381    fn subscribe_transaction(
382        &self,
383        sink: PendingSubscriptionSink,
384        filter: TransactionFilter,
385    ) -> SubscriptionResult {
386        // Validate unsupported filters
387        if matches!(filter, TransactionFilter::Checkpoint(_)) {
388            return Err("checkpoint filter is not supported".into());
389        }
390
391        let permit = self.acquire_subscribe_permit()?;
392        spawn_subscription(
393            sink,
394            self.state
395                .get_subscription_handler()
396                .subscribe_transactions(filter),
397            Some(permit),
398        );
399        Ok(())
400    }
401
402    #[instrument(skip(self))]
403    async fn get_dynamic_fields(
404        &self,
405        parent_object_id: ObjectID,
406        // If `Some`, the query will start from the next item after the specified cursor
407        cursor: Option<ObjectID>,
408        limit: Option<usize>,
409    ) -> RpcResult<DynamicFieldPage> {
410        async move {
411            let limit = cap_page_limit(limit);
412            self.metrics.get_dynamic_fields_limit.report(limit as u64);
413            let mut data = self
414                .state
415                .get_dynamic_fields(parent_object_id, cursor, limit + 1)
416                .map_err(Error::from)?;
417            let has_next_page = data.len() > limit;
418            data.truncate(limit);
419            let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
420            self.metrics
421                .get_dynamic_fields_result_size
422                .report(data.len() as u64);
423            self.metrics
424                .get_dynamic_fields_result_size_total
425                .inc_by(data.len() as u64);
426            Ok(DynamicFieldPage {
427                data: data.into_iter().map(|(_, w)| w.into()).collect(),
428                next_cursor,
429                has_next_page,
430            })
431        }
432        .trace()
433        .await
434    }
435
436    #[instrument(skip(self))]
437    async fn get_dynamic_field_object(
438        &self,
439        parent_object_id: ObjectID,
440        name: DynamicFieldName,
441    ) -> RpcResult<IotaObjectResponse> {
442        self.get_dynamic_field_object(
443            parent_object_id,
444            name,
445            Some(IotaObjectDataOptions::full_content()),
446        )
447        .await
448    }
449
450    #[instrument(skip(self))]
451    async fn get_dynamic_field_object_v2(
452        &self,
453        parent_object_id: ObjectID,
454        name: DynamicFieldName,
455        options: Option<IotaObjectDataOptions>,
456    ) -> RpcResult<IotaObjectResponse> {
457        self.get_dynamic_field_object(parent_object_id, name, options)
458            .await
459    }
460
461    async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
462        let domain = name.parse::<Domain>().map_err(Error::from)?;
463
464        // Construct the record id to lookup.
465        let record_id = self.iota_names_config.record_field_id(&domain);
466
467        let parent_record_id = domain
468            .parent()
469            .map(|parent_domain| self.iota_names_config.record_field_id(&parent_domain));
470
471        // Keep record IDs alive by declaring both before creating futures
472        let mut requests = vec![self.state.get_object(&record_id)];
473
474        // We only want to fetch both the child and the parent if the domain is a
475        // subdomain.
476        if let Some(ref parent_record_id) = parent_record_id {
477            requests.push(self.state.get_object(parent_record_id));
478        }
479
480        // Couldn't find a `multi_get_object` for this crate (looks like it uses a k,v
481        // db) Always fetching both parent + child at the same time (even for
482        // node subdomains), to avoid sequential db reads. We do this because we
483        // do not know if the requested domain is a node subdomain or a leaf
484        // subdomain, and we can save a trip to the db.
485        let mut results = futures::future::try_join_all(requests)
486            .await
487            .map_err(Error::from)?;
488
489        // Removing without checking vector len, since it is known (== 1 or 2 depending
490        // on whether it is a subdomain or not).
491        let Some(object) = results.remove(0) else {
492            return Ok(None);
493        };
494
495        let name_record = NameRecord::try_from(object).map_err(Error::from)?;
496
497        let current_timestamp_ms = self
498            .get_latest_checkpoint_timestamp_ms()
499            .map_err(Error::from)?;
500
501        // Handling SLD names & node subdomains is the same (we handle them as `node`
502        // records). We check their expiration, and if not expired, return the
503        // target address.
504        if !name_record.is_leaf_record() {
505            return if !name_record.is_node_expired(current_timestamp_ms) {
506                Ok(Some(name_record.into()))
507            } else {
508                Err(Error::from(IotaNamesError::NameExpired).into())
509            };
510        } else {
511            // Handle the `leaf` record case which requires to check the parent for
512            // expiration. We can remove since we know that if we're here, we have a parent
513            // result for the parent request. If the parent result is `None` for the
514            // existing leaf record, we consider it expired.
515            let Some(parent_object) = results.remove(0) else {
516                return Err(Error::from(IotaNamesError::NameExpired).into());
517            };
518
519            let parent_name_record = NameRecord::try_from(parent_object).map_err(Error::from)?;
520
521            // For a leaf record, we check that:
522            // 1. The parent is a valid parent for that leaf record
523            // 2. The parent is not expired
524            if parent_name_record.is_valid_leaf_parent(&name_record)
525                && !parent_name_record.is_node_expired(current_timestamp_ms)
526            {
527                Ok(Some(name_record.into()))
528            } else {
529                Err(Error::from(IotaNamesError::NameExpired).into())
530            }
531        }
532    }
533
534    #[instrument(skip(self))]
535    async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
536        let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
537
538        let Some(field_reverse_record_object) = self
539            .state
540            .get_object(&reverse_record_id)
541            .await
542            .map_err(Error::from)?
543        else {
544            return Ok(None);
545        };
546
547        let domain = field_reverse_record_object
548            .to_rust::<Field<IotaAddress, Domain>>()
549            .ok_or_else(|| Error::Unexpected(format!("malformed Object {reverse_record_id}")))?
550            .value;
551
552        let domain_name = domain.to_string();
553
554        let resolved_record = self.iota_names_lookup(&domain_name).await?;
555
556        // If looking up the domain returns an empty result, we return an empty result.
557        if resolved_record.is_none() {
558            return Ok(None);
559        }
560
561        Ok(Some(domain_name))
562    }
563
564    #[instrument(skip(self))]
565    async fn iota_names_find_all_registration_nfts(
566        &self,
567        address: IotaAddress,
568        cursor: Option<ObjectID>,
569        limit: Option<usize>,
570        options: Option<IotaObjectDataOptions>,
571    ) -> RpcResult<ObjectsPage> {
572        let query = IotaObjectResponseQuery {
573            filter: Some(IotaObjectDataFilter::StructType(
574                IotaNamesRegistration::type_(self.iota_names_config.package_address.into()),
575            )),
576            options,
577        };
578
579        let owned_objects = self
580            .get_owned_objects(address, Some(query), cursor, limit)
581            .await?;
582
583        Ok(owned_objects)
584    }
585}
586
587impl<R: ReadApiServer> IotaRpcModule for IndexerApi<R> {
588    fn rpc(self) -> RpcModule<Self> {
589        self.into_rpc()
590    }
591
592    fn rpc_doc_module() -> Module {
593        IndexerApiOpenRpc::module_doc()
594    }
595}