iota_rosetta/
account.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! This module implements the [Rosetta Account API](https://www.rosetta-api.org/docs/AccountApi.html)
6
7use std::time::Duration;
8
9use axum::{Extension, Json, extract::State};
10use axum_extra::extract::WithRejection;
11use futures::StreamExt;
12use iota_sdk::{IOTA_COIN_TYPE, IotaClient, rpc_types::StakeStatus};
13use iota_types::base_types::IotaAddress;
14use tracing::info;
15
16use crate::{
17    IotaEnv, OnlineServerContext,
18    errors::Error,
19    types::{
20        AccountBalanceRequest, AccountBalanceResponse, AccountCoinsRequest, AccountCoinsResponse,
21        Amount, Coin, SubAccount, SubAccountType, SubBalance,
22    },
23};
24
25/// Get an array of all AccountBalances for an AccountIdentifier and the
26/// BlockIdentifier at which the balance lookup was performed.
27/// [Rosetta API Spec](https://www.rosetta-api.org/docs/AccountApi.html#accountbalance)
28pub async fn balance(
29    State(ctx): State<OnlineServerContext>,
30    Extension(env): Extension<IotaEnv>,
31    WithRejection(Json(request), _): WithRejection<Json<AccountBalanceRequest>, Error>,
32) -> Result<AccountBalanceResponse, Error> {
33    env.check_network_identifier(&request.network_identifier)?;
34    let address = request.account_identifier.address;
35    let mut retry_attempts = 5;
36    if let Some(SubAccount { account_type }) = request.account_identifier.sub_account {
37        while retry_attempts > 0 {
38            let balances_first =
39                get_sub_account_balances(account_type.clone(), &ctx.client, address).await?;
40            let checkpoint1 = ctx
41                .client
42                .read_api()
43                .get_latest_checkpoint_sequence_number()
44                .await?;
45            // Get another checkpoint which is greater than current
46            let mut checkpoint2 = ctx
47                .client
48                .read_api()
49                .get_latest_checkpoint_sequence_number()
50                .await?;
51
52            while checkpoint2 <= checkpoint1 {
53                checkpoint2 = ctx
54                    .client
55                    .read_api()
56                    .get_latest_checkpoint_sequence_number()
57                    .await?;
58                tokio::time::sleep(Duration::from_secs(1)).await;
59            }
60            let balances_second =
61                get_sub_account_balances(account_type.clone(), &ctx.client, address).await?;
62            if balances_first.eq(&balances_second) {
63                return Ok(AccountBalanceResponse {
64                    block_identifier: ctx.blocks().create_block_identifier(checkpoint2).await?,
65                    balances: balances_first,
66                });
67            } else {
68                // retry logic needs to be aaded
69                retry_attempts -= 1;
70            }
71        }
72        Err(Error::RetryExhausted(String::from("retry")))
73    } else {
74        // Get current live balance
75        while retry_attempts > 0 {
76            let balances_first = ctx
77                .client
78                .coin_read_api()
79                .get_balance(address, Some(IOTA_COIN_TYPE.to_string()))
80                .await?
81                .total_balance as i128;
82
83            // Get current latest checkpoint
84            let checkpoint1 = ctx
85                .client
86                .read_api()
87                .get_latest_checkpoint_sequence_number()
88                .await?;
89
90            // Get another checkpoint which is greater than current
91            let mut checkpoint2 = ctx
92                .client
93                .read_api()
94                .get_latest_checkpoint_sequence_number()
95                .await?;
96
97            while checkpoint2 <= checkpoint1 {
98                checkpoint2 = ctx
99                    .client
100                    .read_api()
101                    .get_latest_checkpoint_sequence_number()
102                    .await?;
103                tokio::time::sleep(Duration::from_secs(1)).await;
104            }
105
106            // Get live balance again
107            let balances_second = ctx
108                .client
109                .coin_read_api()
110                .get_balance(address, Some(IOTA_COIN_TYPE.to_string()))
111                .await?
112                .total_balance as i128;
113
114            // if those two live balances are equal then that is the current balance for
115            // checkpoint2
116            if balances_first.eq(&balances_second) {
117                info!(
118                    "same balance for account {} at checkpoint {}",
119                    address, checkpoint2
120                );
121                return Ok(AccountBalanceResponse {
122                    block_identifier: ctx.blocks().create_block_identifier(checkpoint2).await?,
123                    balances: vec![Amount::new(balances_first)],
124                });
125            } else {
126                // balances are different so we need to try again.
127                info!(
128                    "different balance for account {} at checkpoint {}",
129                    address, checkpoint2
130                );
131                retry_attempts -= 1;
132            }
133        }
134        Err(Error::RetryExhausted(String::from("retry")))
135    }
136}
137
138async fn get_sub_account_balances(
139    account_type: SubAccountType,
140    client: &IotaClient,
141    address: IotaAddress,
142) -> Result<Vec<Amount>, Error> {
143    let amounts = match account_type {
144        SubAccountType::Stake => {
145            let delegations = client.governance_api().get_stakes(address).await?;
146            delegations.into_iter().fold(vec![], |mut amounts, stakes| {
147                for stake in &stakes.stakes {
148                    if let StakeStatus::Active { .. } = stake.status {
149                        amounts.push(SubBalance {
150                            stake_id: stake.staked_iota_id,
151                            validator: stakes.validator_address,
152                            value: stake.principal as i128,
153                        });
154                    }
155                }
156                amounts
157            })
158        }
159        SubAccountType::PendingStake => {
160            let delegations = client.governance_api().get_stakes(address).await?;
161            delegations.into_iter().fold(vec![], |mut amounts, stakes| {
162                for stake in &stakes.stakes {
163                    if let StakeStatus::Pending = stake.status {
164                        amounts.push(SubBalance {
165                            stake_id: stake.staked_iota_id,
166                            validator: stakes.validator_address,
167                            value: stake.principal as i128,
168                        });
169                    }
170                }
171                amounts
172            })
173        }
174
175        SubAccountType::EstimatedReward => {
176            let delegations = client.governance_api().get_stakes(address).await?;
177            delegations.into_iter().fold(vec![], |mut amounts, stakes| {
178                for stake in &stakes.stakes {
179                    if let StakeStatus::Active { estimated_reward } = stake.status {
180                        amounts.push(SubBalance {
181                            stake_id: stake.staked_iota_id,
182                            validator: stakes.validator_address,
183                            value: estimated_reward as i128,
184                        });
185                    }
186                }
187                amounts
188            })
189        }
190    };
191
192    // Make sure there are always one amount returned
193    Ok(if amounts.is_empty() {
194        vec![Amount::new(0)]
195    } else {
196        vec![Amount::new_from_sub_balances(amounts)]
197    })
198}
199
200/// Get an array of all unspent coins for an AccountIdentifier and the
201/// BlockIdentifier at which the lookup was performed. . [Rosetta API Spec](https://www.rosetta-api.org/docs/AccountApi.html#accountcoins)
202pub async fn coins(
203    State(context): State<OnlineServerContext>,
204    Extension(env): Extension<IotaEnv>,
205    WithRejection(Json(request), _): WithRejection<Json<AccountCoinsRequest>, Error>,
206) -> Result<AccountCoinsResponse, Error> {
207    env.check_network_identifier(&request.network_identifier)?;
208    let coins = context
209        .client
210        .coin_read_api()
211        .get_coins_stream(
212            request.account_identifier.address,
213            Some(IOTA_COIN_TYPE.to_string()),
214        )
215        .map(Coin::from)
216        .collect()
217        .await;
218
219    Ok(AccountCoinsResponse {
220        block_identifier: context.blocks().current_block_identifier().await?,
221        coins,
222    })
223}