1use std::{collections::HashSet, sync::Arc, time::Duration};
6
7use anyhow::{anyhow, bail};
8use async_trait::async_trait;
9use futures::{Stream, StreamExt};
10use iota_core::authority::AuthorityState;
11use iota_json::IotaJsonValue;
12use iota_json_rpc_api::{
13 IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
14 cap_page_limit, validate_limit,
15};
16use iota_json_rpc_types::{
17 DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectDataFilter,
18 IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
19 IotaTransactionBlockResponse, IotaTransactionBlockResponseQuery,
20 IotaTransactionBlockResponseQueryV2, ObjectsPage, Page, TransactionBlocksPage,
21 TransactionFilter,
22};
23use iota_metrics::spawn_monitored_task;
24use iota_names::{
25 IotaNamesNft, NameRegistration, config::IotaNamesConfig, error::IotaNamesError, name::Name,
26 registry::NameRecord,
27};
28use iota_open_rpc::Module;
29use iota_storage::key_value_store::TransactionKeyValueStore;
30use iota_types::{
31 base_types::{IotaAddress, ObjectID},
32 digests::TransactionDigest,
33 dynamic_field::{DynamicFieldName, Field},
34 error::{IotaObjectResponseError, UserInputError},
35 event::EventID,
36};
37use jsonrpsee::{
38 PendingSubscriptionSink, RpcModule, SendTimeoutError, SubscriptionMessage,
39 core::{RpcResult, SubscriptionResult},
40};
41use move_bytecode_utils::layout::TypeLayoutBuilder;
42use move_core_types::language_storage::TypeTag;
43use serde::Serialize;
44use tokio::sync::{OwnedSemaphorePermit, Semaphore};
45use tracing::{debug, instrument};
46
47use crate::{
48 IotaRpcModule,
49 authority_state::{StateRead, StateReadResult},
50 error::{Error, IotaRpcInputError},
51 logger::FutureWithTracing as _,
52};
53
54async fn pipe_from_stream<T: Serialize>(
55 pending: PendingSubscriptionSink,
56 mut stream: impl Stream<Item = T> + Unpin,
57) -> Result<(), anyhow::Error> {
58 let sink = pending.accept().await?;
59
60 loop {
61 tokio::select! {
62 _ = sink.closed() => break Ok(()),
63 maybe_item = stream.next() => {
64 let Some(item) = maybe_item else {
65 break Ok(());
66 };
67
68 let msg = SubscriptionMessage::from_json(&item)?;
69
70 if let Err(e) = sink.send_timeout(msg, Duration::from_secs(60)).await {
71 match e {
72 SendTimeoutError::Closed(_) => break Ok(()),
74 SendTimeoutError::Timeout(_) => break Err(anyhow::anyhow!("Subscription timeout expired")),
78 }
79 }
80 }
81 }
82 }
83}
84
85pub fn spawn_subscription<S, T>(
86 pending: PendingSubscriptionSink,
87 rx: S,
88 permit: Option<OwnedSemaphorePermit>,
89) where
90 S: Stream<Item = T> + Unpin + Send + 'static,
91 T: Serialize + Send,
92{
93 spawn_monitored_task!(async move {
94 let _permit = permit;
95 match pipe_from_stream(pending, rx).await {
96 Ok(_) => {
97 debug!("Subscription completed.");
98 }
99 Err(err) => {
100 debug!("Subscription failed: {err:?}");
101 }
102 }
103 });
104}
/// Number of subscription permits used when `IndexerApi::new` is not given an
/// explicit `max_subscriptions` value.
const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
106
/// Indexer JSON-RPC API implementation, generic over the read API `R` that it
/// delegates object and transaction block lookups to.
pub struct IndexerApi<R> {
    /// Read access to the node state backing every query.
    state: Arc<dyn StateRead>,
    /// Read API used to fetch objects and transaction blocks by id / digest.
    read_api: R,
    /// Key-value store handed to the transaction and event queries.
    transaction_kv_store: Arc<TransactionKeyValueStore>,
    /// Configuration used to derive record ids and types for name lookups.
    iota_names_config: IotaNamesConfig,
    /// Metrics observed by the query endpoints (requested limits, result sizes).
    pub metrics: Arc<JsonRpcMetrics>,
    /// Semaphore from which one permit is taken per spawned subscription.
    subscription_semaphore: Arc<Semaphore>,
}
115
116impl<R: ReadApiServer> IndexerApi<R> {
117 pub fn new(
118 state: Arc<AuthorityState>,
119 read_api: R,
120 transaction_kv_store: Arc<TransactionKeyValueStore>,
121 metrics: Arc<JsonRpcMetrics>,
122 iota_names_config: IotaNamesConfig,
123 max_subscriptions: Option<usize>,
124 ) -> Self {
125 let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
126 Self {
127 state,
128 transaction_kv_store,
129 read_api,
130 metrics,
131 iota_names_config,
132 subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
133 }
134 }
135
136 fn extract_values_from_dynamic_field_name(
137 &self,
138 name: DynamicFieldName,
139 ) -> Result<(TypeTag, Vec<u8>), IotaRpcInputError> {
140 let DynamicFieldName {
141 type_: name_type,
142 value,
143 } = name;
144 let epoch_store = self.state.load_epoch_store_one_call_per_task();
145 let layout = TypeLayoutBuilder::build_with_types(&name_type, epoch_store.module_cache())?;
146 let iota_json_value = IotaJsonValue::new(value)?;
147 let name_bcs_value = iota_json_value.to_bcs_bytes(&layout)?;
148 Ok((name_type, name_bcs_value))
149 }
150
151 fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
152 match self.subscription_semaphore.clone().try_acquire_owned() {
153 Ok(p) => Ok(p),
154 Err(_) => bail!("Resources exhausted"),
155 }
156 }
157
158 async fn get_dynamic_field_object(
159 &self,
160 parent_object_id: ObjectID,
161 name: DynamicFieldName,
162 options: Option<IotaObjectDataOptions>,
163 ) -> RpcResult<IotaObjectResponse> {
164 async move {
165 let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
166
167 let id = self
168 .state
169 .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
170 .map_err(Error::from)?;
171
172 if let Some(id) = id {
173 self.read_api
174 .get_object(id, options)
175 .await
176 .map_err(|e| Error::Internal(anyhow!(e)))
177 } else {
178 Ok(IotaObjectResponse::new_with_error(
179 IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
180 ))
181 }
182 }
183 .trace()
184 .await
185 }
186
187 fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
188 let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
189
190 let checkpoint = self
191 .state
192 .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
193
194 Ok(checkpoint.timestamp_ms)
195 }
196}
197
198#[async_trait]
199impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
200 #[instrument(skip(self))]
201 async fn get_owned_objects(
202 &self,
203 address: IotaAddress,
204 query: Option<IotaObjectResponseQuery>,
205 cursor: Option<ObjectID>,
206 limit: Option<usize>,
207 ) -> RpcResult<ObjectsPage> {
208 async move {
209 let limit =
210 validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(IotaRpcInputError::from)?;
211 self.metrics.get_owned_objects_limit.observe(limit as f64);
212 let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
213 let options = options.unwrap_or_default();
214 let mut objects =
215 self.state
216 .get_owner_objects_with_limit(address, cursor, limit + 1, filter)?;
217
218 let has_next_page = objects.len() > limit;
221 objects.truncate(limit);
222 let next_cursor = objects
223 .last()
224 .cloned()
225 .map_or(cursor, |o_info| Some(o_info.object_id));
226
227 let data = match options.is_not_in_object_info() {
228 true => {
229 let object_ids = objects.iter().map(|obj| obj.object_id).collect();
230 self.read_api
231 .multi_get_objects(object_ids, Some(options))
232 .await
233 .map_err(|e| Error::Internal(anyhow!(e)))?
234 }
235 false => objects
236 .into_iter()
237 .map(|o_info| IotaObjectResponse::try_from((o_info, options.clone())))
238 .collect::<Result<Vec<IotaObjectResponse>, _>>()?,
239 };
240
241 self.metrics
242 .get_owned_objects_result_size
243 .observe(data.len() as f64);
244 self.metrics
245 .get_owned_objects_result_size_total
246 .inc_by(data.len() as u64);
247 Ok(Page {
248 data,
249 next_cursor,
250 has_next_page,
251 })
252 }
253 .trace()
254 .await
255 }
256
257 #[instrument(skip(self))]
258 async fn query_transaction_blocks(
259 &self,
260 query: IotaTransactionBlockResponseQuery,
261 cursor: Option<TransactionDigest>,
263 limit: Option<usize>,
264 descending_order: Option<bool>,
265 ) -> RpcResult<TransactionBlocksPage> {
266 async move {
267 let limit = cap_page_limit(limit);
268 self.metrics.query_tx_blocks_limit.observe(limit as f64);
269 let descending = descending_order.unwrap_or_default();
270 let opts = query.options.unwrap_or_default();
271
272 let mut digests = self
274 .state
275 .get_transactions(
276 &self.transaction_kv_store,
277 query.filter,
278 cursor,
279 Some(limit + 1),
280 descending,
281 )
282 .await
283 .map_err(Error::from)?;
284 let mut seen = HashSet::new();
287 digests.retain(|digest| seen.insert(*digest));
288
289 let has_next_page = digests.len() > limit;
291 digests.truncate(limit);
292 let next_cursor = digests.last().cloned().map_or(cursor, Some);
293
294 let data: Vec<IotaTransactionBlockResponse> = if opts.only_digest() {
295 digests
296 .into_iter()
297 .map(IotaTransactionBlockResponse::new)
298 .collect()
299 } else {
300 self.read_api
301 .multi_get_transaction_blocks(digests, Some(opts))
302 .await
303 .map_err(|e| Error::Internal(anyhow!(e)))?
304 };
305
306 self.metrics
307 .query_tx_blocks_result_size
308 .observe(data.len() as f64);
309 self.metrics
310 .query_tx_blocks_result_size_total
311 .inc_by(data.len() as u64);
312 Ok(Page {
313 data,
314 next_cursor,
315 has_next_page,
316 })
317 }
318 .trace()
319 .await
320 }
321
322 #[instrument(skip(self))]
323 async fn query_transaction_blocks_v2(
324 &self,
325 query: IotaTransactionBlockResponseQueryV2,
326 cursor: Option<TransactionDigest>,
328 limit: Option<usize>,
329 descending_order: Option<bool>,
330 ) -> RpcResult<TransactionBlocksPage> {
331 let v1_filter = query
332 .filter
333 .map(|f| {
334 f.as_v1().ok_or_else(|| {
335 Error::UserInput(UserInputError::Unsupported(
336 "transaction filter is not supported".to_string(),
337 ))
338 })
339 })
340 .transpose()?;
341
342 let v1_query = IotaTransactionBlockResponseQuery {
343 filter: v1_filter,
344 options: query.options,
345 };
346 self.query_transaction_blocks(v1_query, cursor, limit, descending_order)
347 .await
348 }
349
350 #[instrument(skip(self))]
351 async fn query_events(
352 &self,
353 query: EventFilter,
354 cursor: Option<EventID>,
356 limit: Option<usize>,
357 descending_order: Option<bool>,
358 ) -> RpcResult<EventPage> {
359 async move {
360 let descending = descending_order.unwrap_or_default();
361 let limit = cap_page_limit(limit);
362 self.metrics.query_events_limit.observe(limit as f64);
363 let mut data = self
365 .state
366 .query_events(
367 &self.transaction_kv_store,
368 query,
369 cursor,
370 limit + 1,
371 descending,
372 )
373 .await
374 .map_err(Error::from)?;
375 let has_next_page = data.len() > limit;
376 data.truncate(limit);
377 let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
378 self.metrics
379 .query_events_result_size
380 .observe(data.len() as f64);
381 self.metrics
382 .query_events_result_size_total
383 .inc_by(data.len() as u64);
384 Ok(EventPage {
385 data,
386 next_cursor,
387 has_next_page,
388 })
389 }
390 .trace()
391 .await
392 }
393
394 #[instrument(skip(self))]
395 fn subscribe_event(
396 &self,
397 sink: PendingSubscriptionSink,
398 filter: EventFilter,
399 ) -> SubscriptionResult {
400 let permit = self.acquire_subscribe_permit()?;
401 spawn_subscription(
402 sink,
403 self.state
404 .get_subscription_handler()
405 .subscribe_events(filter),
406 Some(permit),
407 );
408 Ok(())
409 }
410
411 fn subscribe_transaction(
412 &self,
413 sink: PendingSubscriptionSink,
414 filter: TransactionFilter,
415 ) -> SubscriptionResult {
416 if matches!(filter, TransactionFilter::Checkpoint(_)) {
418 return Err("checkpoint filter is not supported".into());
419 }
420
421 let permit = self.acquire_subscribe_permit()?;
422 spawn_subscription(
423 sink,
424 self.state
425 .get_subscription_handler()
426 .subscribe_transactions(filter),
427 Some(permit),
428 );
429 Ok(())
430 }
431
432 #[instrument(skip(self))]
433 async fn get_dynamic_fields(
434 &self,
435 parent_object_id: ObjectID,
436 cursor: Option<ObjectID>,
438 limit: Option<usize>,
439 ) -> RpcResult<DynamicFieldPage> {
440 async move {
441 let limit = cap_page_limit(limit);
442 self.metrics.get_dynamic_fields_limit.observe(limit as f64);
443 let mut data = self
444 .state
445 .get_dynamic_fields(parent_object_id, cursor, limit + 1)
446 .map_err(Error::from)?;
447 let has_next_page = data.len() > limit;
448 data.truncate(limit);
449 let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
450 self.metrics
451 .get_dynamic_fields_result_size
452 .observe(data.len() as f64);
453 self.metrics
454 .get_dynamic_fields_result_size_total
455 .inc_by(data.len() as u64);
456 Ok(DynamicFieldPage {
457 data: data.into_iter().map(|(_, w)| w.into()).collect(),
458 next_cursor,
459 has_next_page,
460 })
461 }
462 .trace()
463 .await
464 }
465
/// Returns the object stored under the dynamic field `name` of
/// `parent_object_id`, always requesting `IotaObjectDataOptions::full_content()`.
#[instrument(skip(self))]
async fn get_dynamic_field_object(
    &self,
    parent_object_id: ObjectID,
    name: DynamicFieldName,
) -> RpcResult<IotaObjectResponse> {
    // Three-argument call: this is the helper that takes explicit options,
    // not a recursive call to this two-argument trait method.
    self.get_dynamic_field_object(
        parent_object_id,
        name,
        Some(IotaObjectDataOptions::full_content()),
    )
    .await
}
479
/// Returns the object stored under the dynamic field `name` of
/// `parent_object_id`, passing the caller-supplied `options` through
/// unchanged.
#[instrument(skip(self))]
async fn get_dynamic_field_object_v2(
    &self,
    parent_object_id: ObjectID,
    name: DynamicFieldName,
    options: Option<IotaObjectDataOptions>,
) -> RpcResult<IotaObjectResponse> {
    self.get_dynamic_field_object(parent_object_id, name, options)
        .await
}
490
491 async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
492 let name = name.parse::<Name>().map_err(Error::from)?;
493
494 let record_id = self.iota_names_config.record_field_id(&name);
496
497 let parent_record_id = name
498 .parent()
499 .map(|parent_name| self.iota_names_config.record_field_id(&parent_name));
500
501 let mut requests = vec![self.state.get_object(&record_id)];
503
504 if let Some(ref parent_record_id) = parent_record_id {
507 requests.push(self.state.get_object(parent_record_id));
508 }
509
510 let mut results = futures::future::try_join_all(requests)
516 .await
517 .map_err(Error::from)?;
518
519 let Some(object) = results.remove(0) else {
522 return Ok(None);
523 };
524
525 let name_record = NameRecord::try_from(object).map_err(Error::from)?;
526
527 let current_timestamp_ms = self
528 .get_latest_checkpoint_timestamp_ms()
529 .map_err(Error::from)?;
530
531 if !name_record.is_leaf_record() {
535 return if !name_record.is_node_expired(current_timestamp_ms) {
536 Ok(Some(name_record.into()))
537 } else {
538 Err(Error::from(IotaNamesError::NameExpired).into())
539 };
540 } else {
541 let Some(parent_object) = results.remove(0) else {
546 return Err(Error::from(IotaNamesError::NameExpired).into());
547 };
548
549 let parent_name_record = NameRecord::try_from(parent_object).map_err(Error::from)?;
550
551 if parent_name_record.is_valid_leaf_parent(&name_record)
555 && !parent_name_record.is_node_expired(current_timestamp_ms)
556 {
557 Ok(Some(name_record.into()))
558 } else {
559 Err(Error::from(IotaNamesError::NameExpired).into())
560 }
561 }
562 }
563
564 #[instrument(skip(self))]
565 async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
566 let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
567
568 let Some(field_reverse_record_object) = self
569 .state
570 .get_object(&reverse_record_id)
571 .await
572 .map_err(Error::from)?
573 else {
574 return Ok(None);
575 };
576
577 let name = field_reverse_record_object
578 .to_rust::<Field<IotaAddress, Name>>()
579 .ok_or_else(|| Error::Unexpected(format!("malformed Object {reverse_record_id}")))?
580 .value;
581
582 let name = name.to_string();
583
584 let resolved_record = self.iota_names_lookup(&name).await?;
585
586 if resolved_record.is_none() {
588 return Ok(None);
589 }
590
591 Ok(Some(name))
592 }
593
594 #[instrument(skip(self))]
595 async fn iota_names_find_all_registration_nfts(
596 &self,
597 address: IotaAddress,
598 cursor: Option<ObjectID>,
599 limit: Option<usize>,
600 options: Option<IotaObjectDataOptions>,
601 ) -> RpcResult<ObjectsPage> {
602 let query = IotaObjectResponseQuery {
603 filter: Some(IotaObjectDataFilter::StructType(NameRegistration::type_(
604 self.iota_names_config.package_address.into(),
605 ))),
606 options,
607 };
608
609 let owned_objects = self
610 .get_owned_objects(address, Some(query), cursor, limit)
611 .await?;
612
613 Ok(owned_objects)
614 }
615}
616
/// Exposes the indexer API as an RPC module together with its OpenRPC
/// documentation.
impl<R: ReadApiServer> IotaRpcModule for IndexerApi<R> {
    /// Consumes the API and turns it into an `RpcModule`.
    fn rpc(self) -> RpcModule<Self> {
        self.into_rpc()
    }

    /// Returns the OpenRPC module documentation of the indexer API.
    fn rpc_doc_module() -> Module {
        IndexerApiOpenRpc::module_doc()
    }
}