iota_json_rpc/
indexer_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, sync::Arc, time::Duration};
6
7use anyhow::{anyhow, bail};
8use async_trait::async_trait;
9use futures::{Stream, StreamExt};
10use iota_core::authority::AuthorityState;
11use iota_json::IotaJsonValue;
12use iota_json_rpc_api::{
13    IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
14    cap_page_limit, validate_limit,
15};
16use iota_json_rpc_types::{
17    DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectDataFilter,
18    IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
19    IotaTransactionBlockResponse, IotaTransactionBlockResponseQuery,
20    IotaTransactionBlockResponseQueryV2, ObjectsPage, Page, TransactionBlocksPage,
21    TransactionFilter,
22};
23use iota_metrics::spawn_monitored_task;
24use iota_names::{
25    IotaNamesNft, NameRegistration, config::IotaNamesConfig, error::IotaNamesError, name::Name,
26    registry::NameRecord,
27};
28use iota_open_rpc::Module;
29use iota_storage::key_value_store::TransactionKeyValueStore;
30use iota_types::{
31    base_types::{IotaAddress, ObjectID},
32    digests::TransactionDigest,
33    dynamic_field::{DynamicFieldName, Field},
34    error::{IotaObjectResponseError, UserInputError},
35    event::EventID,
36};
37use jsonrpsee::{
38    PendingSubscriptionSink, RpcModule, SendTimeoutError, SubscriptionMessage,
39    core::{RpcResult, SubscriptionResult},
40};
41use move_bytecode_utils::layout::TypeLayoutBuilder;
42use move_core_types::language_storage::TypeTag;
43use serde::Serialize;
44use tokio::sync::{OwnedSemaphorePermit, Semaphore};
45use tracing::{debug, instrument};
46
47use crate::{
48    IotaRpcModule,
49    authority_state::{StateRead, StateReadResult},
50    error::{Error, IotaRpcInputError},
51    logger::FutureWithTracing as _,
52};
53
54async fn pipe_from_stream<T: Serialize>(
55    pending: PendingSubscriptionSink,
56    mut stream: impl Stream<Item = T> + Unpin,
57) -> Result<(), anyhow::Error> {
58    let sink = pending.accept().await?;
59
60    loop {
61        tokio::select! {
62            _ = sink.closed() => break Ok(()),
63            maybe_item = stream.next() => {
64                let Some(item) = maybe_item else {
65                    break Ok(());
66                };
67
68                let msg = SubscriptionMessage::from_json(&item)?;
69
70                if let Err(e) = sink.send_timeout(msg, Duration::from_secs(60)).await {
71                    match e {
72                        // The subscription or connection was closed.
73                        SendTimeoutError::Closed(_) => break Ok(()),
74                        // The subscription send timeout expired
75                        // the message is returned and you could save that message
76                        // and retry again later.
77                        SendTimeoutError::Timeout(_) => break Err(anyhow::anyhow!("Subscription timeout expired")),
78                    }
79                }
80            }
81        }
82    }
83}
84
85pub fn spawn_subscription<S, T>(
86    pending: PendingSubscriptionSink,
87    rx: S,
88    permit: Option<OwnedSemaphorePermit>,
89) where
90    S: Stream<Item = T> + Unpin + Send + 'static,
91    T: Serialize + Send,
92{
93    spawn_monitored_task!(async move {
94        let _permit = permit;
95        match pipe_from_stream(pending, rx).await {
96            Ok(_) => {
97                debug!("Subscription completed.");
98            }
99            Err(err) => {
100                debug!("Subscription failed: {err:?}");
101            }
102        }
103    });
104}
/// Number of subscription permits created by `IndexerApi::new` when the
/// caller does not supply an explicit maximum.
const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
106
/// JSON-RPC indexer API implementation backed by a [`StateRead`] handle.
///
/// `R` is the read API used to load object and transaction block responses.
pub struct IndexerApi<R> {
    /// State reader that the queries and subscriptions in this module go
    /// through.
    state: Arc<dyn StateRead>,
    /// Read API used to fetch object and transaction block responses.
    read_api: R,
    /// Key-value store handed to the transaction and event queries.
    transaction_kv_store: Arc<TransactionKeyValueStore>,
    /// Configuration used to derive IOTA-Names record field IDs and the
    /// registration NFT type.
    iota_names_config: IotaNamesConfig,
    /// Metrics recorded by the paginated query endpoints.
    pub metrics: Arc<JsonRpcMetrics>,
    /// Semaphore from which one permit is taken per spawned subscription.
    subscription_semaphore: Arc<Semaphore>,
}
115
116impl<R: ReadApiServer> IndexerApi<R> {
117    pub fn new(
118        state: Arc<AuthorityState>,
119        read_api: R,
120        transaction_kv_store: Arc<TransactionKeyValueStore>,
121        metrics: Arc<JsonRpcMetrics>,
122        iota_names_config: IotaNamesConfig,
123        max_subscriptions: Option<usize>,
124    ) -> Self {
125        let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
126        Self {
127            state,
128            transaction_kv_store,
129            read_api,
130            metrics,
131            iota_names_config,
132            subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
133        }
134    }
135
136    fn extract_values_from_dynamic_field_name(
137        &self,
138        name: DynamicFieldName,
139    ) -> Result<(TypeTag, Vec<u8>), IotaRpcInputError> {
140        let DynamicFieldName {
141            type_: name_type,
142            value,
143        } = name;
144        let epoch_store = self.state.load_epoch_store_one_call_per_task();
145        let layout = TypeLayoutBuilder::build_with_types(&name_type, epoch_store.module_cache())?;
146        let iota_json_value = IotaJsonValue::new(value)?;
147        let name_bcs_value = iota_json_value.to_bcs_bytes(&layout)?;
148        Ok((name_type, name_bcs_value))
149    }
150
151    fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
152        match self.subscription_semaphore.clone().try_acquire_owned() {
153            Ok(p) => Ok(p),
154            Err(_) => bail!("Resources exhausted"),
155        }
156    }
157
158    async fn get_dynamic_field_object(
159        &self,
160        parent_object_id: ObjectID,
161        name: DynamicFieldName,
162        options: Option<IotaObjectDataOptions>,
163    ) -> RpcResult<IotaObjectResponse> {
164        async move {
165            let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
166
167            let id = self
168                .state
169                .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
170                .map_err(Error::from)?;
171
172            if let Some(id) = id {
173                self.read_api
174                    .get_object(id, options)
175                    .await
176                    .map_err(|e| Error::Internal(anyhow!(e)))
177            } else {
178                Ok(IotaObjectResponse::new_with_error(
179                    IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
180                ))
181            }
182        }
183        .trace()
184        .await
185    }
186
187    fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
188        let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
189
190        let checkpoint = self
191            .state
192            .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
193
194        Ok(checkpoint.timestamp_ms)
195    }
196}
197
198#[async_trait]
199impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
200    #[instrument(skip(self))]
201    async fn get_owned_objects(
202        &self,
203        address: IotaAddress,
204        query: Option<IotaObjectResponseQuery>,
205        cursor: Option<ObjectID>,
206        limit: Option<usize>,
207    ) -> RpcResult<ObjectsPage> {
208        async move {
209            let limit =
210                validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(IotaRpcInputError::from)?;
211            self.metrics.get_owned_objects_limit.observe(limit as f64);
212            let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
213            let options = options.unwrap_or_default();
214            let mut objects =
215                self.state
216                    .get_owner_objects_with_limit(address, cursor, limit + 1, filter)?;
217
218            // objects here are of size (limit + 1), where the last one is the cursor for
219            // the next page
220            let has_next_page = objects.len() > limit && limit > 0;
221            objects.truncate(limit);
222            let next_cursor = (has_next_page).then_some(
223                objects
224                    .last()
225                    .map(|obj| obj.object_id)
226                    .unwrap_or(ObjectID::ZERO),
227            );
228
229            let data = match options.is_not_in_object_info() {
230                true => {
231                    let object_ids = objects.iter().map(|obj| obj.object_id).collect();
232                    self.read_api
233                        .multi_get_objects(object_ids, Some(options))
234                        .await
235                        .map_err(|e| Error::Internal(anyhow!(e)))?
236                }
237                false => objects
238                    .into_iter()
239                    .map(|o_info| IotaObjectResponse::try_from((o_info, options.clone())))
240                    .collect::<Result<Vec<IotaObjectResponse>, _>>()?,
241            };
242
243            self.metrics
244                .get_owned_objects_result_size
245                .observe(data.len() as f64);
246            self.metrics
247                .get_owned_objects_result_size_total
248                .inc_by(data.len() as u64);
249            Ok(Page {
250                data,
251                next_cursor,
252                has_next_page,
253            })
254        }
255        .trace()
256        .await
257    }
258
259    #[instrument(skip(self))]
260    async fn query_transaction_blocks(
261        &self,
262        query: IotaTransactionBlockResponseQuery,
263        // If `Some`, the query will start from the next item after the specified cursor
264        cursor: Option<TransactionDigest>,
265        limit: Option<usize>,
266        descending_order: Option<bool>,
267    ) -> RpcResult<TransactionBlocksPage> {
268        async move {
269            let limit = cap_page_limit(limit);
270            self.metrics.query_tx_blocks_limit.observe(limit as f64);
271            let descending = descending_order.unwrap_or_default();
272            let opts = query.options.unwrap_or_default();
273
274            // Retrieve 1 extra item for next cursor
275            let mut digests = self
276                .state
277                .get_transactions(
278                    &self.transaction_kv_store,
279                    query.filter,
280                    cursor,
281                    Some(limit + 1),
282                    descending,
283                )
284                .await
285                .map_err(Error::from)?;
286            // De-dup digests, duplicate digests are possible, for example,
287            // when get_transactions_by_move_function with module or function being None.
288            let mut seen = HashSet::new();
289            digests.retain(|digest| seen.insert(*digest));
290
291            // extract next cursor
292            let has_next_page = digests.len() > limit;
293            digests.truncate(limit);
294            let next_cursor = digests.last().cloned().map_or(cursor, Some);
295
296            let data: Vec<IotaTransactionBlockResponse> = if opts.only_digest() {
297                digests
298                    .into_iter()
299                    .map(IotaTransactionBlockResponse::new)
300                    .collect()
301            } else {
302                self.read_api
303                    .multi_get_transaction_blocks(digests, Some(opts))
304                    .await
305                    .map_err(|e| Error::Internal(anyhow!(e)))?
306            };
307
308            self.metrics
309                .query_tx_blocks_result_size
310                .observe(data.len() as f64);
311            self.metrics
312                .query_tx_blocks_result_size_total
313                .inc_by(data.len() as u64);
314            Ok(Page {
315                data,
316                next_cursor,
317                has_next_page,
318            })
319        }
320        .trace()
321        .await
322    }
323
324    #[instrument(skip(self))]
325    async fn query_transaction_blocks_v2(
326        &self,
327        query: IotaTransactionBlockResponseQueryV2,
328        // If `Some`, the query will start from the next item after the specified cursor
329        cursor: Option<TransactionDigest>,
330        limit: Option<usize>,
331        descending_order: Option<bool>,
332    ) -> RpcResult<TransactionBlocksPage> {
333        let v1_filter = query
334            .filter
335            .map(|f| {
336                f.as_v1().ok_or_else(|| {
337                    Error::UserInput(UserInputError::Unsupported(
338                        "transaction filter is not supported".to_string(),
339                    ))
340                })
341            })
342            .transpose()?;
343
344        let v1_query = IotaTransactionBlockResponseQuery {
345            filter: v1_filter,
346            options: query.options,
347        };
348        self.query_transaction_blocks(v1_query, cursor, limit, descending_order)
349            .await
350    }
351
352    #[instrument(skip(self))]
353    async fn query_events(
354        &self,
355        query: EventFilter,
356        // exclusive cursor if `Some`, otherwise start from the beginning
357        cursor: Option<EventID>,
358        limit: Option<usize>,
359        descending_order: Option<bool>,
360    ) -> RpcResult<EventPage> {
361        async move {
362            let descending = descending_order.unwrap_or_default();
363            let limit = cap_page_limit(limit);
364            self.metrics.query_events_limit.observe(limit as f64);
365            // Retrieve 1 extra item for next cursor
366            let mut data = self
367                .state
368                .query_events(
369                    &self.transaction_kv_store,
370                    query,
371                    cursor,
372                    limit + 1,
373                    descending,
374                )
375                .await
376                .map_err(Error::from)?;
377            let has_next_page = data.len() > limit;
378            data.truncate(limit);
379            let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
380            self.metrics
381                .query_events_result_size
382                .observe(data.len() as f64);
383            self.metrics
384                .query_events_result_size_total
385                .inc_by(data.len() as u64);
386            Ok(EventPage {
387                data,
388                next_cursor,
389                has_next_page,
390            })
391        }
392        .trace()
393        .await
394    }
395
396    #[instrument(skip(self))]
397    fn subscribe_event(
398        &self,
399        sink: PendingSubscriptionSink,
400        filter: EventFilter,
401    ) -> SubscriptionResult {
402        let permit = self.acquire_subscribe_permit()?;
403        spawn_subscription(
404            sink,
405            self.state
406                .get_subscription_handler()
407                .subscribe_events(filter),
408            Some(permit),
409        );
410        Ok(())
411    }
412
413    fn subscribe_transaction(
414        &self,
415        sink: PendingSubscriptionSink,
416        filter: TransactionFilter,
417    ) -> SubscriptionResult {
418        // Validate unsupported filters
419        if matches!(filter, TransactionFilter::Checkpoint(_)) {
420            return Err("checkpoint filter is not supported".into());
421        }
422
423        let permit = self.acquire_subscribe_permit()?;
424        spawn_subscription(
425            sink,
426            self.state
427                .get_subscription_handler()
428                .subscribe_transactions(filter),
429            Some(permit),
430        );
431        Ok(())
432    }
433
434    #[instrument(skip(self))]
435    async fn get_dynamic_fields(
436        &self,
437        parent_object_id: ObjectID,
438        // If `Some`, the query will start from the next item after the specified cursor
439        cursor: Option<ObjectID>,
440        limit: Option<usize>,
441    ) -> RpcResult<DynamicFieldPage> {
442        async move {
443            let limit = cap_page_limit(limit);
444            self.metrics.get_dynamic_fields_limit.observe(limit as f64);
445            let mut data = self
446                .state
447                .get_dynamic_fields(parent_object_id, cursor, limit + 1)
448                .map_err(Error::from)?;
449            let has_next_page = data.len() > limit;
450            data.truncate(limit);
451            let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
452            self.metrics
453                .get_dynamic_fields_result_size
454                .observe(data.len() as f64);
455            self.metrics
456                .get_dynamic_fields_result_size_total
457                .inc_by(data.len() as u64);
458            Ok(DynamicFieldPage {
459                data: data.into_iter().map(|(_, w)| w.into()).collect(),
460                next_cursor,
461                has_next_page,
462            })
463        }
464        .trace()
465        .await
466    }
467
468    #[instrument(skip(self))]
469    async fn get_dynamic_field_object(
470        &self,
471        parent_object_id: ObjectID,
472        name: DynamicFieldName,
473    ) -> RpcResult<IotaObjectResponse> {
474        self.get_dynamic_field_object(
475            parent_object_id,
476            name,
477            Some(IotaObjectDataOptions::full_content()),
478        )
479        .await
480    }
481
482    #[instrument(skip(self))]
483    async fn get_dynamic_field_object_v2(
484        &self,
485        parent_object_id: ObjectID,
486        name: DynamicFieldName,
487        options: Option<IotaObjectDataOptions>,
488    ) -> RpcResult<IotaObjectResponse> {
489        self.get_dynamic_field_object(parent_object_id, name, options)
490            .await
491    }
492
493    async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
494        let name = name.parse::<Name>().map_err(Error::from)?;
495
496        // Construct the record id to lookup.
497        let record_id = self.iota_names_config.record_field_id(&name);
498
499        let parent_record_id = name
500            .parent()
501            .map(|parent_name| self.iota_names_config.record_field_id(&parent_name));
502
503        // Keep record IDs alive by declaring both before creating futures
504        let mut requests = vec![self.state.get_object(&record_id)];
505
506        // We only want to fetch both the child and the parent if the name is a
507        // subname.
508        if let Some(ref parent_record_id) = parent_record_id {
509            requests.push(self.state.get_object(parent_record_id));
510        }
511
512        // Couldn't find a `multi_get_object` for this crate (looks like it uses a k,v
513        // db) Always fetching both parent + child at the same time (even for
514        // node subnames), to avoid sequential db reads. We do this because we
515        // do not know if the requested name is a node subname or a leaf
516        // subname, and we can save a trip to the db.
517        let mut results = futures::future::try_join_all(requests)
518            .await
519            .map_err(Error::from)?;
520
521        // Removing without checking vector len, since it is known (== 1 or 2 depending
522        // on whether it is a subname or not).
523        let Some(object) = results.remove(0) else {
524            return Ok(None);
525        };
526
527        let name_record = NameRecord::try_from(object).map_err(Error::from)?;
528
529        let current_timestamp_ms = self
530            .get_latest_checkpoint_timestamp_ms()
531            .map_err(Error::from)?;
532
533        // Handling second-level names & node subnames is the same (we handle them as
534        // `node` records). We check their expiration, and if not expired,
535        // return the target address.
536        if !name_record.is_leaf_record() {
537            return if !name_record.is_node_expired(current_timestamp_ms) {
538                Ok(Some(name_record.into()))
539            } else {
540                Err(Error::from(IotaNamesError::NameExpired).into())
541            };
542        } else {
543            // Handle the `leaf` record case which requires to check the parent for
544            // expiration. We can remove since we know that if we're here, we have a parent
545            // result for the parent request. If the parent result is `None` for the
546            // existing leaf record, we consider it expired.
547            let Some(parent_object) = results.remove(0) else {
548                return Err(Error::from(IotaNamesError::NameExpired).into());
549            };
550
551            let parent_name_record = NameRecord::try_from(parent_object).map_err(Error::from)?;
552
553            // For a leaf record, we check that:
554            // 1. The parent is a valid parent for that leaf record
555            // 2. The parent is not expired
556            if parent_name_record.is_valid_leaf_parent(&name_record)
557                && !parent_name_record.is_node_expired(current_timestamp_ms)
558            {
559                Ok(Some(name_record.into()))
560            } else {
561                Err(Error::from(IotaNamesError::NameExpired).into())
562            }
563        }
564    }
565
566    #[instrument(skip(self))]
567    async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
568        let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
569
570        let Some(field_reverse_record_object) = self
571            .state
572            .get_object(&reverse_record_id)
573            .await
574            .map_err(Error::from)?
575        else {
576            return Ok(None);
577        };
578
579        let name = field_reverse_record_object
580            .to_rust::<Field<IotaAddress, Name>>()
581            .ok_or_else(|| Error::Unexpected(format!("malformed Object {reverse_record_id}")))?
582            .value;
583
584        let name = name.to_string();
585
586        let resolved_record = self.iota_names_lookup(&name).await?;
587
588        // If looking up the name returns an empty result, we return an empty result.
589        if resolved_record.is_none() {
590            return Ok(None);
591        }
592
593        Ok(Some(name))
594    }
595
596    #[instrument(skip(self))]
597    async fn iota_names_find_all_registration_nfts(
598        &self,
599        address: IotaAddress,
600        cursor: Option<ObjectID>,
601        limit: Option<usize>,
602        options: Option<IotaObjectDataOptions>,
603    ) -> RpcResult<ObjectsPage> {
604        let query = IotaObjectResponseQuery {
605            filter: Some(IotaObjectDataFilter::StructType(NameRegistration::type_(
606                self.iota_names_config.package_address.into(),
607            ))),
608            options,
609        };
610
611        let owned_objects = self
612            .get_owned_objects(address, Some(query), cursor, limit)
613            .await?;
614
615        Ok(owned_objects)
616    }
617}
618
619impl<R: ReadApiServer> IotaRpcModule for IndexerApi<R> {
620    fn rpc(self) -> RpcModule<Self> {
621        self.into_rpc()
622    }
623
624    fn rpc_doc_module() -> Module {
625        IndexerApiOpenRpc::module_doc()
626    }
627}