1use std::time::Duration;
8
9use axum::{Extension, Json, extract::State};
10use axum_extra::extract::WithRejection;
11use futures::StreamExt;
12use iota_sdk::{IOTA_COIN_TYPE, IotaClient, rpc_types::StakeStatus};
13use iota_types::base_types::IotaAddress;
14use tracing::info;
15
16use crate::{
17 IotaEnv, OnlineServerContext,
18 errors::Error,
19 types::{
20 AccountBalanceRequest, AccountBalanceResponse, AccountCoinsRequest, AccountCoinsResponse,
21 Amount, Coin, SubAccount, SubAccountType, SubBalance,
22 },
23};
24
25pub async fn balance(
29 State(ctx): State<OnlineServerContext>,
30 Extension(env): Extension<IotaEnv>,
31 WithRejection(Json(request), _): WithRejection<Json<AccountBalanceRequest>, Error>,
32) -> Result<AccountBalanceResponse, Error> {
33 env.check_network_identifier(&request.network_identifier)?;
34 let address = request.account_identifier.address;
35 let mut retry_attempts = 5;
36 if let Some(SubAccount { account_type }) = request.account_identifier.sub_account {
37 while retry_attempts > 0 {
38 let balances_first =
39 get_sub_account_balances(account_type.clone(), &ctx.client, address).await?;
40 let checkpoint1 = ctx
41 .client
42 .read_api()
43 .get_latest_checkpoint_sequence_number()
44 .await?;
45 let mut checkpoint2 = ctx
47 .client
48 .read_api()
49 .get_latest_checkpoint_sequence_number()
50 .await?;
51
52 while checkpoint2 <= checkpoint1 {
53 checkpoint2 = ctx
54 .client
55 .read_api()
56 .get_latest_checkpoint_sequence_number()
57 .await?;
58 tokio::time::sleep(Duration::from_secs(1)).await;
59 }
60 let balances_second =
61 get_sub_account_balances(account_type.clone(), &ctx.client, address).await?;
62 if balances_first.eq(&balances_second) {
63 return Ok(AccountBalanceResponse {
64 block_identifier: ctx.blocks().create_block_identifier(checkpoint2).await?,
65 balances: balances_first,
66 });
67 } else {
68 retry_attempts -= 1;
70 }
71 }
72 Err(Error::RetryExhausted(String::from("retry")))
73 } else {
74 while retry_attempts > 0 {
76 let balances_first = ctx
77 .client
78 .coin_read_api()
79 .get_balance(address, Some(IOTA_COIN_TYPE.to_string()))
80 .await?
81 .total_balance as i128;
82
83 let checkpoint1 = ctx
85 .client
86 .read_api()
87 .get_latest_checkpoint_sequence_number()
88 .await?;
89
90 let mut checkpoint2 = ctx
92 .client
93 .read_api()
94 .get_latest_checkpoint_sequence_number()
95 .await?;
96
97 while checkpoint2 <= checkpoint1 {
98 checkpoint2 = ctx
99 .client
100 .read_api()
101 .get_latest_checkpoint_sequence_number()
102 .await?;
103 tokio::time::sleep(Duration::from_secs(1)).await;
104 }
105
106 let balances_second = ctx
108 .client
109 .coin_read_api()
110 .get_balance(address, Some(IOTA_COIN_TYPE.to_string()))
111 .await?
112 .total_balance as i128;
113
114 if balances_first.eq(&balances_second) {
117 info!(
118 "same balance for account {} at checkpoint {}",
119 address, checkpoint2
120 );
121 return Ok(AccountBalanceResponse {
122 block_identifier: ctx.blocks().create_block_identifier(checkpoint2).await?,
123 balances: vec![Amount::new(balances_first)],
124 });
125 } else {
126 info!(
128 "different balance for account {} at checkpoint {}",
129 address, checkpoint2
130 );
131 retry_attempts -= 1;
132 }
133 }
134 Err(Error::RetryExhausted(String::from("retry")))
135 }
136}
137
138async fn get_sub_account_balances(
139 account_type: SubAccountType,
140 client: &IotaClient,
141 address: IotaAddress,
142) -> Result<Vec<Amount>, Error> {
143 let amounts = match account_type {
144 SubAccountType::Stake => {
145 let delegations = client.governance_api().get_stakes(address).await?;
146 delegations.into_iter().fold(vec![], |mut amounts, stakes| {
147 for stake in &stakes.stakes {
148 if let StakeStatus::Active { .. } = stake.status {
149 amounts.push(SubBalance {
150 stake_id: stake.staked_iota_id,
151 validator: stakes.validator_address,
152 value: stake.principal as i128,
153 });
154 }
155 }
156 amounts
157 })
158 }
159 SubAccountType::PendingStake => {
160 let delegations = client.governance_api().get_stakes(address).await?;
161 delegations.into_iter().fold(vec![], |mut amounts, stakes| {
162 for stake in &stakes.stakes {
163 if let StakeStatus::Pending = stake.status {
164 amounts.push(SubBalance {
165 stake_id: stake.staked_iota_id,
166 validator: stakes.validator_address,
167 value: stake.principal as i128,
168 });
169 }
170 }
171 amounts
172 })
173 }
174
175 SubAccountType::EstimatedReward => {
176 let delegations = client.governance_api().get_stakes(address).await?;
177 delegations.into_iter().fold(vec![], |mut amounts, stakes| {
178 for stake in &stakes.stakes {
179 if let StakeStatus::Active { estimated_reward } = stake.status {
180 amounts.push(SubBalance {
181 stake_id: stake.staked_iota_id,
182 validator: stakes.validator_address,
183 value: estimated_reward as i128,
184 });
185 }
186 }
187 amounts
188 })
189 }
190 };
191
192 Ok(if amounts.is_empty() {
194 vec![Amount::new(0)]
195 } else {
196 vec![Amount::new_from_sub_balances(amounts)]
197 })
198}
199
200pub async fn coins(
203 State(context): State<OnlineServerContext>,
204 Extension(env): Extension<IotaEnv>,
205 WithRejection(Json(request), _): WithRejection<Json<AccountCoinsRequest>, Error>,
206) -> Result<AccountCoinsResponse, Error> {
207 env.check_network_identifier(&request.network_identifier)?;
208 let coins = context
209 .client
210 .coin_read_api()
211 .get_coins_stream(
212 request.account_identifier.address,
213 Some(IOTA_COIN_TYPE.to_string()),
214 )
215 .map(Coin::from)
216 .collect()
217 .await;
218
219 Ok(AccountCoinsResponse {
220 block_identifier: context.blocks().current_block_identifier().await?,
221 coins,
222 })
223}