1use std::{collections::HashSet, sync::Arc, time::Duration};
6
7use anyhow::{anyhow, bail};
8use async_trait::async_trait;
9use futures::{Stream, StreamExt};
10use iota_core::authority::AuthorityState;
11use iota_json::IotaJsonValue;
12use iota_json_rpc_api::{
13 IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
14 cap_page_limit, validate_limit,
15};
16use iota_json_rpc_types::{
17 DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectDataFilter,
18 IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseError, IotaObjectResponseQuery,
19 IotaTransactionBlockResponse, IotaTransactionBlockResponseQuery,
20 IotaTransactionBlockResponseQueryV2, ObjectsPage, Page, TransactionBlocksPage,
21 TransactionFilter,
22};
23use iota_metrics::spawn_monitored_task;
24use iota_names::{
25 IotaNamesNft, NameRegistration, config::IotaNamesConfig, error::IotaNamesError, name::Name,
26 registry::NameRecord,
27};
28use iota_open_rpc::Module;
29use iota_storage::key_value_store::TransactionKeyValueStore;
30use iota_types::{
31 base_types::{IotaAddress, ObjectID, TypeTag},
32 digests::TransactionDigest,
33 dynamic_field::{DynamicFieldName, Field},
34 error::UserInputError,
35 event::EventID,
36 iota_sdk_types_conversions::type_tag_sdk_to_core,
37};
38use jsonrpsee::{
39 PendingSubscriptionSink, RpcModule, SendTimeoutError, SubscriptionMessage,
40 core::{RpcResult, SubscriptionResult},
41};
42use move_bytecode_utils::layout::TypeLayoutBuilder;
43use serde::Serialize;
44use tokio::sync::{OwnedSemaphorePermit, Semaphore};
45use tracing::{debug, instrument};
46
47use crate::{
48 IotaRpcModule,
49 authority_state::{StateRead, StateReadResult},
50 error::{Error, IotaRpcInputError},
51 logger::FutureWithTracing as _,
52};
53
54async fn pipe_from_stream<T: Serialize>(
55 pending: PendingSubscriptionSink,
56 mut stream: impl Stream<Item = T> + Unpin,
57) -> Result<(), anyhow::Error> {
58 let sink = pending.accept().await?;
59
60 loop {
61 tokio::select! {
62 _ = sink.closed() => break Ok(()),
63 maybe_item = stream.next() => {
64 let Some(item) = maybe_item else {
65 break Ok(());
66 };
67
68 let msg = SubscriptionMessage::from_json(&item)?;
69
70 if let Err(e) = sink.send_timeout(msg, Duration::from_secs(60)).await {
71 match e {
72 SendTimeoutError::Closed(_) => break Ok(()),
74 SendTimeoutError::Timeout(_) => break Err(anyhow::anyhow!("Subscription timeout expired")),
78 }
79 }
80 }
81 }
82 }
83}
84
85pub fn spawn_subscription<S, T>(
86 pending: PendingSubscriptionSink,
87 rx: S,
88 permit: Option<OwnedSemaphorePermit>,
89) where
90 S: Stream<Item = T> + Unpin + Send + 'static,
91 T: Serialize + Send,
92{
93 spawn_monitored_task!(async move {
94 let _permit = permit;
95 match pipe_from_stream(pending, rx).await {
96 Ok(_) => {
97 debug!("Subscription completed.");
98 }
99 Err(err) => {
100 debug!("Subscription failed: {err:?}");
101 }
102 }
103 });
104}
/// Size of the subscription semaphore used when `IndexerApi::new` is called
/// without an explicit `max_subscriptions` value.
const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
106
107pub struct IndexerApi<R> {
108 state: Arc<dyn StateRead>,
109 read_api: R,
110 transaction_kv_store: Arc<TransactionKeyValueStore>,
111 iota_names_config: IotaNamesConfig,
112 pub metrics: Arc<JsonRpcMetrics>,
113 subscription_semaphore: Arc<Semaphore>,
114}
115
116impl<R: ReadApiServer> IndexerApi<R> {
117 pub fn new(
118 state: Arc<AuthorityState>,
119 read_api: R,
120 transaction_kv_store: Arc<TransactionKeyValueStore>,
121 metrics: Arc<JsonRpcMetrics>,
122 iota_names_config: IotaNamesConfig,
123 max_subscriptions: Option<usize>,
124 ) -> Self {
125 let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
126 Self {
127 state,
128 transaction_kv_store,
129 read_api,
130 metrics,
131 iota_names_config,
132 subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
133 }
134 }
135
136 fn extract_values_from_dynamic_field_name(
137 &self,
138 name: DynamicFieldName,
139 ) -> Result<(TypeTag, Vec<u8>), IotaRpcInputError> {
140 let DynamicFieldName {
141 type_: name_type,
142 value,
143 } = name;
144 let epoch_store = self.state.load_epoch_store_one_call_per_task();
145 let layout = TypeLayoutBuilder::build_with_types(
146 &type_tag_sdk_to_core(&name_type),
147 epoch_store.module_cache(),
148 )?;
149 let iota_json_value = IotaJsonValue::new(value)?;
150 let name_bcs_value = iota_json_value.to_bcs_bytes(&layout)?;
151 Ok((name_type, name_bcs_value))
152 }
153
154 fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
155 match self.subscription_semaphore.clone().try_acquire_owned() {
156 Ok(p) => Ok(p),
157 Err(_) => bail!("Resources exhausted"),
158 }
159 }
160
161 async fn get_dynamic_field_object(
162 &self,
163 parent_object_id: ObjectID,
164 name: DynamicFieldName,
165 options: Option<IotaObjectDataOptions>,
166 ) -> RpcResult<IotaObjectResponse> {
167 async move {
168 let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
169
170 let id = self
171 .state
172 .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
173 .map_err(Error::from)?;
174
175 if let Some(id) = id {
176 self.read_api
177 .get_object(id, options)
178 .await
179 .map_err(|e| Error::Internal(anyhow!(e)))
180 } else {
181 Ok(IotaObjectResponse::new_with_error(
182 IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
183 ))
184 }
185 }
186 .trace()
187 .await
188 }
189
190 fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
191 let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
192
193 let checkpoint = self
194 .state
195 .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
196
197 Ok(checkpoint.timestamp_ms)
198 }
199}
200
201#[async_trait]
202impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
203 #[instrument(skip(self, address), fields(address = %address))]
204 async fn get_owned_objects(
205 &self,
206 address: IotaAddress,
207 query: Option<IotaObjectResponseQuery>,
208 cursor: Option<ObjectID>,
209 limit: Option<usize>,
210 ) -> RpcResult<ObjectsPage> {
211 async move {
212 let limit =
213 validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(IotaRpcInputError::from)?;
214 self.metrics.get_owned_objects_limit.observe(limit as f64);
215 let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
216 let options = options.unwrap_or_default();
217 let mut objects =
218 self.state
219 .get_owner_objects_with_limit(address, cursor, limit + 1, filter)?;
220
221 let has_next_page = objects.len() > limit && limit > 0;
224 objects.truncate(limit);
225 let next_cursor = (has_next_page).then_some(
226 objects
227 .last()
228 .map(|obj| obj.object_id)
229 .unwrap_or(ObjectID::ZERO),
230 );
231
232 let data = match options.is_not_in_object_info() {
233 true => {
234 let object_ids = objects.iter().map(|obj| obj.object_id).collect();
235 self.read_api
236 .multi_get_objects(object_ids, Some(options))
237 .await
238 .map_err(|e| Error::Internal(anyhow!(e)))?
239 }
240 false => objects
241 .into_iter()
242 .map(|o_info| IotaObjectResponse::try_from((o_info, options.clone())))
243 .collect::<Result<Vec<IotaObjectResponse>, _>>()?,
244 };
245
246 self.metrics
247 .get_owned_objects_result_size
248 .observe(data.len() as f64);
249 self.metrics
250 .get_owned_objects_result_size_total
251 .inc_by(data.len() as u64);
252 Ok(Page {
253 data,
254 next_cursor,
255 has_next_page,
256 })
257 }
258 .trace()
259 .await
260 }
261
262 #[instrument(skip(self))]
263 async fn query_transaction_blocks(
264 &self,
265 query: IotaTransactionBlockResponseQuery,
266 cursor: Option<TransactionDigest>,
268 limit: Option<usize>,
269 descending_order: Option<bool>,
270 ) -> RpcResult<TransactionBlocksPage> {
271 async move {
272 let limit = cap_page_limit(limit);
273 self.metrics.query_tx_blocks_limit.observe(limit as f64);
274 let descending = descending_order.unwrap_or_default();
275 let opts = query.options.unwrap_or_default();
276
277 let mut digests = self
279 .state
280 .get_transactions(
281 &self.transaction_kv_store,
282 query.filter,
283 cursor,
284 Some(limit + 1),
285 descending,
286 )
287 .await
288 .map_err(Error::from)?;
289 let mut seen = HashSet::new();
292 digests.retain(|digest| seen.insert(*digest));
293
294 let has_next_page = digests.len() > limit;
296 digests.truncate(limit);
297 let next_cursor = digests.last().cloned().map_or(cursor, Some);
298
299 let data: Vec<IotaTransactionBlockResponse> = if opts.only_digest() {
300 digests
301 .into_iter()
302 .map(IotaTransactionBlockResponse::new)
303 .collect()
304 } else {
305 self.read_api
306 .multi_get_transaction_blocks(digests, Some(opts))
307 .await
308 .map_err(|e| Error::Internal(anyhow!(e)))?
309 };
310
311 self.metrics
312 .query_tx_blocks_result_size
313 .observe(data.len() as f64);
314 self.metrics
315 .query_tx_blocks_result_size_total
316 .inc_by(data.len() as u64);
317 Ok(Page {
318 data,
319 next_cursor,
320 has_next_page,
321 })
322 }
323 .trace()
324 .await
325 }
326
327 #[instrument(skip(self))]
328 async fn query_transaction_blocks_v2(
329 &self,
330 query: IotaTransactionBlockResponseQueryV2,
331 cursor: Option<TransactionDigest>,
333 limit: Option<usize>,
334 descending_order: Option<bool>,
335 ) -> RpcResult<TransactionBlocksPage> {
336 let v1_filter = query
337 .filter
338 .map(|f| {
339 f.as_v1().ok_or_else(|| {
340 Error::UserInput(UserInputError::Unsupported(
341 "transaction filter is not supported".to_string(),
342 ))
343 })
344 })
345 .transpose()?;
346
347 let v1_query = IotaTransactionBlockResponseQuery {
348 filter: v1_filter,
349 options: query.options,
350 };
351 self.query_transaction_blocks(v1_query, cursor, limit, descending_order)
352 .await
353 }
354
355 #[instrument(skip(self))]
356 async fn query_events(
357 &self,
358 query: EventFilter,
359 cursor: Option<EventID>,
361 limit: Option<usize>,
362 descending_order: Option<bool>,
363 ) -> RpcResult<EventPage> {
364 async move {
365 let descending = descending_order.unwrap_or_default();
366 let limit = cap_page_limit(limit);
367 self.metrics.query_events_limit.observe(limit as f64);
368 let mut data = self
370 .state
371 .query_events(
372 &self.transaction_kv_store,
373 query,
374 cursor,
375 limit + 1,
376 descending,
377 )
378 .await
379 .map_err(Error::from)?;
380 let has_next_page = data.len() > limit;
381 data.truncate(limit);
382 let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
383 self.metrics
384 .query_events_result_size
385 .observe(data.len() as f64);
386 self.metrics
387 .query_events_result_size_total
388 .inc_by(data.len() as u64);
389 Ok(EventPage {
390 data,
391 next_cursor,
392 has_next_page,
393 })
394 }
395 .trace()
396 .await
397 }
398
399 #[instrument(skip(self))]
400 fn subscribe_event(
401 &self,
402 sink: PendingSubscriptionSink,
403 filter: EventFilter,
404 ) -> SubscriptionResult {
405 let permit = self.acquire_subscribe_permit()?;
406 spawn_subscription(
407 sink,
408 self.state
409 .get_subscription_handler()
410 .subscribe_events(filter),
411 Some(permit),
412 );
413 Ok(())
414 }
415
416 fn subscribe_transaction(
417 &self,
418 sink: PendingSubscriptionSink,
419 filter: TransactionFilter,
420 ) -> SubscriptionResult {
421 if matches!(filter, TransactionFilter::Checkpoint(_)) {
423 return Err("checkpoint filter is not supported".into());
424 }
425
426 let permit = self.acquire_subscribe_permit()?;
427 spawn_subscription(
428 sink,
429 self.state
430 .get_subscription_handler()
431 .subscribe_transactions(filter),
432 Some(permit),
433 );
434 Ok(())
435 }
436
437 #[instrument(skip(self, parent_object_id), fields(parent_object_id = %parent_object_id))]
438 async fn get_dynamic_fields(
439 &self,
440 parent_object_id: ObjectID,
441 cursor: Option<ObjectID>,
443 limit: Option<usize>,
444 ) -> RpcResult<DynamicFieldPage> {
445 async move {
446 let limit = cap_page_limit(limit);
447 self.metrics.get_dynamic_fields_limit.observe(limit as f64);
448 let mut data = self
449 .state
450 .get_dynamic_fields(parent_object_id, cursor, limit + 1)
451 .map_err(Error::from)?;
452 let has_next_page = data.len() > limit;
453 data.truncate(limit);
454 let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
455 self.metrics
456 .get_dynamic_fields_result_size
457 .observe(data.len() as f64);
458 self.metrics
459 .get_dynamic_fields_result_size_total
460 .inc_by(data.len() as u64);
461 Ok(DynamicFieldPage {
462 data: data.into_iter().map(|(_, w)| w.into()).collect(),
463 next_cursor,
464 has_next_page,
465 })
466 }
467 .trace()
468 .await
469 }
470
471 #[instrument(skip(self, parent_object_id), fields(parent_object_id = %parent_object_id))]
472 async fn get_dynamic_field_object(
473 &self,
474 parent_object_id: ObjectID,
475 name: DynamicFieldName,
476 ) -> RpcResult<IotaObjectResponse> {
477 self.get_dynamic_field_object(
478 parent_object_id,
479 name,
480 Some(IotaObjectDataOptions::full_content()),
481 )
482 .await
483 }
484
485 #[instrument(skip(self, parent_object_id), fields(parent_object_id = %parent_object_id))]
486 async fn get_dynamic_field_object_v2(
487 &self,
488 parent_object_id: ObjectID,
489 name: DynamicFieldName,
490 options: Option<IotaObjectDataOptions>,
491 ) -> RpcResult<IotaObjectResponse> {
492 self.get_dynamic_field_object(parent_object_id, name, options)
493 .await
494 }
495
496 async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
497 let name = name.parse::<Name>().map_err(Error::from)?;
498
499 let record_id = self.iota_names_config.record_field_id(&name);
501
502 let parent_record_id = name
503 .parent()
504 .map(|parent_name| self.iota_names_config.record_field_id(&parent_name));
505
506 let mut requests = vec![self.state.get_object(&record_id)];
508
509 if let Some(ref parent_record_id) = parent_record_id {
512 requests.push(self.state.get_object(parent_record_id));
513 }
514
515 let mut results = futures::future::try_join_all(requests)
521 .await
522 .map_err(Error::from)?;
523
524 let Some(object) = results.remove(0) else {
527 return Ok(None);
528 };
529
530 let name_record = NameRecord::try_from(object).map_err(Error::from)?;
531
532 let current_timestamp_ms = self
533 .get_latest_checkpoint_timestamp_ms()
534 .map_err(Error::from)?;
535
536 if !name_record.is_leaf_record() {
540 return if !name_record.is_node_expired(current_timestamp_ms) {
541 Ok(Some(name_record.into()))
542 } else {
543 Err(Error::from(IotaNamesError::NameExpired).into())
544 };
545 } else {
546 let Some(parent_object) = results.remove(0) else {
551 return Err(Error::from(IotaNamesError::NameExpired).into());
552 };
553
554 let parent_name_record = NameRecord::try_from(parent_object).map_err(Error::from)?;
555
556 if parent_name_record.is_valid_leaf_parent(&name_record)
560 && !parent_name_record.is_node_expired(current_timestamp_ms)
561 {
562 Ok(Some(name_record.into()))
563 } else {
564 Err(Error::from(IotaNamesError::NameExpired).into())
565 }
566 }
567 }
568
569 #[instrument(skip(self, address), fields(address = %address))]
570 async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
571 let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
572
573 let Some(field_reverse_record_object) = self
574 .state
575 .get_object(&reverse_record_id)
576 .await
577 .map_err(Error::from)?
578 else {
579 return Ok(None);
580 };
581
582 let name = field_reverse_record_object
583 .to_rust::<Field<IotaAddress, Name>>()
584 .ok_or_else(|| Error::Unexpected(format!("malformed Object {reverse_record_id}")))?
585 .value;
586
587 let name = name.to_string();
588
589 let resolved_record = self.iota_names_lookup(&name).await?;
590
591 if resolved_record.is_none() {
593 return Ok(None);
594 }
595
596 Ok(Some(name))
597 }
598
599 #[instrument(skip(self, address), fields(address = %address))]
600 async fn iota_names_find_all_registration_nfts(
601 &self,
602 address: IotaAddress,
603 cursor: Option<ObjectID>,
604 limit: Option<usize>,
605 options: Option<IotaObjectDataOptions>,
606 ) -> RpcResult<ObjectsPage> {
607 let query = IotaObjectResponseQuery {
608 filter: Some(IotaObjectDataFilter::StructType(NameRegistration::type_(
609 self.iota_names_config.package_address,
610 ))),
611 options,
612 };
613
614 let owned_objects = self
615 .get_owned_objects(address, Some(query), cursor, limit)
616 .await?;
617
618 Ok(owned_objects)
619 }
620}
621
622impl<R: ReadApiServer> IotaRpcModule for IndexerApi<R> {
623 fn rpc(self) -> RpcModule<Self> {
624 self.into_rpc()
625 }
626
627 fn rpc_doc_module() -> Module {
628 IndexerApiOpenRpc::module_doc()
629 }
630}