1use std::{collections::HashSet, sync::Arc, time::Duration};
6
7use anyhow::{anyhow, bail};
8use async_trait::async_trait;
9use futures::{Stream, StreamExt};
10use iota_core::authority::AuthorityState;
11use iota_json::IotaJsonValue;
12use iota_json_rpc_api::{
13 IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
14 cap_page_limit, validate_limit,
15};
16use iota_json_rpc_types::{
17 DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectDataFilter,
18 IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
19 IotaTransactionBlockResponse, IotaTransactionBlockResponseQuery,
20 IotaTransactionBlockResponseQueryV2, ObjectsPage, Page, TransactionBlocksPage,
21 TransactionFilter,
22};
23use iota_metrics::spawn_monitored_task;
24use iota_names::{
25 IotaNamesNft, NameRegistration, config::IotaNamesConfig, error::IotaNamesError, name::Name,
26 registry::NameRecord,
27};
28use iota_open_rpc::Module;
29use iota_storage::key_value_store::TransactionKeyValueStore;
30use iota_types::{
31 base_types::{IotaAddress, ObjectID},
32 digests::TransactionDigest,
33 dynamic_field::{DynamicFieldName, Field},
34 error::{IotaObjectResponseError, UserInputError},
35 event::EventID,
36};
37use jsonrpsee::{
38 PendingSubscriptionSink, RpcModule, SendTimeoutError, SubscriptionMessage,
39 core::{RpcResult, SubscriptionResult},
40};
41use move_bytecode_utils::layout::TypeLayoutBuilder;
42use move_core_types::language_storage::TypeTag;
43use serde::Serialize;
44use tokio::sync::{OwnedSemaphorePermit, Semaphore};
45use tracing::{debug, instrument};
46
47use crate::{
48 IotaRpcModule,
49 authority_state::{StateRead, StateReadResult},
50 error::{Error, IotaRpcInputError},
51 logger::FutureWithTracing as _,
52};
53
54async fn pipe_from_stream<T: Serialize>(
55 pending: PendingSubscriptionSink,
56 mut stream: impl Stream<Item = T> + Unpin,
57) -> Result<(), anyhow::Error> {
58 let sink = pending.accept().await?;
59
60 loop {
61 tokio::select! {
62 _ = sink.closed() => break Ok(()),
63 maybe_item = stream.next() => {
64 let Some(item) = maybe_item else {
65 break Ok(());
66 };
67
68 let msg = SubscriptionMessage::from_json(&item)?;
69
70 if let Err(e) = sink.send_timeout(msg, Duration::from_secs(60)).await {
71 match e {
72 SendTimeoutError::Closed(_) => break Ok(()),
74 SendTimeoutError::Timeout(_) => break Err(anyhow::anyhow!("Subscription timeout expired")),
78 }
79 }
80 }
81 }
82 }
83}
84
85pub fn spawn_subscription<S, T>(
86 pending: PendingSubscriptionSink,
87 rx: S,
88 permit: Option<OwnedSemaphorePermit>,
89) where
90 S: Stream<Item = T> + Unpin + Send + 'static,
91 T: Serialize + Send,
92{
93 spawn_monitored_task!(async move {
94 let _permit = permit;
95 match pipe_from_stream(pending, rx).await {
96 Ok(_) => {
97 debug!("Subscription completed.");
98 }
99 Err(err) => {
100 debug!("Subscription failed: {err:?}");
101 }
102 }
103 });
104}
105const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
106
107pub struct IndexerApi<R> {
108 state: Arc<dyn StateRead>,
109 read_api: R,
110 transaction_kv_store: Arc<TransactionKeyValueStore>,
111 iota_names_config: IotaNamesConfig,
112 pub metrics: Arc<JsonRpcMetrics>,
113 subscription_semaphore: Arc<Semaphore>,
114}
115
116impl<R: ReadApiServer> IndexerApi<R> {
117 pub fn new(
118 state: Arc<AuthorityState>,
119 read_api: R,
120 transaction_kv_store: Arc<TransactionKeyValueStore>,
121 metrics: Arc<JsonRpcMetrics>,
122 iota_names_config: IotaNamesConfig,
123 max_subscriptions: Option<usize>,
124 ) -> Self {
125 let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
126 Self {
127 state,
128 transaction_kv_store,
129 read_api,
130 metrics,
131 iota_names_config,
132 subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
133 }
134 }
135
136 fn extract_values_from_dynamic_field_name(
137 &self,
138 name: DynamicFieldName,
139 ) -> Result<(TypeTag, Vec<u8>), IotaRpcInputError> {
140 let DynamicFieldName {
141 type_: name_type,
142 value,
143 } = name;
144 let epoch_store = self.state.load_epoch_store_one_call_per_task();
145 let layout = TypeLayoutBuilder::build_with_types(&name_type, epoch_store.module_cache())?;
146 let iota_json_value = IotaJsonValue::new(value)?;
147 let name_bcs_value = iota_json_value.to_bcs_bytes(&layout)?;
148 Ok((name_type, name_bcs_value))
149 }
150
151 fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
152 match self.subscription_semaphore.clone().try_acquire_owned() {
153 Ok(p) => Ok(p),
154 Err(_) => bail!("Resources exhausted"),
155 }
156 }
157
158 async fn get_dynamic_field_object(
159 &self,
160 parent_object_id: ObjectID,
161 name: DynamicFieldName,
162 options: Option<IotaObjectDataOptions>,
163 ) -> RpcResult<IotaObjectResponse> {
164 async move {
165 let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
166
167 let id = self
168 .state
169 .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
170 .map_err(Error::from)?;
171
172 if let Some(id) = id {
173 self.read_api
174 .get_object(id, options)
175 .await
176 .map_err(|e| Error::Internal(anyhow!(e)))
177 } else {
178 Ok(IotaObjectResponse::new_with_error(
179 IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
180 ))
181 }
182 }
183 .trace()
184 .await
185 }
186
187 fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
188 let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
189
190 let checkpoint = self
191 .state
192 .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
193
194 Ok(checkpoint.timestamp_ms)
195 }
196}
197
198#[async_trait]
199impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
200 #[instrument(skip(self))]
201 async fn get_owned_objects(
202 &self,
203 address: IotaAddress,
204 query: Option<IotaObjectResponseQuery>,
205 cursor: Option<ObjectID>,
206 limit: Option<usize>,
207 ) -> RpcResult<ObjectsPage> {
208 async move {
209 let limit =
210 validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(IotaRpcInputError::from)?;
211 self.metrics.get_owned_objects_limit.observe(limit as f64);
212 let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
213 let options = options.unwrap_or_default();
214 let mut objects =
215 self.state
216 .get_owner_objects_with_limit(address, cursor, limit + 1, filter)?;
217
218 let has_next_page = objects.len() > limit && limit > 0;
221 objects.truncate(limit);
222 let next_cursor = (has_next_page).then_some(
223 objects
224 .last()
225 .map(|obj| obj.object_id)
226 .unwrap_or(ObjectID::ZERO),
227 );
228
229 let data = match options.is_not_in_object_info() {
230 true => {
231 let object_ids = objects.iter().map(|obj| obj.object_id).collect();
232 self.read_api
233 .multi_get_objects(object_ids, Some(options))
234 .await
235 .map_err(|e| Error::Internal(anyhow!(e)))?
236 }
237 false => objects
238 .into_iter()
239 .map(|o_info| IotaObjectResponse::try_from((o_info, options.clone())))
240 .collect::<Result<Vec<IotaObjectResponse>, _>>()?,
241 };
242
243 self.metrics
244 .get_owned_objects_result_size
245 .observe(data.len() as f64);
246 self.metrics
247 .get_owned_objects_result_size_total
248 .inc_by(data.len() as u64);
249 Ok(Page {
250 data,
251 next_cursor,
252 has_next_page,
253 })
254 }
255 .trace()
256 .await
257 }
258
259 #[instrument(skip(self))]
260 async fn query_transaction_blocks(
261 &self,
262 query: IotaTransactionBlockResponseQuery,
263 cursor: Option<TransactionDigest>,
265 limit: Option<usize>,
266 descending_order: Option<bool>,
267 ) -> RpcResult<TransactionBlocksPage> {
268 async move {
269 let limit = cap_page_limit(limit);
270 self.metrics.query_tx_blocks_limit.observe(limit as f64);
271 let descending = descending_order.unwrap_or_default();
272 let opts = query.options.unwrap_or_default();
273
274 let mut digests = self
276 .state
277 .get_transactions(
278 &self.transaction_kv_store,
279 query.filter,
280 cursor,
281 Some(limit + 1),
282 descending,
283 )
284 .await
285 .map_err(Error::from)?;
286 let mut seen = HashSet::new();
289 digests.retain(|digest| seen.insert(*digest));
290
291 let has_next_page = digests.len() > limit;
293 digests.truncate(limit);
294 let next_cursor = digests.last().cloned().map_or(cursor, Some);
295
296 let data: Vec<IotaTransactionBlockResponse> = if opts.only_digest() {
297 digests
298 .into_iter()
299 .map(IotaTransactionBlockResponse::new)
300 .collect()
301 } else {
302 self.read_api
303 .multi_get_transaction_blocks(digests, Some(opts))
304 .await
305 .map_err(|e| Error::Internal(anyhow!(e)))?
306 };
307
308 self.metrics
309 .query_tx_blocks_result_size
310 .observe(data.len() as f64);
311 self.metrics
312 .query_tx_blocks_result_size_total
313 .inc_by(data.len() as u64);
314 Ok(Page {
315 data,
316 next_cursor,
317 has_next_page,
318 })
319 }
320 .trace()
321 .await
322 }
323
324 #[instrument(skip(self))]
325 async fn query_transaction_blocks_v2(
326 &self,
327 query: IotaTransactionBlockResponseQueryV2,
328 cursor: Option<TransactionDigest>,
330 limit: Option<usize>,
331 descending_order: Option<bool>,
332 ) -> RpcResult<TransactionBlocksPage> {
333 let v1_filter = query
334 .filter
335 .map(|f| {
336 f.as_v1().ok_or_else(|| {
337 Error::UserInput(UserInputError::Unsupported(
338 "transaction filter is not supported".to_string(),
339 ))
340 })
341 })
342 .transpose()?;
343
344 let v1_query = IotaTransactionBlockResponseQuery {
345 filter: v1_filter,
346 options: query.options,
347 };
348 self.query_transaction_blocks(v1_query, cursor, limit, descending_order)
349 .await
350 }
351
352 #[instrument(skip(self))]
353 async fn query_events(
354 &self,
355 query: EventFilter,
356 cursor: Option<EventID>,
358 limit: Option<usize>,
359 descending_order: Option<bool>,
360 ) -> RpcResult<EventPage> {
361 async move {
362 let descending = descending_order.unwrap_or_default();
363 let limit = cap_page_limit(limit);
364 self.metrics.query_events_limit.observe(limit as f64);
365 let mut data = self
367 .state
368 .query_events(
369 &self.transaction_kv_store,
370 query,
371 cursor,
372 limit + 1,
373 descending,
374 )
375 .await
376 .map_err(Error::from)?;
377 let has_next_page = data.len() > limit;
378 data.truncate(limit);
379 let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
380 self.metrics
381 .query_events_result_size
382 .observe(data.len() as f64);
383 self.metrics
384 .query_events_result_size_total
385 .inc_by(data.len() as u64);
386 Ok(EventPage {
387 data,
388 next_cursor,
389 has_next_page,
390 })
391 }
392 .trace()
393 .await
394 }
395
396 #[instrument(skip(self))]
397 fn subscribe_event(
398 &self,
399 sink: PendingSubscriptionSink,
400 filter: EventFilter,
401 ) -> SubscriptionResult {
402 let permit = self.acquire_subscribe_permit()?;
403 spawn_subscription(
404 sink,
405 self.state
406 .get_subscription_handler()
407 .subscribe_events(filter),
408 Some(permit),
409 );
410 Ok(())
411 }
412
413 fn subscribe_transaction(
414 &self,
415 sink: PendingSubscriptionSink,
416 filter: TransactionFilter,
417 ) -> SubscriptionResult {
418 if matches!(filter, TransactionFilter::Checkpoint(_)) {
420 return Err("checkpoint filter is not supported".into());
421 }
422
423 let permit = self.acquire_subscribe_permit()?;
424 spawn_subscription(
425 sink,
426 self.state
427 .get_subscription_handler()
428 .subscribe_transactions(filter),
429 Some(permit),
430 );
431 Ok(())
432 }
433
434 #[instrument(skip(self))]
435 async fn get_dynamic_fields(
436 &self,
437 parent_object_id: ObjectID,
438 cursor: Option<ObjectID>,
440 limit: Option<usize>,
441 ) -> RpcResult<DynamicFieldPage> {
442 async move {
443 let limit = cap_page_limit(limit);
444 self.metrics.get_dynamic_fields_limit.observe(limit as f64);
445 let mut data = self
446 .state
447 .get_dynamic_fields(parent_object_id, cursor, limit + 1)
448 .map_err(Error::from)?;
449 let has_next_page = data.len() > limit;
450 data.truncate(limit);
451 let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
452 self.metrics
453 .get_dynamic_fields_result_size
454 .observe(data.len() as f64);
455 self.metrics
456 .get_dynamic_fields_result_size_total
457 .inc_by(data.len() as u64);
458 Ok(DynamicFieldPage {
459 data: data.into_iter().map(|(_, w)| w.into()).collect(),
460 next_cursor,
461 has_next_page,
462 })
463 }
464 .trace()
465 .await
466 }
467
468 #[instrument(skip(self))]
469 async fn get_dynamic_field_object(
470 &self,
471 parent_object_id: ObjectID,
472 name: DynamicFieldName,
473 ) -> RpcResult<IotaObjectResponse> {
474 self.get_dynamic_field_object(
475 parent_object_id,
476 name,
477 Some(IotaObjectDataOptions::full_content()),
478 )
479 .await
480 }
481
482 #[instrument(skip(self))]
483 async fn get_dynamic_field_object_v2(
484 &self,
485 parent_object_id: ObjectID,
486 name: DynamicFieldName,
487 options: Option<IotaObjectDataOptions>,
488 ) -> RpcResult<IotaObjectResponse> {
489 self.get_dynamic_field_object(parent_object_id, name, options)
490 .await
491 }
492
493 async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
494 let name = name.parse::<Name>().map_err(Error::from)?;
495
496 let record_id = self.iota_names_config.record_field_id(&name);
498
499 let parent_record_id = name
500 .parent()
501 .map(|parent_name| self.iota_names_config.record_field_id(&parent_name));
502
503 let mut requests = vec![self.state.get_object(&record_id)];
505
506 if let Some(ref parent_record_id) = parent_record_id {
509 requests.push(self.state.get_object(parent_record_id));
510 }
511
512 let mut results = futures::future::try_join_all(requests)
518 .await
519 .map_err(Error::from)?;
520
521 let Some(object) = results.remove(0) else {
524 return Ok(None);
525 };
526
527 let name_record = NameRecord::try_from(object).map_err(Error::from)?;
528
529 let current_timestamp_ms = self
530 .get_latest_checkpoint_timestamp_ms()
531 .map_err(Error::from)?;
532
533 if !name_record.is_leaf_record() {
537 return if !name_record.is_node_expired(current_timestamp_ms) {
538 Ok(Some(name_record.into()))
539 } else {
540 Err(Error::from(IotaNamesError::NameExpired).into())
541 };
542 } else {
543 let Some(parent_object) = results.remove(0) else {
548 return Err(Error::from(IotaNamesError::NameExpired).into());
549 };
550
551 let parent_name_record = NameRecord::try_from(parent_object).map_err(Error::from)?;
552
553 if parent_name_record.is_valid_leaf_parent(&name_record)
557 && !parent_name_record.is_node_expired(current_timestamp_ms)
558 {
559 Ok(Some(name_record.into()))
560 } else {
561 Err(Error::from(IotaNamesError::NameExpired).into())
562 }
563 }
564 }
565
566 #[instrument(skip(self))]
567 async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
568 let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
569
570 let Some(field_reverse_record_object) = self
571 .state
572 .get_object(&reverse_record_id)
573 .await
574 .map_err(Error::from)?
575 else {
576 return Ok(None);
577 };
578
579 let name = field_reverse_record_object
580 .to_rust::<Field<IotaAddress, Name>>()
581 .ok_or_else(|| Error::Unexpected(format!("malformed Object {reverse_record_id}")))?
582 .value;
583
584 let name = name.to_string();
585
586 let resolved_record = self.iota_names_lookup(&name).await?;
587
588 if resolved_record.is_none() {
590 return Ok(None);
591 }
592
593 Ok(Some(name))
594 }
595
596 #[instrument(skip(self))]
597 async fn iota_names_find_all_registration_nfts(
598 &self,
599 address: IotaAddress,
600 cursor: Option<ObjectID>,
601 limit: Option<usize>,
602 options: Option<IotaObjectDataOptions>,
603 ) -> RpcResult<ObjectsPage> {
604 let query = IotaObjectResponseQuery {
605 filter: Some(IotaObjectDataFilter::StructType(NameRegistration::type_(
606 self.iota_names_config.package_address.into(),
607 ))),
608 options,
609 };
610
611 let owned_objects = self
612 .get_owned_objects(address, Some(query), cursor, limit)
613 .await?;
614
615 Ok(owned_objects)
616 }
617}
618
619impl<R: ReadApiServer> IotaRpcModule for IndexerApi<R> {
620 fn rpc(self) -> RpcModule<Self> {
621 self.into_rpc()
622 }
623
624 fn rpc_doc_module() -> Module {
625 IndexerApiOpenRpc::module_doc()
626 }
627}