iota_graphql_rpc/types/
balance.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::str::FromStr;
6
7use async_graphql::{
8    connection::{Connection, CursorType, Edge},
9    *,
10};
11use diesel::{
12    OptionalExtension, QueryableByName,
13    sql_types::{BigInt as SqlBigInt, Nullable, Text},
14};
15use iota_indexer::types::OwnerType;
16use iota_types::TypeTag;
17use serde::{Deserialize, Serialize};
18
19use crate::{
20    consistency::Checkpointed,
21    data::{Db, DbConnection, QueryExecutor},
22    error::Error,
23    filter, query,
24    raw_query::RawQuery,
25    types::{
26        available_range::AvailableRange,
27        big_int::BigInt,
28        cursor::{self, Page, RawPaginated, ScanLimited, Target},
29        iota_address::IotaAddress,
30        move_type::MoveType,
31        uint53::UInt53,
32    },
33};
34
35/// The total balance for a particular coin type.
36#[derive(Clone, Debug, SimpleObject)]
37pub(crate) struct Balance {
38    /// Coin type for the balance, such as 0x2::iota::IOTA
39    pub(crate) coin_type: MoveType,
40    /// How many coins of this type constitute the balance
41    pub(crate) coin_object_count: Option<UInt53>,
42    /// Total balance across all coin objects of the coin type
43    pub(crate) total_balance: Option<BigInt>,
44}
45
46/// Representation of a row of balance information from the DB. We read the
47/// balance as a `String` to deal with the large (bigger than 2^63 - 1)
48/// balances.
49#[derive(QueryableByName)]
50pub struct StoredBalance {
51    #[diesel(sql_type = Nullable<Text>)]
52    pub balance: Option<String>,
53    #[diesel(sql_type = Nullable<SqlBigInt>)]
54    pub count: Option<i64>,
55    #[diesel(sql_type = Text)]
56    pub coin_type: String,
57}
58
59pub(crate) type Cursor = cursor::JsonCursor<BalanceCursor>;
60
61/// The inner struct for the `Balance`'s cursor. The `coin_type` is used as the
62/// cursor, while the `checkpoint_viewed_at` sets the consistent upper bound for
63/// the cursor.
64#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
65pub(crate) struct BalanceCursor {
66    #[serde(rename = "t")]
67    coin_type: String,
68    /// The checkpoint sequence number this was viewed at.
69    #[serde(rename = "c")]
70    checkpoint_viewed_at: u64,
71}
72
73impl Balance {
74    /// Query for the balance of coins owned by `address`, of coins with type
75    /// `coin_type`. Note that `coin_type` is the type of
76    /// `0x2::coin::Coin`'s type parameter, not the full type of the coin
77    /// object.
78    pub(crate) async fn query(
79        db: &Db,
80        address: IotaAddress,
81        coin_type: TypeTag,
82        checkpoint_viewed_at: u64,
83    ) -> Result<Option<Balance>, Error> {
84        let stored: Option<StoredBalance> = db
85            .execute_repeatable(move |conn| {
86                let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
87                    return Ok::<_, diesel::result::Error>(None);
88                };
89
90                conn.result(move || {
91                    balance_query(address, Some(coin_type.clone()), range).into_boxed()
92                })
93                .optional()
94            })
95            .await?;
96
97        stored.map(Balance::try_from).transpose()
98    }
99
100    /// Query the database for a `page` of coin balances. Each balance
101    /// represents the total balance for a particular coin type, owned by
102    /// `address`.
103    pub(crate) async fn paginate(
104        db: &Db,
105        page: Page<Cursor>,
106        address: IotaAddress,
107        checkpoint_viewed_at: u64,
108    ) -> Result<Connection<String, Balance>, Error> {
109        // If cursors are provided, defer to the `checkpoint_viewed_at` in the cursor if
110        // they are consistent. Otherwise, use the value from the parameter, or
111        // set to None. This is so that paginated queries are consistent with
112        // the previous query that created the cursor.
113        let cursor_viewed_at = page.validate_cursor_consistency()?;
114        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
115
116        let Some((prev, next, results)) = db
117            .execute_repeatable(move |conn| {
118                let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
119                    return Ok::<_, diesel::result::Error>(None);
120                };
121
122                let result = page.paginate_raw_query::<StoredBalance>(
123                    conn,
124                    checkpoint_viewed_at,
125                    balance_query(address, None, range),
126                )?;
127
128                Ok(Some(result))
129            })
130            .await?
131        else {
132            return Err(Error::Client(
133                "Requested data is outside the available range".to_string(),
134            ));
135        };
136
137        let mut conn = Connection::new(prev, next);
138        for stored in results {
139            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
140            let balance = Balance::try_from(stored)?;
141            conn.edges.push(Edge::new(cursor, balance));
142        }
143
144        Ok(conn)
145    }
146}
147
148impl RawPaginated<Cursor> for StoredBalance {
149    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
150        filter!(query, "coin_type >= {}", cursor.coin_type.clone())
151    }
152
153    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
154        filter!(query, "coin_type <= {}", cursor.coin_type.clone())
155    }
156
157    fn order(asc: bool, query: RawQuery) -> RawQuery {
158        if asc {
159            return query.order_by("coin_type ASC");
160        }
161        query.order_by("coin_type DESC")
162    }
163}
164
165impl Target<Cursor> for StoredBalance {
166    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
167        Cursor::new(BalanceCursor {
168            coin_type: self.coin_type.clone(),
169            checkpoint_viewed_at,
170        })
171    }
172}
173
174impl Checkpointed for Cursor {
175    fn checkpoint_viewed_at(&self) -> u64 {
176        self.checkpoint_viewed_at
177    }
178}
179
180impl ScanLimited for Cursor {}
181
182impl TryFrom<StoredBalance> for Balance {
183    type Error = Error;
184
185    fn try_from(s: StoredBalance) -> Result<Self, Error> {
186        let StoredBalance {
187            balance,
188            count,
189            coin_type,
190        } = s;
191        let total_balance = balance
192            .map(|b| BigInt::from_str(&b))
193            .transpose()
194            .map_err(|_| Error::Internal("Failed to read balance.".to_string()))?;
195
196        let coin_object_count = count.map(|c| UInt53::from(c as u64));
197
198        let coin_type = TypeTag::from_str(&coin_type)
199            .map_err(|e| Error::Internal(format!("Failed to parse coin type: {e}")))?
200            .into();
201
202        Ok(Balance {
203            coin_type,
204            coin_object_count,
205            total_balance,
206        })
207    }
208}
209
210/// Query the database for a `page` of coin balances. Each balance represents
211/// the total balance for a particular coin type, owned by `address`. This
212/// function is meant to be called within a thunk and returns a RawQuery that
213/// can be converted into a BoxedSqlQuery with `.into_boxed()`.
214fn balance_query(
215    address: IotaAddress,
216    coin_type: Option<TypeTag>,
217    range: AvailableRange,
218) -> RawQuery {
219    // Construct the filtered inner query - apply the same filtering criteria to
220    // both objects_snapshot and objects_history tables.
221    let mut snapshot_objs = query!("SELECT * FROM objects_snapshot");
222    snapshot_objs = filter(snapshot_objs, address, coin_type.clone());
223
224    // Additionally filter objects_history table for results between the available
225    // range, or checkpoint_viewed_at, if provided.
226    let mut history_objs = query!("SELECT * FROM objects_history");
227    history_objs = filter(history_objs, address, coin_type.clone());
228    history_objs = filter!(
229        history_objs,
230        format!(
231            r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
232            range.first, range.last
233        )
234    );
235
236    // Combine the two queries, and select the most recent version of each object.
237    let candidates = query!(
238        r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) o"#,
239        snapshot_objs,
240        history_objs
241    )
242    .order_by("object_id")
243    .order_by("object_version DESC");
244
245    // Objects that fulfill the filtering criteria may not be the most recent
246    // version available. Left join the candidates table on newer to filter out
247    // any objects that have a newer version.
248    let mut newer = query!("SELECT object_id, object_version FROM objects_history");
249    newer = filter!(
250        newer,
251        format!(
252            r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
253            range.first, range.last
254        )
255    );
256    let final_ = query!(
257        r#"SELECT
258            CAST(SUM(coin_balance) AS TEXT) as balance,
259            COUNT(*) as count,
260            coin_type
261        FROM ({}) candidates
262        LEFT JOIN ({}) newer
263        ON (
264            candidates.object_id = newer.object_id
265            AND candidates.object_version < newer.object_version
266        )"#,
267        candidates,
268        newer
269    );
270
271    // Additionally for balance's query, group coins by coin_type.
272    filter!(final_, "newer.object_version IS NULL").group_by("coin_type")
273}
274
275/// Applies the filtering criteria for balances to the input `RawQuery` and
276/// returns a new `RawQuery`.
277fn filter(mut query: RawQuery, owner: IotaAddress, coin_type: Option<TypeTag>) -> RawQuery {
278    query = filter!(query, "coin_type IS NOT NULL");
279
280    query = filter!(
281        query,
282        format!(
283            "owner_id = '\\x{}'::bytea AND owner_type = {}",
284            hex::encode(owner.into_vec()),
285            OwnerType::Address as i16
286        )
287    );
288
289    if let Some(coin_type) = coin_type {
290        query = filter!(
291            query,
292            "coin_type = {}",
293            coin_type.to_canonical_display(/* with_prefix */ true)
294        );
295    };
296
297    query
298}