1use std::{collections::HashSet, sync::Arc, time::Duration};
6
7use anyhow::{anyhow, bail};
8use async_trait::async_trait;
9use futures::{Stream, StreamExt};
10use iota_core::authority::AuthorityState;
11use iota_json::IotaJsonValue;
12use iota_json_rpc_api::{
13 IndexerApiOpenRpc, IndexerApiServer, JsonRpcMetrics, QUERY_MAX_RESULT_LIMIT, ReadApiServer,
14 cap_page_limit, validate_limit,
15};
16use iota_json_rpc_types::{
17 DynamicFieldPage, EventFilter, EventPage, IotaNameRecord, IotaObjectDataFilter,
18 IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery,
19 IotaTransactionBlockResponse, IotaTransactionBlockResponseQuery, ObjectsPage, Page,
20 TransactionBlocksPage, TransactionFilter,
21};
22use iota_metrics::spawn_monitored_task;
23use iota_names::{
24 IotaNamesNft, IotaNamesRegistration, config::IotaNamesConfig, domain::Domain,
25 error::IotaNamesError, registry::NameRecord,
26};
27use iota_open_rpc::Module;
28use iota_storage::key_value_store::TransactionKeyValueStore;
29use iota_types::{
30 base_types::{IotaAddress, ObjectID},
31 digests::TransactionDigest,
32 dynamic_field::{DynamicFieldName, Field},
33 error::IotaObjectResponseError,
34 event::EventID,
35};
36use jsonrpsee::{
37 PendingSubscriptionSink, RpcModule, SendTimeoutError, SubscriptionMessage,
38 core::{RpcResult, SubscriptionResult},
39};
40use move_bytecode_utils::layout::TypeLayoutBuilder;
41use move_core_types::language_storage::TypeTag;
42use serde::Serialize;
43use tokio::sync::{OwnedSemaphorePermit, Semaphore};
44use tracing::{debug, instrument};
45
46use crate::{
47 IotaRpcModule,
48 authority_state::{StateRead, StateReadResult},
49 error::{Error, IotaRpcInputError},
50 logger::FutureWithTracing as _,
51};
52
53async fn pipe_from_stream<T: Serialize>(
54 pending: PendingSubscriptionSink,
55 mut stream: impl Stream<Item = T> + Unpin,
56) -> Result<(), anyhow::Error> {
57 let sink = pending.accept().await?;
58
59 loop {
60 tokio::select! {
61 _ = sink.closed() => break Ok(()),
62 maybe_item = stream.next() => {
63 let Some(item) = maybe_item else {
64 break Ok(());
65 };
66
67 let msg = SubscriptionMessage::from_json(&item)?;
68
69 if let Err(e) = sink.send_timeout(msg, Duration::from_secs(60)).await {
70 match e {
71 SendTimeoutError::Closed(_) => break Ok(()),
73 SendTimeoutError::Timeout(_) => break Err(anyhow::anyhow!("Subscription timeout expired")),
77 }
78 }
79 }
80 }
81 }
82}
83
84pub fn spawn_subscription<S, T>(
85 pending: PendingSubscriptionSink,
86 rx: S,
87 permit: Option<OwnedSemaphorePermit>,
88) where
89 S: Stream<Item = T> + Unpin + Send + 'static,
90 T: Serialize + Send,
91{
92 spawn_monitored_task!(async move {
93 let _permit = permit;
94 match pipe_from_stream(pending, rx).await {
95 Ok(_) => {
96 debug!("Subscription completed.");
97 }
98 Err(err) => {
99 debug!("Subscription failed: {err:?}");
100 }
101 }
102 });
103}
/// Number of semaphore permits used for subscriptions when `IndexerApi::new` is called
/// with `max_subscriptions == None`.
104const DEFAULT_MAX_SUBSCRIPTIONS: usize = 100;
105
/// Indexer JSON-RPC API implementation, generic over the read API `R` it delegates
/// object and transaction fetching to.
///
/// Fields:
/// - `state`: read-only state handle used for queries and for obtaining the subscription
///   handler.
/// - `read_api`: delegate used to fetch full objects and transaction blocks.
/// - `transaction_kv_store`: store handed to the state's transaction and event queries.
/// - `iota_names_config`: used to derive IOTA-Names record field ids and the package
///   address.
/// - `metrics`: JSON-RPC metrics updated by the paginated queries.
/// - `subscription_semaphore`: a permit is taken from it (without waiting) for every
///   subscription that is spawned.
106pub struct IndexerApi<R> {
107 state: Arc<dyn StateRead>,
108 read_api: R,
109 transaction_kv_store: Arc<TransactionKeyValueStore>,
110 iota_names_config: IotaNamesConfig,
111 pub metrics: Arc<JsonRpcMetrics>,
112 subscription_semaphore: Arc<Semaphore>,
113}
114
115impl<R: ReadApiServer> IndexerApi<R> {
116 pub fn new(
117 state: Arc<AuthorityState>,
118 read_api: R,
119 transaction_kv_store: Arc<TransactionKeyValueStore>,
120 metrics: Arc<JsonRpcMetrics>,
121 iota_names_config: IotaNamesConfig,
122 max_subscriptions: Option<usize>,
123 ) -> Self {
124 let max_subscriptions = max_subscriptions.unwrap_or(DEFAULT_MAX_SUBSCRIPTIONS);
125 Self {
126 state,
127 transaction_kv_store,
128 read_api,
129 metrics,
130 iota_names_config,
131 subscription_semaphore: Arc::new(Semaphore::new(max_subscriptions)),
132 }
133 }
134
135 fn extract_values_from_dynamic_field_name(
136 &self,
137 name: DynamicFieldName,
138 ) -> Result<(TypeTag, Vec<u8>), IotaRpcInputError> {
139 let DynamicFieldName {
140 type_: name_type,
141 value,
142 } = name;
143 let epoch_store = self.state.load_epoch_store_one_call_per_task();
144 let layout = TypeLayoutBuilder::build_with_types(&name_type, epoch_store.module_cache())?;
145 let iota_json_value = IotaJsonValue::new(value)?;
146 let name_bcs_value = iota_json_value.to_bcs_bytes(&layout)?;
147 Ok((name_type, name_bcs_value))
148 }
149
150 fn acquire_subscribe_permit(&self) -> anyhow::Result<OwnedSemaphorePermit> {
151 match self.subscription_semaphore.clone().try_acquire_owned() {
152 Ok(p) => Ok(p),
153 Err(_) => bail!("Resources exhausted"),
154 }
155 }
156
157 async fn get_dynamic_field_object(
158 &self,
159 parent_object_id: ObjectID,
160 name: DynamicFieldName,
161 options: Option<IotaObjectDataOptions>,
162 ) -> RpcResult<IotaObjectResponse> {
163 async move {
164 let (name_type, name_bcs_value) = self.extract_values_from_dynamic_field_name(name)?;
165
166 let id = self
167 .state
168 .get_dynamic_field_object_id(parent_object_id, name_type, &name_bcs_value)
169 .map_err(Error::from)?;
170
171 if let Some(id) = id {
172 self.read_api
173 .get_object(id, options)
174 .await
175 .map_err(|e| Error::Internal(anyhow!(e)))
176 } else {
177 Ok(IotaObjectResponse::new_with_error(
178 IotaObjectResponseError::DynamicFieldNotFound { parent_object_id },
179 ))
180 }
181 }
182 .trace()
183 .await
184 }
185
186 fn get_latest_checkpoint_timestamp_ms(&self) -> StateReadResult<u64> {
187 let latest_checkpoint = self.state.get_latest_checkpoint_sequence_number()?;
188
189 let checkpoint = self
190 .state
191 .get_verified_checkpoint_by_sequence_number(latest_checkpoint)?;
192
193 Ok(checkpoint.timestamp_ms)
194 }
195}
196
197#[async_trait]
198impl<R: ReadApiServer> IndexerApiServer for IndexerApi<R> {
199 #[instrument(skip(self))]
200 async fn get_owned_objects(
201 &self,
202 address: IotaAddress,
203 query: Option<IotaObjectResponseQuery>,
204 cursor: Option<ObjectID>,
205 limit: Option<usize>,
206 ) -> RpcResult<ObjectsPage> {
207 async move {
208 let limit =
209 validate_limit(limit, *QUERY_MAX_RESULT_LIMIT).map_err(IotaRpcInputError::from)?;
210 self.metrics.get_owned_objects_limit.report(limit as u64);
211 let IotaObjectResponseQuery { filter, options } = query.unwrap_or_default();
212 let options = options.unwrap_or_default();
213 let mut objects =
214 self.state
215 .get_owner_objects_with_limit(address, cursor, limit + 1, filter)?;
216
217 let has_next_page = objects.len() > limit;
220 objects.truncate(limit);
221 let next_cursor = objects
222 .last()
223 .cloned()
224 .map_or(cursor, |o_info| Some(o_info.object_id));
225
226 let data = match options.is_not_in_object_info() {
227 true => {
228 let object_ids = objects.iter().map(|obj| obj.object_id).collect();
229 self.read_api
230 .multi_get_objects(object_ids, Some(options))
231 .await
232 .map_err(|e| Error::Internal(anyhow!(e)))?
233 }
234 false => objects
235 .into_iter()
236 .map(|o_info| IotaObjectResponse::try_from((o_info, options.clone())))
237 .collect::<Result<Vec<IotaObjectResponse>, _>>()?,
238 };
239
240 self.metrics
241 .get_owned_objects_result_size
242 .report(data.len() as u64);
243 self.metrics
244 .get_owned_objects_result_size_total
245 .inc_by(data.len() as u64);
246 Ok(Page {
247 data,
248 next_cursor,
249 has_next_page,
250 })
251 }
252 .trace()
253 .await
254 }
255
256 #[instrument(skip(self))]
257 async fn query_transaction_blocks(
258 &self,
259 query: IotaTransactionBlockResponseQuery,
260 cursor: Option<TransactionDigest>,
262 limit: Option<usize>,
263 descending_order: Option<bool>,
264 ) -> RpcResult<TransactionBlocksPage> {
265 async move {
266 let limit = cap_page_limit(limit);
267 self.metrics.query_tx_blocks_limit.report(limit as u64);
268 let descending = descending_order.unwrap_or_default();
269 let opts = query.options.unwrap_or_default();
270
271 let mut digests = self
273 .state
274 .get_transactions(
275 &self.transaction_kv_store,
276 query.filter,
277 cursor,
278 Some(limit + 1),
279 descending,
280 )
281 .await
282 .map_err(Error::from)?;
283 let mut seen = HashSet::new();
286 digests.retain(|digest| seen.insert(*digest));
287
288 let has_next_page = digests.len() > limit;
290 digests.truncate(limit);
291 let next_cursor = digests.last().cloned().map_or(cursor, Some);
292
293 let data: Vec<IotaTransactionBlockResponse> = if opts.only_digest() {
294 digests
295 .into_iter()
296 .map(IotaTransactionBlockResponse::new)
297 .collect()
298 } else {
299 self.read_api
300 .multi_get_transaction_blocks(digests, Some(opts))
301 .await
302 .map_err(|e| Error::Internal(anyhow!(e)))?
303 };
304
305 self.metrics
306 .query_tx_blocks_result_size
307 .report(data.len() as u64);
308 self.metrics
309 .query_tx_blocks_result_size_total
310 .inc_by(data.len() as u64);
311 Ok(Page {
312 data,
313 next_cursor,
314 has_next_page,
315 })
316 }
317 .trace()
318 .await
319 }
320 #[instrument(skip(self))]
321 async fn query_events(
322 &self,
323 query: EventFilter,
324 cursor: Option<EventID>,
326 limit: Option<usize>,
327 descending_order: Option<bool>,
328 ) -> RpcResult<EventPage> {
329 async move {
330 let descending = descending_order.unwrap_or_default();
331 let limit = cap_page_limit(limit);
332 self.metrics.query_events_limit.report(limit as u64);
333 let mut data = self
335 .state
336 .query_events(
337 &self.transaction_kv_store,
338 query,
339 cursor,
340 limit + 1,
341 descending,
342 )
343 .await
344 .map_err(Error::from)?;
345 let has_next_page = data.len() > limit;
346 data.truncate(limit);
347 let next_cursor = data.last().map_or(cursor, |e| Some(e.id));
348 self.metrics
349 .query_events_result_size
350 .report(data.len() as u64);
351 self.metrics
352 .query_events_result_size_total
353 .inc_by(data.len() as u64);
354 Ok(EventPage {
355 data,
356 next_cursor,
357 has_next_page,
358 })
359 }
360 .trace()
361 .await
362 }
363
364 #[instrument(skip(self))]
365 fn subscribe_event(
366 &self,
367 sink: PendingSubscriptionSink,
368 filter: EventFilter,
369 ) -> SubscriptionResult {
370 let permit = self.acquire_subscribe_permit()?;
371 spawn_subscription(
372 sink,
373 self.state
374 .get_subscription_handler()
375 .subscribe_events(filter),
376 Some(permit),
377 );
378 Ok(())
379 }
380
381 fn subscribe_transaction(
382 &self,
383 sink: PendingSubscriptionSink,
384 filter: TransactionFilter,
385 ) -> SubscriptionResult {
386 if matches!(filter, TransactionFilter::Checkpoint(_)) {
388 return Err("checkpoint filter is not supported".into());
389 }
390
391 let permit = self.acquire_subscribe_permit()?;
392 spawn_subscription(
393 sink,
394 self.state
395 .get_subscription_handler()
396 .subscribe_transactions(filter),
397 Some(permit),
398 );
399 Ok(())
400 }
401
402 #[instrument(skip(self))]
403 async fn get_dynamic_fields(
404 &self,
405 parent_object_id: ObjectID,
406 cursor: Option<ObjectID>,
408 limit: Option<usize>,
409 ) -> RpcResult<DynamicFieldPage> {
410 async move {
411 let limit = cap_page_limit(limit);
412 self.metrics.get_dynamic_fields_limit.report(limit as u64);
413 let mut data = self
414 .state
415 .get_dynamic_fields(parent_object_id, cursor, limit + 1)
416 .map_err(Error::from)?;
417 let has_next_page = data.len() > limit;
418 data.truncate(limit);
419 let next_cursor = data.last().cloned().map_or(cursor, |c| Some(c.0));
420 self.metrics
421 .get_dynamic_fields_result_size
422 .report(data.len() as u64);
423 self.metrics
424 .get_dynamic_fields_result_size_total
425 .inc_by(data.len() as u64);
426 Ok(DynamicFieldPage {
427 data: data.into_iter().map(|(_, w)| w.into()).collect(),
428 next_cursor,
429 has_next_page,
430 })
431 }
432 .trace()
433 .await
434 }
435
436 #[instrument(skip(self))]
437 async fn get_dynamic_field_object(
438 &self,
439 parent_object_id: ObjectID,
440 name: DynamicFieldName,
441 ) -> RpcResult<IotaObjectResponse> {
442 self.get_dynamic_field_object(
443 parent_object_id,
444 name,
445 Some(IotaObjectDataOptions::full_content()),
446 )
447 .await
448 }
449
450 #[instrument(skip(self))]
451 async fn get_dynamic_field_object_v2(
452 &self,
453 parent_object_id: ObjectID,
454 name: DynamicFieldName,
455 options: Option<IotaObjectDataOptions>,
456 ) -> RpcResult<IotaObjectResponse> {
457 self.get_dynamic_field_object(parent_object_id, name, options)
458 .await
459 }
460
461 async fn iota_names_lookup(&self, name: &str) -> RpcResult<Option<IotaNameRecord>> {
462 let domain = name.parse::<Domain>().map_err(Error::from)?;
463
464 let record_id = self.iota_names_config.record_field_id(&domain);
466
467 let parent_record_id = domain
468 .parent()
469 .map(|parent_domain| self.iota_names_config.record_field_id(&parent_domain));
470
471 let mut requests = vec![self.state.get_object(&record_id)];
473
474 if let Some(ref parent_record_id) = parent_record_id {
477 requests.push(self.state.get_object(parent_record_id));
478 }
479
480 let mut results = futures::future::try_join_all(requests)
486 .await
487 .map_err(Error::from)?;
488
489 let Some(object) = results.remove(0) else {
492 return Ok(None);
493 };
494
495 let name_record = NameRecord::try_from(object).map_err(Error::from)?;
496
497 let current_timestamp_ms = self
498 .get_latest_checkpoint_timestamp_ms()
499 .map_err(Error::from)?;
500
501 if !name_record.is_leaf_record() {
505 return if !name_record.is_node_expired(current_timestamp_ms) {
506 Ok(Some(name_record.into()))
507 } else {
508 Err(Error::from(IotaNamesError::NameExpired).into())
509 };
510 } else {
511 let Some(parent_object) = results.remove(0) else {
516 return Err(Error::from(IotaNamesError::NameExpired).into());
517 };
518
519 let parent_name_record = NameRecord::try_from(parent_object).map_err(Error::from)?;
520
521 if parent_name_record.is_valid_leaf_parent(&name_record)
525 && !parent_name_record.is_node_expired(current_timestamp_ms)
526 {
527 Ok(Some(name_record.into()))
528 } else {
529 Err(Error::from(IotaNamesError::NameExpired).into())
530 }
531 }
532 }
533
534 #[instrument(skip(self))]
535 async fn iota_names_reverse_lookup(&self, address: IotaAddress) -> RpcResult<Option<String>> {
536 let reverse_record_id = self.iota_names_config.reverse_record_field_id(&address);
537
538 let Some(field_reverse_record_object) = self
539 .state
540 .get_object(&reverse_record_id)
541 .await
542 .map_err(Error::from)?
543 else {
544 return Ok(None);
545 };
546
547 let domain = field_reverse_record_object
548 .to_rust::<Field<IotaAddress, Domain>>()
549 .ok_or_else(|| Error::Unexpected(format!("malformed Object {reverse_record_id}")))?
550 .value;
551
552 let domain_name = domain.to_string();
553
554 let resolved_record = self.iota_names_lookup(&domain_name).await?;
555
556 if resolved_record.is_none() {
558 return Ok(None);
559 }
560
561 Ok(Some(domain_name))
562 }
563
564 #[instrument(skip(self))]
565 async fn iota_names_find_all_registration_nfts(
566 &self,
567 address: IotaAddress,
568 cursor: Option<ObjectID>,
569 limit: Option<usize>,
570 options: Option<IotaObjectDataOptions>,
571 ) -> RpcResult<ObjectsPage> {
572 let query = IotaObjectResponseQuery {
573 filter: Some(IotaObjectDataFilter::StructType(
574 IotaNamesRegistration::type_(self.iota_names_config.package_address.into()),
575 )),
576 options,
577 };
578
579 let owned_objects = self
580 .get_owned_objects(address, Some(query), cursor, limit)
581 .await?;
582
583 Ok(owned_objects)
584 }
585}
586
/// Registers `IndexerApi` as an RPC module and exposes its OpenRPC documentation.
587impl<R: ReadApiServer> IotaRpcModule for IndexerApi<R> {
// Consumes the API and turns it into an `RpcModule` via `into_rpc`.
588 fn rpc(self) -> RpcModule<Self> {
589 self.into_rpc()
590 }
591
// Returns the module documentation produced by `IndexerApiOpenRpc::module_doc`.
592 fn rpc_doc_module() -> Module {
593 IndexerApiOpenRpc::module_doc()
594 }
595}