iota_graphql_rpc/types/
balance.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::str::FromStr;
6
7use async_graphql::{
8    connection::{Connection, CursorType, Edge},
9    *,
10};
11use diesel::{
12    OptionalExtension, QueryableByName,
13    sql_types::{BigInt as SqlBigInt, Nullable, Text},
14};
15use iota_indexer::types::OwnerType;
16use iota_types::{TypeTag, parse_iota_type_tag};
17use serde::{Deserialize, Serialize};
18
19use crate::{
20    consistency::Checkpointed,
21    data::{Db, DbConnection, QueryExecutor},
22    error::Error,
23    filter, query,
24    raw_query::RawQuery,
25    types::{
26        available_range::AvailableRange,
27        big_int::BigInt,
28        cursor::{self, Page, RawPaginated, ScanLimited, Target},
29        iota_address::IotaAddress,
30        move_type::MoveType,
31        uint53::UInt53,
32    },
33};
34
// NOTE(review): this type derives async-graphql's `SimpleObject`, which
// presumably publishes the `///` doc comments below as GraphQL descriptions
// and the fields in declaration order — confirm before rewording or
// reordering anything here, since that would change the served schema.
/// The total balance for a particular coin type.
#[derive(Clone, Debug, SimpleObject)]
pub(crate) struct Balance {
    /// Coin type for the balance, such as 0x2::iota::IOTA
    pub(crate) coin_type: MoveType,
    /// How many coins of this type constitute the balance
    pub(crate) coin_object_count: Option<UInt53>,
    /// Total balance across all coin objects of the coin type
    pub(crate) total_balance: Option<BigInt>,
}
45
/// Representation of a row of balance information from the DB. We read the
/// balance as a `String` to deal with the large (bigger than 2^63 - 1)
/// balances.
///
/// Columns are matched by name (`QueryableByName`), so the field names must
/// agree with the aliases selected by `balance_query`.
#[derive(QueryableByName)]
pub struct StoredBalance {
    /// Summed coin balance, selected as `CAST(SUM(coin_balance) AS TEXT)`.
    /// Declared nullable at the SQL level.
    #[diesel(sql_type = Nullable<Text>)]
    pub balance: Option<String>,
    /// Number of rows aggregated into this group (`COUNT(*)`).
    #[diesel(sql_type = Nullable<SqlBigInt>)]
    pub count: Option<i64>,
    /// Coin type the rows were grouped by, as stored in the database.
    #[diesel(sql_type = Text)]
    pub coin_type: String,
}
58
/// Pagination cursor for balances: a [`BalanceCursor`] wrapped in
/// `cursor::JsonCursor`.
pub(crate) type Cursor = cursor::JsonCursor<BalanceCursor>;
60
/// The inner struct for the `Balance`'s cursor. The `coin_type` is used as the
/// cursor, while the `checkpoint_viewed_at` sets the consistent upper bound for
/// the cursor.
///
/// Fields are serialized under the short keys `t` and `c` (see the serde
/// renames below); changing those keys changes the serialized cursor.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct BalanceCursor {
    /// Coin type of the balance this cursor points at; pagination filters and
    /// orders on this value.
    #[serde(rename = "t")]
    coin_type: String,
    /// The checkpoint sequence number this was viewed at.
    #[serde(rename = "c")]
    checkpoint_viewed_at: u64,
}
72
73impl Balance {
74    /// Query for the balance of coins owned by `address`, of coins with type
75    /// `coin_type`. Note that `coin_type` is the type of
76    /// `0x2::coin::Coin`'s type parameter, not the full type of the coin
77    /// object.
78    pub(crate) async fn query(
79        db: &Db,
80        address: IotaAddress,
81        coin_type: TypeTag,
82        checkpoint_viewed_at: u64,
83    ) -> Result<Option<Balance>, Error> {
84        let stored: Option<StoredBalance> = db
85            .execute_repeatable(move |conn| {
86                let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
87                    return Ok::<_, diesel::result::Error>(None);
88                };
89
90                conn.result(move || {
91                    balance_query(address, Some(coin_type.clone()), range).into_boxed()
92                })
93                .optional()
94            })
95            .await?;
96
97        stored.map(Balance::try_from).transpose()
98    }
99
100    /// Query the database for a `page` of coin balances. Each balance
101    /// represents the total balance for a particular coin type, owned by
102    /// `address`.
103    pub(crate) async fn paginate(
104        db: &Db,
105        page: Page<Cursor>,
106        address: IotaAddress,
107        checkpoint_viewed_at: u64,
108    ) -> Result<Connection<String, Balance>, Error> {
109        // If cursors are provided, defer to the `checkpoint_viewed_at` in the cursor if
110        // they are consistent. Otherwise, use the value from the parameter, or
111        // set to None. This is so that paginated queries are consistent with
112        // the previous query that created the cursor.
113        let cursor_viewed_at = page.validate_cursor_consistency()?;
114        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
115
116        let Some((prev, next, results)) = db
117            .execute_repeatable(move |conn| {
118                let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
119                    return Ok::<_, diesel::result::Error>(None);
120                };
121
122                let result = page.paginate_raw_query::<StoredBalance>(
123                    conn,
124                    checkpoint_viewed_at,
125                    balance_query(address, None, range),
126                )?;
127
128                Ok(Some(result))
129            })
130            .await?
131        else {
132            return Err(Error::Client(
133                "Requested data is outside the available range".to_string(),
134            ));
135        };
136
137        let mut conn = Connection::new(prev, next);
138        for stored in results {
139            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
140            let balance = Balance::try_from(stored)?;
141            conn.edges.push(Edge::new(cursor, balance));
142        }
143
144        Ok(conn)
145    }
146}
147
148impl RawPaginated<Cursor> for StoredBalance {
149    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
150        filter!(query, "coin_type >= {}", cursor.coin_type.clone())
151    }
152
153    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
154        filter!(query, "coin_type <= {}", cursor.coin_type.clone())
155    }
156
157    fn order(asc: bool, query: RawQuery) -> RawQuery {
158        if asc {
159            return query.order_by("coin_type ASC");
160        }
161        query.order_by("coin_type DESC")
162    }
163}
164
165impl Target<Cursor> for StoredBalance {
166    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
167        Cursor::new(BalanceCursor {
168            coin_type: self.coin_type.clone(),
169            checkpoint_viewed_at,
170        })
171    }
172}
173
impl Checkpointed for Cursor {
    /// Returns the checkpoint sequence number stored in the cursor.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
179
// No methods are overridden here, so balance cursors use whatever default
// behavior the `ScanLimited` trait provides.
impl ScanLimited for Cursor {}
181
182impl TryFrom<StoredBalance> for Balance {
183    type Error = Error;
184
185    fn try_from(s: StoredBalance) -> Result<Self, Error> {
186        let StoredBalance {
187            balance,
188            count,
189            coin_type,
190        } = s;
191        let total_balance = balance
192            .map(|b| BigInt::from_str(&b))
193            .transpose()
194            .map_err(|_| Error::Internal("Failed to read balance.".to_string()))?;
195
196        let coin_object_count = count.map(|c| UInt53::from(c as u64));
197
198        let coin_type = MoveType::new(
199            parse_iota_type_tag(&coin_type)
200                .map_err(|e| Error::Internal(format!("Failed to parse coin type: {e}")))?,
201        );
202
203        Ok(Balance {
204            coin_type,
205            coin_object_count,
206            total_balance,
207        })
208    }
209}
210
211/// Query the database for a `page` of coin balances. Each balance represents
212/// the total balance for a particular coin type, owned by `address`. This
213/// function is meant to be called within a thunk and returns a RawQuery that
214/// can be converted into a BoxedSqlQuery with `.into_boxed()`.
215fn balance_query(
216    address: IotaAddress,
217    coin_type: Option<TypeTag>,
218    range: AvailableRange,
219) -> RawQuery {
220    // Construct the filtered inner query - apply the same filtering criteria to
221    // both objects_snapshot and objects_history tables.
222    let mut snapshot_objs = query!("SELECT * FROM objects_snapshot");
223    snapshot_objs = filter(snapshot_objs, address, coin_type.clone());
224
225    // Additionally filter objects_history table for results between the available
226    // range, or checkpoint_viewed_at, if provided.
227    let mut history_objs = query!("SELECT * FROM objects_history");
228    history_objs = filter(history_objs, address, coin_type.clone());
229    history_objs = filter!(
230        history_objs,
231        format!(
232            r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
233            range.first, range.last
234        )
235    );
236
237    // Combine the two queries, and select the most recent version of each object.
238    let candidates = query!(
239        r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) o"#,
240        snapshot_objs,
241        history_objs
242    )
243    .order_by("object_id")
244    .order_by("object_version DESC");
245
246    // Objects that fulfill the filtering criteria may not be the most recent
247    // version available. Left join the candidates table on newer to filter out
248    // any objects that have a newer version.
249    let mut newer = query!("SELECT object_id, object_version FROM objects_history");
250    newer = filter!(
251        newer,
252        format!(
253            r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
254            range.first, range.last
255        )
256    );
257    let final_ = query!(
258        r#"SELECT
259            CAST(SUM(coin_balance) AS TEXT) as balance,
260            COUNT(*) as count,
261            coin_type
262        FROM ({}) candidates
263        LEFT JOIN ({}) newer
264        ON (
265            candidates.object_id = newer.object_id
266            AND candidates.object_version < newer.object_version
267        )"#,
268        candidates,
269        newer
270    );
271
272    // Additionally for balance's query, group coins by coin_type.
273    filter!(final_, "newer.object_version IS NULL").group_by("coin_type")
274}
275
276/// Applies the filtering criteria for balances to the input `RawQuery` and
277/// returns a new `RawQuery`.
278fn filter(mut query: RawQuery, owner: IotaAddress, coin_type: Option<TypeTag>) -> RawQuery {
279    query = filter!(query, "coin_type IS NOT NULL");
280
281    query = filter!(
282        query,
283        format!(
284            "owner_id = '\\x{}'::bytea AND owner_type = {}",
285            hex::encode(owner.into_vec()),
286            OwnerType::Address as i16
287        )
288    );
289
290    if let Some(coin_type) = coin_type {
291        query = filter!(
292            query,
293            "coin_type = {}",
294            coin_type.to_canonical_display(/* with_prefix */ true)
295        );
296    };
297
298    query
299}