1pub mod apis;
82pub mod error;
83pub mod iota_client_config;
84pub mod json_rpc_error;
85pub mod wallet_context;
86
87use std::{
88 collections::VecDeque,
89 fmt::{Debug, Formatter},
90 marker::PhantomData,
91 pin::Pin,
92 sync::Arc,
93 task::Poll,
94 time::Duration,
95};
96
97use async_trait::async_trait;
98use base64::Engine;
99use futures::{StreamExt, TryStreamExt};
100pub use iota_json as json;
101use iota_json_rpc_api::{
102 CLIENT_SDK_TYPE_HEADER, CLIENT_SDK_VERSION_HEADER, CLIENT_TARGET_API_VERSION_HEADER,
103};
104pub use iota_json_rpc_types as rpc_types;
105use iota_json_rpc_types::{
106 IotaObjectDataFilter, IotaObjectDataOptions, IotaObjectResponse, IotaObjectResponseQuery, Page,
107};
108use iota_transaction_builder::{DataReader, TransactionBuilder};
109pub use iota_types as types;
110use iota_types::base_types::{IotaAddress, ObjectID, ObjectInfo};
111use jsonrpsee::{
112 core::client::ClientT,
113 http_client::{HeaderMap, HeaderValue, HttpClient, HttpClientBuilder},
114 rpc_params,
115 ws_client::{PingConfig, WsClient, WsClientBuilder},
116};
117use move_core_types::language_storage::StructTag;
118use rustls::crypto::{CryptoProvider, ring};
119use serde_json::Value;
120
121use crate::{
122 apis::{CoinReadApi, EventApi, GovernanceApi, QuorumDriverApi, ReadApi},
123 error::{Error, IotaRpcResult},
124};
125
/// Type string identifying the IOTA coin (`0x2::iota::IOTA`).
pub const IOTA_COIN_TYPE: &str = "0x2::iota::IOTA";

// Local network endpoints.
/// RPC URL of a local network on the loopback address, port 9000.
pub const IOTA_LOCAL_NETWORK_URL: &str = "http://127.0.0.1:9000";
/// Same port as [`IOTA_LOCAL_NETWORK_URL`], spelled with the `0.0.0.0` address.
pub const IOTA_LOCAL_NETWORK_URL_0: &str = "http://0.0.0.0:9000";
/// GraphQL URL of a local network (loopback address, port 8000).
pub const IOTA_LOCAL_NETWORK_GRAPHQL_URL: &str = "http://127.0.0.1:8000";
/// Gas endpoint (`/v1/gas`) of a local network (loopback address, port 9123).
pub const IOTA_LOCAL_NETWORK_GAS_URL: &str = "http://127.0.0.1:9123/v1/gas";

// Devnet endpoints.
/// RPC URL of the IOTA devnet.
pub const IOTA_DEVNET_URL: &str = "https://api.devnet.iota.cafe";
/// GraphQL URL of the IOTA devnet.
pub const IOTA_DEVNET_GRAPHQL_URL: &str = "https://graphql.devnet.iota.cafe";
/// Gas endpoint (`/v1/gas`) of the devnet faucet host.
pub const IOTA_DEVNET_GAS_URL: &str = "https://faucet.devnet.iota.cafe/v1/gas";

// Testnet endpoints.
/// RPC URL of the IOTA testnet.
pub const IOTA_TESTNET_URL: &str = "https://api.testnet.iota.cafe";
/// GraphQL URL of the IOTA testnet.
pub const IOTA_TESTNET_GRAPHQL_URL: &str = "https://graphql.testnet.iota.cafe";
/// Gas endpoint (`/v1/gas`) of the testnet faucet host.
pub const IOTA_TESTNET_GAS_URL: &str = "https://faucet.testnet.iota.cafe/v1/gas";

