iota_json_rpc/
indexer_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashSet, sync::Arc, time::Duration};
6
7use anyhow::{anyhow, bail};
8use async_trait::async_trait;
9use futures::{Stream, StreamExt};
10use iota_core::authority::AuthorityState;
11use iota_json::IotaJsonValue;
12use iota_json_rpc_api::{
13    IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
14    cap_page_limit, validate_limit,
15};
16use iota_json_rpc_types::{
17    DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectDataFilter,
18    IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
19    IotaTransactionBlockResponse, IotaTransactionBlockResponseQuery,
20    IotaTransactionBlockResponseQueryV2, ObjectsPage, Page, TransactionBlocksPage,
21    TransactionFilter,
22};
23use iota_metrics::spawn_monitored_task;
24use iota_names::{
25    IotaNamesNft, NameRegistration, config::IotaNamesConfig, error::IotaNamesError, name::Name,
26    registry::NameRecord,
27};
28use iota_open_rpc::Module;
29use iota_storage::key_value_store::TransactionKeyValueStore;
30use iota_types::{
31    base_types::{IotaAddress, ObjectID},
32    digests::TransactionDigest,
33    dynamic_field::{DynamicFieldName, Field},
34    error::{IotaObjectResponseError, UserInputError},
35    event::EventID,
36};
37use jsonrpsee::{
38    PendingSubscriptionSink, RpcModule, SendTimeoutError, SubscriptionMessage,
39    core::{RpcResult, SubscriptionResult},
40};
41use move_bytecode_utils::layout::TypeLayoutBuilder;
42use move_core_types::language_storage::TypeTag;
43use serde::Serialize;
44use tokio::sync::{OwnedSemaphorePermit, Semaphore};
45use tracing::{debug, instrument};
46
47use crate::{
48    IotaRpcModule,
49    authority_state::{StateRead, StateReadResult},
50    error::{Error, IotaRpcInputError},
51    logger::FutureWithTracing as _,
52};
53
54async fn pipe_from_stream<T: Serialize>(
55    pending: PendingSubscriptionSink,
56    mut stream: impl Stream<Item = T> + Unpin,
57) -> Result<(), anyhow::Error> {
58    let sink = pending.accept().await?;
59
60    loop {
61        tokio::select! {
62            _ = sink.closed() => break Ok(()),
63            maybe_item = stream.next() => {
64                let Some(item) = maybe_item else {
65                    break Ok(());
66                };
67
68                let msg = SubscriptionMessage::from_json(&item)?;
69
70                if let Err(e) = sink.send_timeout(msg, Duration::from_secs(60)).await {
71                    match e {
72                        // The subscription or connection was closed.
73                        SendTimeoutError::Closed(_) => break Ok(()),
74                        // The subscription send timeout expired
75                        // the message is returned and you could save that message
76                        // and retry again later.
77                        SendTimeoutError::Timeout(_) => break Err(anyhow::anyhow!("Subscription timeout expired")),
78                    }
79                }
80            }
81        }
82    }
83}
84
85pub fn spawn_subscription<S, T>(
86    pending: PendingSubscriptionSink,
87    rx: S,
88    permit: Option<OwnedSemaphorePermit>,
89) where
90    S: Stream<Item = T> + Unpin + Send + 'static,
91    T: Serialize + Send,
92{
93    spawn_monitored_task!(async move {
94        let _permit = permit;
95        match pipe_from_stream(pending, rx).await {
96            Ok(_) => {
97                debug!("Subscription completed.");
98            }
99            Err(err) => {
100                debug!("Subscription failed: {err:?}");
101            }
102        }
103    });
104}
/// Number of subscription permits used by [`IndexerApi::new`] when the caller
/// does not specify a maximum.
const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
106
107pub struct IndexerApi<R> {
108    state: Arc<dyn StateRead>,
109    read_api: R,
110    transaction_kv_store: Arc<TransactionKeyValueStore>,
111    iota_names_config: IotaNamesConfig,
112    pub metrics: Arc<JsonRpcMetrics>,
113    subscription_semaphore: Arc<Semaphore>,
114}
115
116impl<R: ReadApiServer> IndexerApi<R> {
117    pub fn new(
118        state: Arc<AuthorityState>,
119        read_api: R,
120        transaction_kv_store: Arc<TransactionKeyValueStore>,
121        metrics: Arc<JsonRpcMetrics>,
122        iota_names_config: IotaNamesConfig,
123        max_subscriptions: Option<usize>,
124    ) -> Self {
125        let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
126        Self {
127            state,
128            transaction_kv_store,
129            read_api,
130            metrics,
131            iota_names_config,
132            subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
133        }
134    }
135
136    fn extract_values_from_dynamic_field_name(
137        &self,
138        name: DynamicFieldName,
139    ) -> Result<(TypeTag, Vec<u8>), IotaRpcInputError> {
140        let DynamicFieldName {
141            type_: name_type,
142            value,
143        } = name;
144        let epoch_store = self.state.load_epoch_store_one_call_per_task();
145        let layout = TypeLayoutBuilder::build_with_types(&name_type, epoch_store.module_cache())?;
146        let iota_json_value = IotaJsonValue::new(value)?;
147        let name_bcs_value = iota_json_value.to_bcs_bytes(&layout)?;
148        Ok((name_type, name_bcs_value))
149    }
150
151    fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
152        match self.subscription_semaphore.clone().try_acquire_owned() {
153            Ok(p) => Ok(p),
154            Err(_) => bail!("Resources exhausted"),
155        }
156    }
157
158    async fn get_dynamic_field_object(
159        &self,
160        parent_object_id: ObjectID,
161        name: DynamicFieldName,
162        options: Option<IotaObjectDataOptions>,
163    ) -> RpcResult<IotaObjectResponse> {
164        async move {
165            let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
166
167            let id = self
168                .state
169                .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
170                .map_err(Error::from)?;
171
172            if let Some(id) = id {
173                self.read_api
174                    .get_object(id, options)
175                    .await
176                    .map_err(|e| Error::Internal(anyhow!(e)))
177            } else {
178                Ok(IotaObjectResponse::new_with_error(
179                    IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
180                ))
181            }
182        }
183        .trace()
184        .await
185    }
186
187    fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
188        let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
189
190        let checkpoint = self
191            .state
192            .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
193
194        Ok(checkpoint.timestamp_ms)
195    }
196}
197
198#[async_trait]
199impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
200    #[instrument(skip(self))]
201    async fn get_owned_objects(
202        &self,
203        address: IotaAddress,
204        query: Option<IotaObjectResponseQuery>,
205        cursor: Option<ObjectID>,
206        limit: Option<usize>,
207    ) -> RpcResult<ObjectsPage> {
208        async move {
209            let limit =
210                validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(IotaRpcInputError::from)?;
211            self.metrics.get_owned_objects_limit.observe(limit as f64);
212            let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
213            let options = options.unwrap_or_default();
214            let mut objects =
215                self.state
216                    .get_owner_objects_with_limit(address, cursor, limit + 1, filter)?;
217
218            // objects here are of size (limit + 1), where the last one is the cursor for
219            // the next page
220            let has_next_page = objects.len() > limit;
221            objects.truncate(limit);
222            let next_cursor = objects
223                .last()
224                .cloned()
225                .map_or(cursor, |o_info| Some(o_info.object_id));
226
227            let data = match options.is_not_in_object_info() {
228                true => {
229                    let object_ids = objects.iter().map(|obj| obj.object_id).collect();
230                    self.read_api
231                        .multi_get_objects(object_ids, Some(options))
232                        .await
233                        .map_err(|e| Error::Internal(anyhow!(e)))?
234                }
235                false => objects
236                    .into_iter()
237                    .map(|o_info| IotaObjectResponse::try_from((o_info, options.clone())))
238                    .collect::<Result<Vec<IotaObjectResponse>, _>>()?,
239            };
240
241            self.metrics
242                .get_owned_objects_result_size
243                .observe(data.len() as f64);
244            self.metrics
245                .get_owned_objects_result_size_total
246                .inc_by(data.len() as u64);
247            Ok(Page {
248                data,
249                next_cursor,
250                has_next_page,
251            })
252        }
253        .trace()
254        .await
255    }
256
257    #[instrument(skip(self))]
258    async fn query_transaction_blocks(
259        &self,
260        query: IotaTransactionBlockResponseQuery,
261        // If `Some`, the query will start from the next item after the specified cursor
262        cursor: Option<TransactionDigest>,
263        limit: Option<usize>,
264        descending_order: Option<bool>,
265    ) -> RpcResult<TransactionBlocksPage> {
266        async move {
267            let limit = cap_page_limit(limit);
268            self.metrics.query_tx_blocks_limit.observe(limit as f64);
269            let descending = descending_order.unwrap_or_default();
270            let opts = query.options.unwrap_or_default();
271
272            // Retrieve 1 extra item for next cursor
273            let mut digests = self
274                .state
275                .get_transactions(
276                    &self.transaction_kv_store,
277                    query.filter,
278                    cursor,
279                    Some(limit + 1),
280                    descending,
281                )
282                .await
283                .map_err(Error::from)?;
284            // De-dup digests, duplicate digests are possible, for example,
285            // when get_transactions_by_move_function with module or function being None.
286            let mut seen = HashSet::new();
287            digests.retain(|digest| seen.insert(*digest));
288
289            // extract next cursor
290            let has_next_page = digests.len() > limit;
291            digests.truncate(limit);
292            let next_cursor = digests.last().cloned().map_or(cursor, Some);
293
294            let data: Vec<IotaTransactionBlockResponse> = if opts.only_digest() {
295                digests
296                    .into_iter()
297                    .map(IotaTransactionBlockResponse::new)
298                    .collect()
299            } else {
300                self.read_api
301                    .multi_get_transaction_blocks(digests, Some(opts))
302                    .await
303                    .map_err(|e| Error::Internal(anyhow!(e)))?
304            };
305
306            self.metrics
307                .query_tx_blocks_result_size
308                .observe(data.len() as f64);
309            self.metrics
310                .query_tx_blocks_result_size_total
311                .inc_by(data.len() as u64);
312            Ok(Page {
313                data,
314                next_cursor,
315                has_next_page,
316            })
317        }
318        .trace()
319        .await
320    }
321
322    #[instrument(skip(self))]
323    async fn query_transaction_blocks_v2(
324        &self,
325        query: IotaTransactionBlockResponseQueryV2,
326        // If `Some`, the query will start from the next item after the specified cursor
327        cursor: Option<TransactionDigest>,
328        limit: Option<usize>,
329        descending_order: Option<bool>,
330    ) -> RpcResult<TransactionBlocksPage> {
331        let v1_filter = query
332            .filter
333            .map(|f| {
334                f.as_v1().ok_or_else(|| {
335                    Error::UserInput(UserInputError::Unsupported(
336                        "transaction filter is not supported".to_string(),
337                    ))
338                })
339            })
340            .transpose()?;
341
342        let v1_query = IotaTransactionBlockResponseQuery {
343            filter: v1_filter,
344            options: query.options,
345        };
346        self.query_transaction_blocks(v1_query, cursor, limit, descending_order)
347            .await
348    }
349
350    #[instrument(skip(self))]
351    async fn query_events(
352        &self,
353        query: EventFilter,
354        // exclusive cursor if `Some`, otherwise start from the beginning
355        cursor: Option<EventID>,
356        limit: Option<usize>,
357        descending_order: Option<bool>,
358    ) -> RpcResult<EventPage> {
359        async move {
360            let descending = descending_order.unwrap_or_default();
361            let limit = cap_page_limit(limit);
362            self.metrics.query_events_limit.observe(limit as f64);
363            // Retrieve 1 extra item for next cursor
364            let mut data = self
365                .state
366                .query_events(
367                    &self.transaction_kv_store,
368                    query,
369                    cursor,
370                    limit + 1,
371                    descending,
372                )
373                .await
374                .map_err(Error::from)?;
375            let has_next_page = data.len() > limit;
376            data.truncate(limit);
377            let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
378            self.metrics
379                .query_events_result_size
380                .observe(data.len() as f64);
381            self.metrics
382                .query_events_result_size_total
383                .inc_by(data.len() as u64);
384            Ok(EventPage {
385                data,
386                next_cursor,
387                has_next_page,
388            })
389        }
390        .trace()
391        .await
392    }
393
394    #[instrument(skip(self))]
395    fn subscribe_event(
396        &self,
397        sink: PendingSubscriptionSink,
398        filter: EventFilter,
399    ) -> SubscriptionResult {
400        let permit = self.acquire_subscribe_permit()?;
401        spawn_subscription(
402            sink,
403            self.state
404                .get_subscription_handler()
405                .subscribe_events(filter),
406            Some(permit),
407        );
408        Ok(())
409    }
410
411    fn subscribe_transaction(
412        &self,
413        sink: PendingSubscriptionSink,
414        filter: TransactionFilter,
415    ) -> SubscriptionResult {
416        // Validate unsupported filters
417        if matches!(filter, TransactionFilter::Checkpoint(_)) {
418            return Err("checkpoint filter is not supported".into());
419        }
420
421        let permit = self.acquire_subscribe_permit()?;
422        spawn_subscription(
423            sink,
424            self.state
425                .get_subscription_handler()
426                .subscribe_transactions(filter),
427            Some(permit),
428        );
429        Ok(())
430    }
431
432    #[instrument(skip(self))]
433    async fn get_dynamic_fields(
434        &self,
435        parent_object_id: ObjectID,
436        // If `Some`, the query will start from the next item after the specified cursor
437        cursor: Option<ObjectID>,
438        limit: Option<usize>,
439    ) -> RpcResult<DynamicFieldPage> {
440        async move {
441            let limit = cap_page_limit(limit);
442            self.metrics.get_dynamic_fields_limit.observe(limit as f64);
443            let mut data = self
444                .state
445                .get_dynamic_fields(parent_object_id, cursor, limit + 1)
446                .map_err(Error::from)?;
447            let has_next_page = data.len() > limit;
448            data.truncate(limit);
449            let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
450            self.metrics
451                .get_dynamic_fields_result_size
452                .observe(data.len() as f64);
453            self.metrics
454                .get_dynamic_fields_result_size_total
455                .inc_by(data.len() as u64);
456            Ok(DynamicFieldPage {
457                data: data.into_iter().map(|(_, w)| w.into()).collect(),
458                next_cursor,
459                has_next_page,
460            })
461        }
462        .trace()
463        .await
464    }
465
466    #[instrument(skip(self))]
467    async fn get_dynamic_field_object(
468        &self,
469        parent_object_id: ObjectID,
470        name: DynamicFieldName,
471    ) -> RpcResult<IotaObjectResponse> {
472        self.get_dynamic_field_object(
473            parent_object_id,
474            name,
475            Some(IotaObjectDataOptions::full_content()),
476        )
477        .await
478    }
479
480    #[instrument(skip(self))]
481    async fn get_dynamic_field_object_v2(
482        &self,
483        parent_object_id: ObjectID,
484        name: DynamicFieldName,
485        options: Option<IotaObjectDataOptions>,
486    ) -> RpcResult<IotaObjectResponse> {
487        self.get_dynamic_field_object(parent_object_id, name, options)
488            .await
489    }
490
491    async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
492        let name = name.parse::<Name>().map_err(Error::from)?;
493
494        // Construct the record id to lookup.
495        let record_id = self.iota_names_config.record_field_id(&name);
496
497        let parent_record_id = name
498            .parent()
499            .map(|parent_name| self.iota_names_config.record_field_id(&parent_name));
500
501        // Keep record IDs alive by declaring both before creating futures
502        let mut requests = vec![self.state.get_object(&record_id)];
503
504        // We only want to fetch both the child and the parent if the name is a
505        // subname.
506        if let Some(ref parent_record_id) = parent_record_id {
507            requests.push(self.state.get_object(parent_record_id));
508        }
509
510        // Couldn't find a `multi_get_object` for this crate (looks like it uses a k,v
511        // db) Always fetching both parent + child at the same time (even for
512        // node subnames), to avoid sequential db reads. We do this because we
513        // do not know if the requested name is a node subname or a leaf
514        // subname, and we can save a trip to the db.
515        let mut results = futures::future::try_join_all(requests)
516            .await
517            .map_err(Error::from)?;
518
519        // Removing without checking vector len, since it is known (== 1 or 2 depending
520        // on whether it is a subname or not).
521        let Some(object) = results.remove(0) else {
522            return Ok(None);
523        };
524
525        let name_record = NameRecord::try_from(object).map_err(Error::from)?;
526
527        let current_timestamp_ms = self
528            .get_latest_checkpoint_timestamp_ms()
529            .map_err(Error::from)?;
530
531        // Handling second-level names & node subnames is the same (we handle them as
532        // `node` records). We check their expiration, and if not expired,
533        // return the target address.
534        if !name_record.is_leaf_record() {
535            return if !name_record.is_node_expired(current_timestamp_ms) {
536                Ok(Some(name_record.into()))
537            } else {
538                Err(Error::from(IotaNamesError::NameExpired).into())
539            };
540        } else {
541            // Handle the `leaf` record case which requires to check the parent for
542            // expiration. We can remove since we know that if we're here, we have a parent
543            // result for the parent request. If the parent result is `None` for the
544            // existing leaf record, we consider it expired.
545            let Some(parent_object) = results.remove(0) else {
546                return Err(Error::from(IotaNamesError::NameExpired).into());
547            };
548
549            let parent_name_record = NameRecord::try_from(parent_object).map_err(Error::from)?;
550
551            // For a leaf record, we check that:
552            // 1. The parent is a valid parent for that leaf record
553            // 2. The parent is not expired
554            if parent_name_record.is_valid_leaf_parent(&name_record)
555                && !parent_name_record.is_node_expired(current_timestamp_ms)
556            {
557                Ok(Some(name_record.into()))
558            } else {
559                Err(Error::from(IotaNamesError::NameExpired).into())
560            }
561        }
562    }
563
564    #[instrument(skip(self))]
565    async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
566        let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
567
568        let Some(field_reverse_record_object) = self
569            .state
570            .get_object(&reverse_record_id)
571            .await
572            .map_err(Error::from)?
573        else {
574            return Ok(None);
575        };
576
577        let name = field_reverse_record_object
578            .to_rust::<Field<IotaAddress, Name>>()
579            .ok_or_else(|| Error::Unexpected(format!("malformed Object {reverse_record_id}")))?
580            .value;
581
582        let name = name.to_string();
583
584        let resolved_record = self.iota_names_lookup(&name).await?;
585
586        // If looking up the name returns an empty result, we return an empty result.
587        if resolved_record.is_none() {
588            return Ok(None);
589        }
590
591        Ok(Some(name))
592    }
593
594    #[instrument(skip(self))]
595    async fn iota_names_find_all_registration_nfts(
596        &self,
597        address: IotaAddress,
598        cursor: Option<ObjectID>,
599        limit: Option<usize>,
600        options: Option<IotaObjectDataOptions>,
601    ) -> RpcResult<ObjectsPage> {
602        let query = IotaObjectResponseQuery {
603            filter: Some(IotaObjectDataFilter::StructType(NameRegistration::type_(
604                self.iota_names_config.package_address.into(),
605            ))),
606            options,
607        };
608
609        let owned_objects = self
610            .get_owned_objects(address, Some(query), cursor, limit)
611            .await?;
612
613        Ok(owned_objects)
614    }
615}
616
617impl<R: ReadApiServer> IotaRpcModule for IndexerApi<R> {
618    fn rpc(self) -> RpcModule<Self> {
619        self.into_rpc()
620    }
621
622    fn rpc_doc_module() -> Module {
623        IndexerApiOpenRpc::module_doc()
624    }
625}