// Mainnet endpoint.
/// RPC URL of the IOTA mainnet.
pub const IOTA_MAINNET_URL: &str = "https://api.mainnet.iota.cafe";
138
/// Collects connection settings and turns them into an [`IotaClient`].
///
/// Each setter consumes the builder and hands it back, so calls can be chained
/// before finishing with one of the `build*` methods.
pub struct IotaClientBuilder {
    /// Timeout applied to requests of both the HTTP and the WebSocket client.
    request_timeout: Duration,
    /// Optional limit on concurrent requests; `None` keeps the transport's default.
    max_concurrent_requests: Option<usize>,
    /// WebSocket server URL; `None` means no WebSocket client is created.
    ws_url: Option<String>,
    /// Interval between WebSocket pings; `None` leaves pings disabled.
    ws_ping_interval: Option<Duration>,
    /// `(username, password)` pair sent as HTTP basic authentication.
    basic_auth: Option<(String, String)>,
}
169
170impl Default for IotaClientBuilder {
171 fn default() -> Self {
172 Self {
173 request_timeout: Duration::from_secs(60),
174 max_concurrent_requests: None,
175 ws_url: None,
176 ws_ping_interval: None,
177 basic_auth: None,
178 }
179 }
180}
181
182impl IotaClientBuilder {
183 pub fn request_timeout(mut self, request_timeout: Duration) -> Self {
185 self.request_timeout = request_timeout;
186 self
187 }
188
189 pub fn max_concurrent_requests(mut self, max_concurrent_requests: usize) -> Self {
191 self.max_concurrent_requests = Some(max_concurrent_requests);
192 self
193 }
194
195 pub fn ws_url(mut self, url: impl AsRef<str>) -> Self {
197 self.ws_url = Some(url.as_ref().to_string());
198 self
199 }
200
201 pub fn ws_ping_interval(mut self, duration: Duration) -> Self {
203 self.ws_ping_interval = Some(duration);
204 self
205 }
206
207 pub fn basic_auth(mut self, username: impl AsRef<str>, password: impl AsRef<str>) -> Self {
209 self.basic_auth = Some((username.as_ref().to_string(), password.as_ref().to_string()));
210 self
211 }
212
213 pub async fn build(self, http: impl AsRef<str>) -> IotaRpcResult<IotaClient> {
232 if CryptoProvider::get_default().is_none() {
233 ring::default_provider().install_default().ok();
234 }
235
236 let client_version = env!("CARGO_PKG_VERSION");
237 let mut headers = HeaderMap::new();
238 headers.insert(
239 CLIENT_TARGET_API_VERSION_HEADER,
240 HeaderValue::from_static(client_version),
242 );
243 headers.insert(
244 CLIENT_SDK_VERSION_HEADER,
245 HeaderValue::from_static(client_version),
246 );
247 headers.insert(CLIENT_SDK_TYPE_HEADER, HeaderValue::from_static("rust"));
248
249 if let Some((username, password)) = self.basic_auth {
250 let auth = base64::engine::general_purpose::STANDARD
251 .encode(format!("{}:{}", username, password));
252 headers.insert(
253 "authorization",
254 HeaderValue::from_str(&format!("Basic {}", auth)).unwrap(),
256 );
257 }
258
259 let ws = if let Some(url) = self.ws_url {
260 let mut builder = WsClientBuilder::default()
261 .max_request_size(2 << 30)
262 .set_headers(headers.clone())
263 .request_timeout(self.request_timeout);
264
265 if let Some(duration) = self.ws_ping_interval {
266 builder = builder.enable_ws_ping(PingConfig::new().ping_interval(duration))
267 }
268
269 if let Some(max_concurrent_requests) = self.max_concurrent_requests {
270 builder = builder.max_concurrent_requests(max_concurrent_requests);
271 }
272
273 builder.build(url).await.ok()
274 } else {
275 None
276 };
277
278 let mut http_builder = HttpClientBuilder::default()
279 .max_request_size(2 << 30)
280 .set_headers(headers)
281 .request_timeout(self.request_timeout);
282
283 if let Some(max_concurrent_requests) = self.max_concurrent_requests {
284 http_builder = http_builder.max_concurrent_requests(max_concurrent_requests);
285 }
286
287 let http = http_builder.build(http)?;
288
289 let info = Self::get_server_info(&http, &ws).await?;
290
291 let rpc = RpcClient { http, ws, info };
292 let api = Arc::new(rpc);
293 let read_api = Arc::new(ReadApi::new(api.clone()));
294 let quorum_driver_api = QuorumDriverApi::new(api.clone());
295 let event_api = EventApi::new(api.clone());
296 let transaction_builder = TransactionBuilder::new(read_api.clone());
297 let coin_read_api = CoinReadApi::new(api.clone());
298 let governance_api = GovernanceApi::new(api.clone());
299
300 Ok(IotaClient {
301 api,
302 transaction_builder,
303 read_api,
304 coin_read_api,
305 event_api,
306 quorum_driver_api,
307 governance_api,
308 })
309 }
310
311 pub async fn build_localnet(self) -> IotaRpcResult<IotaClient> {
331 self.build(IOTA_LOCAL_NETWORK_URL).await
332 }
333
334 pub async fn build_devnet(self) -> IotaRpcResult<IotaClient> {
353 self.build(IOTA_DEVNET_URL).await
354 }
355
356 pub async fn build_testnet(self) -> IotaRpcResult<IotaClient> {
375 self.build(IOTA_TESTNET_URL).await
376 }
377
378 pub async fn build_mainnet(self) -> IotaRpcResult<IotaClient> {
397 self.build(IOTA_MAINNET_URL).await
398 }
399
400 async fn get_server_info(
404 http: &HttpClient,
405 ws: &Option<WsClient>,
406 ) -> Result<ServerInfo, Error> {
407 let rpc_spec: Value = http.request("rpc.discover", rpc_params![]).await?;
408 let version = rpc_spec
409 .pointer("/info/version")
410 .and_then(|v| v.as_str())
411 .ok_or_else(|| {
412 Error::Data("Fail parsing server version from rpc.discover endpoint.".into())
413 })?;
414 let rpc_methods = Self::parse_methods(&rpc_spec)?;
415
416 let subscriptions = if let Some(ws) = ws {
417 match ws.request("rpc.discover", rpc_params![]).await {
418 Ok(rpc_spec) => Self::parse_methods(&rpc_spec)?,
419 Err(_) => Vec::new(),
420 }
421 } else {
422 Vec::new()
423 };
424 let iota_system_state_v2_support =
425 rpc_methods.contains(&"iotax_getLatestIotaSystemStateV2".to_string());
426 Ok(ServerInfo {
427 rpc_methods,
428 subscriptions,
429 version: version.to_string(),
430 iota_system_state_v2_support,
431 })
432 }
433
434 fn parse_methods(server_spec: &Value) -> Result<Vec<String>, Error> {
435 let methods = server_spec
436 .pointer("/methods")
437 .and_then(|methods| methods.as_array())
438 .ok_or_else(|| {
439 Error::Data("Fail parsing server information from rpc.discover endpoint.".into())
440 })?;
441
442 Ok(methods
443 .iter()
444 .flat_map(|method| method["name"].as_str())
445 .map(|s| s.into())
446 .collect())
447 }
448}
449
450#[derive(Clone)]
486pub struct IotaClient {
487 api: Arc<RpcClient>,
488 transaction_builder: TransactionBuilder,
489 read_api: Arc<ReadApi>,
490 coin_read_api: CoinReadApi,
491 event_api: EventApi,
492 quorum_driver_api: QuorumDriverApi,
493 governance_api: GovernanceApi,
494}
495
496pub(crate) struct RpcClient {
497 http: HttpClient,
498 ws: Option<WsClient>,
499 info: ServerInfo,
500}
501
502impl Debug for RpcClient {
503 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
504 write!(
505 f,
506 "RPC client. Http: {:?}, Websocket: {:?}",
507 self.http, self.ws
508 )
509 }
510}
511
/// Information about the server, collected from its `rpc.discover` responses.
struct ServerInfo {
    /// Method names listed in the HTTP `rpc.discover` response.
    rpc_methods: Vec<String>,
    /// Method names listed in the WebSocket `rpc.discover` response; empty when
    /// there is no WebSocket client or the request failed.
    subscriptions: Vec<String>,
    /// Value of `/info/version` in the HTTP `rpc.discover` response.
    version: String,
    /// Whether `rpc_methods` contains `iotax_getLatestIotaSystemStateV2`.
    iota_system_state_v2_support: bool,
}
520
521impl IotaClient {
522 pub fn available_rpc_methods(&self) -> &Vec<String> {
525 &self.api.info.rpc_methods
526 }
527
528 pub fn available_subscriptions(&self) -> &Vec<String> {
531 &self.api.info.subscriptions
532 }
533
534 pub fn api_version(&self) -> &str {
540 &self.api.info.version
541 }
542
543 pub fn check_api_version(&self) -> IotaRpcResult<()> {
546 let server_version = self.api_version();
547 let client_version = env!("CARGO_PKG_VERSION");
548 if server_version != client_version {
549 return Err(Error::ServerVersionMismatch {
550 client_version: client_version.to_string(),
551 server_version: server_version.to_string(),
552 });
553 };
554 Ok(())
555 }
556
557 pub fn coin_read_api(&self) -> &CoinReadApi {
559 &self.coin_read_api
560 }
561
562 pub fn event_api(&self) -> &EventApi {
564 &self.event_api
565 }
566
567 pub fn governance_api(&self) -> &GovernanceApi {
569 &self.governance_api
570 }
571
572 pub fn quorum_driver_api(&self) -> &QuorumDriverApi {
574 &self.quorum_driver_api
575 }
576
577 pub fn read_api(&self) -> &ReadApi {
579 &self.read_api
580 }
581
582 pub fn transaction_builder(&self) -> &TransactionBuilder {
584 &self.transaction_builder
585 }
586
587 pub fn http(&self) -> &HttpClient {
589 &self.api.http
590 }
591
592 pub fn ws(&self) -> Option<&WsClient> {
594 self.api.ws.as_ref()
595 }
596}
597
598#[async_trait]
599impl DataReader for ReadApi {
600 async fn get_owned_objects(
601 &self,
602 address: IotaAddress,
603 object_type: StructTag,
604 ) -> Result<Vec<ObjectInfo>, anyhow::Error> {
605 let query = Some(IotaObjectResponseQuery {
606 filter: Some(IotaObjectDataFilter::StructType(object_type)),
607 options: Some(
608 IotaObjectDataOptions::new()
609 .with_previous_transaction()
610 .with_type()
611 .with_owner(),
612 ),
613 });
614
615 let result = PagedFn::stream(async |cursor| {
616 self.get_owned_objects(address, query.clone(), cursor, None)
617 .await
618 })
619 .map(|v| v?.try_into())
620 .try_collect::<Vec<_>>()
621 .await?;
622
623 Ok(result)
624 }
625
626 async fn get_object_with_options(
627 &self,
628 object_id: ObjectID,
629 options: IotaObjectDataOptions,
630 ) -> Result<IotaObjectResponse, anyhow::Error> {
631 Ok(self.get_object_with_options(object_id, options).await?)
632 }
633
634 async fn get_reference_gas_price(&self) -> Result<u64, anyhow::Error> {
636 Ok(self.get_reference_gas_price().await?)
637 }
638}
639
640pub trait PagedFn<O, C, F, E>: Sized + Fn(Option<C>) -> F
643where
644 O: Send,
645 C: Send,
646 F: futures::Future<Output = Result<Page<O, C>, E>> + Send,
647{
648 fn collect<T>(self) -> impl futures::Future<Output = Result<T, E>>
650 where
651 T: Default + Extend<O>,
652 {
653 self.stream().try_collect::<T>()
654 }
655
656 fn stream(self) -> PagedStream<O, C, F, E, Self> {
658 PagedStream::new(self)
659 }
660}
661
662impl<O, C, F, E, Fun> PagedFn<O, C, F, E> for Fun
663where
664 Fun: Fn(Option<C>) -> F,
665 O: Send,
666 C: Send,
667 F: futures::Future<Output = Result<Page<O, C>, E>> + Send,
668{
669}
670
/// Stream that drives a pagination function and yields the items of the
/// returned pages one at a time.
pub struct PagedStream<O, C, F, E, Fun> {
    /// Function called with the next cursor to request another page.
    fun: Fun,
    /// Page request currently in flight (or the last one, once finished).
    fut: Pin<Box<F>>,
    /// Items of already fetched pages that have not been yielded yet.
    next: VecDeque<O>,
    /// Whether another page should be requested once `next` runs empty.
    has_next_page: bool,
    /// Ties the otherwise unused error and cursor type parameters to the struct.
    _data: PhantomData<(E, C)>,
}
680
681impl<O, C, F, E, Fun> PagedStream<O, C, F, E, Fun>
682where
683 Fun: Fn(Option<C>) -> F,
684{
685 pub fn new(fun: Fun) -> Self {
686 let fut = fun(None);
687 Self {
688 fun,
689 fut: Box::pin(fut),
690 next: Default::default(),
691 has_next_page: true,
692 _data: PhantomData,
693 }
694 }
695}
696
697impl<O, C, F, E, Fun> futures::Stream for PagedStream<O, C, F, E, Fun>
698where
699 O: Send,
700 C: Send,
701 F: futures::Future<Output = Result<Page<O, C>, E>> + Send,
702 Fun: Fn(Option<C>) -> F,
703{
704 type Item = Result<O, E>;
705
706 fn poll_next(
707 self: std::pin::Pin<&mut Self>,
708 cx: &mut std::task::Context<'_>,
709 ) -> Poll<Option<Self::Item>> {
710 let this = unsafe { self.get_unchecked_mut() };
711 if this.next.is_empty() && this.has_next_page {
712 match this.fut.as_mut().poll(cx) {
713 Poll::Ready(res) => match res {
714 Ok(mut page) => {
715 this.next.extend(page.data);
716 this.has_next_page = page.has_next_page;
717 if this.has_next_page {
718 this.fut.set((this.fun)(page.next_cursor.take()));
719 }
720 }
721 Err(e) => {
722 this.has_next_page = false;
723 return Poll::Ready(Some(Err(e)));
724 }
725 },
726 Poll::Pending => return Poll::Pending,
727 }
728 }
729 Poll::Ready(this.next.pop_front().map(Ok))
730 }
731}
732
#[cfg(test)]
mod test {
    use iota_json_rpc_types::Page;

    use super::*;

    /// In-memory stand-in for a paginated endpoint; a cursor is an index into
    /// `data`.
    struct Endpoint {
        data: Vec<i32>,
    }

    impl Endpoint {
        const PAGE_SIZE: usize = 100;

        /// Returns up to `PAGE_SIZE` items starting at `cursor` (or at the
        /// beginning), failing for a cursor beyond the data.
        async fn get_page(&self, cursor: Option<usize>) -> anyhow::Result<Page<i32, usize>> {
            let total = self.data.len();
            if let Some(start) = cursor {
                anyhow::ensure!(start < total, "invalid cursor");
            }
            let start = cursor.unwrap_or(0);
            let end = (start + Self::PAGE_SIZE).min(total);
            let has_next_page = end < total;
            Ok(Page {
                data: self.data[start..end].to_vec(),
                next_cursor: if has_next_page { Some(end) } else { None },
                has_next_page,
            })
        }
    }

    #[tokio::test]
    async fn test_get_all_pages() {
        let endpoint = Endpoint {
            data: (0..10000).collect(),
        };

        let mut stream = PagedFn::stream(async |cursor| endpoint.get_page(cursor).await);

        // All items but the last arrive in order across page boundaries.
        let head = stream
            .by_ref()
            .take(9999)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(head, endpoint.data[..9999]);

        // The final item is still delivered, after which the stream is finished.
        let last = stream.by_ref().try_next().await.unwrap();
        assert_eq!(last, Some(9999));
        let after_end = stream.try_next().await.unwrap();
        assert!(after_end.is_none());

        // An endpoint error surfaces as an error item.
        let mut bad_stream = PagedFn::stream(async |_| endpoint.get_page(Some(99999)).await);
        let failure = bad_stream.try_next().await;
        assert!(failure.is_err());
    }
}