iota_graphql_rpc/types/
balance.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::str::FromStr;
6
7use async_graphql::{
8    connection::{Connection, CursorType, Edge},
9    *,
10};
11use diesel::{
12    OptionalExtension, QueryableByName,
13    sql_types::{BigInt as SqlBigInt, Nullable, Text},
14};
15use iota_indexer::types::OwnerType;
16use iota_types::base_types::TypeTag;
17use serde::{Deserialize, Serialize};
18
19use crate::{
20    consistency::Checkpointed,
21    data::{Db, DbConnection, QueryExecutor},
22    error::Error,
23    filter, query,
24    raw_query::RawQuery,
25    types::{
26        available_range::AvailableRange,
27        big_int::BigInt,
28        cursor::{self, Page, RawPaginated, ScanLimited, Target},
29        iota_address::IotaAddress,
30        move_type::MoveType,
31        uint53::UInt53,
32    },
33};
34
35/// The total balance for a particular coin type.
36#[derive(Clone, Debug, SimpleObject)]
37pub(crate) struct Balance {
38    /// Coin type for the balance, such as 0x2::iota::IOTA
39    pub(crate) coin_type: MoveType,
40    /// How many coins of this type constitute the balance
41    pub(crate) coin_object_count: Option<UInt53>,
42    /// Total balance across all coin objects of the coin type
43    pub(crate) total_balance: Option<BigInt>,
44}
45
46/// Representation of a row of balance information from the DB. We read the
47/// balance as a `String` to deal with the large (bigger than 2^63 - 1)
48/// balances.
49#[derive(QueryableByName)]
50pub struct StoredBalance {
51    #[diesel(sql_type = Nullable<Text>)]
52    pub balance: Option<String>,
53    #[diesel(sql_type = Nullable<SqlBigInt>)]
54    pub count: Option<i64>,
55    #[diesel(sql_type = Text)]
56    pub coin_type: String,
57}
58
59pub(crate) type Cursor = cursor::JsonCursor<BalanceCursor>;
60
61/// The inner struct for the `Balance`'s cursor. The `coin_type` is used as the
62/// cursor, while the `checkpoint_viewed_at` sets the consistent upper bound for
63/// the cursor.
64#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
65pub(crate) struct BalanceCursor {
66    #[serde(rename = "t")]
67    coin_type: String,
68    /// The checkpoint sequence number this was viewed at.
69    #[serde(rename = "c")]
70    checkpoint_viewed_at: u64,
71}
72
73impl Balance {
74    /// Query for the balance of coins owned by `address`, of coins with type
75    /// `coin_type`. Note that `coin_type` is the type of
76    /// `0x2::coin::Coin`'s type parameter, not the full type of the coin
77    /// object.
78    pub(crate) async fn query(
79        db: &Db,
80        address: IotaAddress,
81        coin_type: TypeTag,
82        checkpoint_viewed_at: u64,
83    ) -> Result<Option<Balance>, Error> {
84        let max_available_range = db.max_available_range;
85        let stored: Option<StoredBalance> = db
86            .execute_repeatable(move |conn| {
87                if !AvailableRange::is_checkpoint_in_backward_history_range(
88                    conn,
89                    checkpoint_viewed_at,
90                    max_available_range,
91                )? {
92                    return Ok::<_, diesel::result::Error>(None);
93                }
94
95                conn.result(move || {
96                    balance_query(address, Some(coin_type.clone()), checkpoint_viewed_at)
97                        .into_boxed()
98                })
99                .optional()
100            })
101            .await?;
102
103        stored.map(Balance::try_from).transpose()
104    }
105
106    /// Query the database for a `page` of coin balances. Each balance
107    /// represents the total balance for a particular coin type, owned by
108    /// `address`.
109    pub(crate) async fn paginate(
110        db: &Db,
111        page: Page<Cursor>,
112        address: IotaAddress,
113        checkpoint_viewed_at: u64,
114    ) -> Result<Connection<String, Balance>, Error> {
115        // If cursors are provided, defer to the `checkpoint_viewed_at` in the cursor if
116        // they are consistent. Otherwise, use the value from the parameter, or
117        // set to None. This is so that paginated queries are consistent with
118        // the previous query that created the cursor.
119        let cursor_viewed_at = page.validate_cursor_consistency()?;
120        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
121
122        let max_available_range = db.max_available_range;
123
124        let Some((prev, next, results)) = db
125            .execute_repeatable(move |conn| {
126                if !AvailableRange::is_checkpoint_in_backward_history_range(
127                    conn,
128                    checkpoint_viewed_at,
129                    max_available_range,
130                )? {
131                    return Ok::<_, diesel::result::Error>(None);
132                }
133
134                let result = page.paginate_raw_query::<StoredBalance>(
135                    conn,
136                    checkpoint_viewed_at,
137                    balance_query(address, None, checkpoint_viewed_at),
138                )?;
139
140                Ok(Some(result))
141            })
142            .await?
143        else {
144            return Err(Error::Client(
145                "Requested data is outside the available range".to_string(),
146            ));
147        };
148
149        let mut conn = Connection::new(prev, next);
150        for stored in results {
151            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
152            let balance = Balance::try_from(stored)?;
153            conn.edges.push(Edge::new(cursor, balance));
154        }
155
156        Ok(conn)
157    }
158}
159
160impl RawPaginated<Cursor> for StoredBalance {
161    fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
162        filter!(query, "coin_type >= {}", cursor.coin_type.clone())
163    }
164
165    fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
166        filter!(query, "coin_type <= {}", cursor.coin_type.clone())
167    }
168
169    fn order(asc: bool, query: RawQuery) -> RawQuery {
170        if asc {
171            return query.order_by("coin_type ASC");
172        }
173        query.order_by("coin_type DESC")
174    }
175}
176
177impl Target<Cursor> for StoredBalance {
178    fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
179        Cursor::new(BalanceCursor {
180            coin_type: self.coin_type.clone(),
181            checkpoint_viewed_at,
182        })
183    }
184}
185
186impl Checkpointed for Cursor {
187    fn checkpoint_viewed_at(&self) -> u64 {
188        self.checkpoint_viewed_at
189    }
190}
191
192impl ScanLimited for Cursor {}
193
194impl TryFrom<StoredBalance> for Balance {
195    type Error = Error;
196
197    fn try_from(s: StoredBalance) -> Result<Self, Error> {
198        let StoredBalance {
199            balance,
200            count,
201            coin_type,
202        } = s;
203        let total_balance = balance
204            .map(|b| BigInt::from_str(&b))
205            .transpose()
206            .map_err(|_| Error::Internal("Failed to read balance.".to_string()))?;
207
208        let coin_object_count = count.map(|c| UInt53::from(c as u64));
209
210        let coin_type = TypeTag::from_str(&coin_type)
211            .map_err(|e| Error::Internal(format!("Failed to parse coin type: {e}")))?
212            .into();
213
214        Ok(Balance {
215            coin_type,
216            coin_object_count,
217            total_balance,
218        })
219    }
220}
221
222/// Builds the aggregating SQL that totals each coin type's balance for
223/// `address` at `checkpoint_viewed_at`, using the backward-diff tables.
224///
225/// The consistent set of objects at `checkpoint_viewed_at` is the union of two
226/// disjoint sources:
227///
228/// * Source A — `checkpointed_objects` rows for objects that have not changed
229///   since `checkpoint_viewed_at` (no `objects_backward_history` entry with
230///   `superseded_at_checkpoint > checkpoint_viewed_at`).
231/// * Source B — `objects_backward_history` rows for the version of each object
232///   current at `checkpoint_viewed_at`: the earliest version whose
233///   `superseded_at_checkpoint > checkpoint_viewed_at`. Synthetic rows
234///   (`NotYetCreated`, `WrappedOrDeleted`) drop out automatically — they carry
235///   NULL `owner_id`/`coin_type`, so the owner+coin filter excludes them.
236///
237/// Each object can match at most one source: A only returns objects that
238/// did not change after `checkpoint_viewed_at`, while B only returns objects
239/// that did. So `UNION ALL` already gives one row per `object_id` — no
240/// dedup needed before aggregation.
241fn balance_query(
242    address: IotaAddress,
243    coin_type: Option<TypeTag>,
244    checkpoint_viewed_at: u64,
245) -> RawQuery {
246    let checkpoint_viewed_at = checkpoint_viewed_at as i64;
247
248    let checkpointed = filter(
249        query!("SELECT object_id, coin_balance, coin_type FROM checkpointed_objects"),
250        address,
251        coin_type.clone(),
252    );
253    let changed = query!(format!(
254        "SELECT DISTINCT object_id FROM objects_backward_history \
255         WHERE superseded_at_checkpoint > {checkpoint_viewed_at}"
256    ));
257    let source_a = filter!(
258        query!(
259            r#"SELECT candidates.object_id, candidates.coin_balance, candidates.coin_type
260               FROM ({}) candidates
261               LEFT JOIN ({}) changed ON candidates.object_id = changed.object_id"#,
262            checkpointed,
263            changed
264        ),
265        "changed.object_id IS NULL"
266    );
267
268    let mut history = filter(
269        query!(
270            "SELECT object_id, object_version, coin_balance, coin_type \
271             FROM objects_backward_history"
272        ),
273        address,
274        coin_type,
275    );
276    // NotYetCreated and WrappedOrDeleted rows are already excluded above by
277    // `filter`'s `coin_type IS NOT NULL` / `owner_id = ...` clauses, since
278    // `from_empty` leaves those columns NULL.
279    history = filter!(
280        history,
281        format!("superseded_at_checkpoint > {checkpoint_viewed_at}")
282    );
283    let oldest = query!(format!(
284        "SELECT object_id, MIN(object_version) AS min_version \
285         FROM objects_backward_history \
286         WHERE superseded_at_checkpoint > {checkpoint_viewed_at} \
287         GROUP BY object_id"
288    ));
289    let source_b = query!(
290        r#"SELECT candidates.object_id, candidates.coin_balance, candidates.coin_type
291           FROM ({}) candidates
292           JOIN ({}) oldest ON candidates.object_id = oldest.object_id
293               AND candidates.object_version = oldest.min_version"#,
294        history,
295        oldest
296    );
297
298    query!(
299        r#"SELECT
300            CAST(SUM(coin_balance) AS TEXT) as balance,
301            COUNT(*) as count,
302            coin_type
303        FROM (({}) UNION ALL ({})) candidates"#,
304        source_a,
305        source_b
306    )
307    .group_by("coin_type")
308}
309
310/// Applies the filtering criteria for balances to the input `RawQuery` and
311/// returns a new `RawQuery`.
312fn filter(mut query: RawQuery, owner: IotaAddress, coin_type: Option<TypeTag>) -> RawQuery {
313    query = filter!(query, "coin_type IS NOT NULL");
314
315    query = filter!(
316        query,
317        format!(
318            "owner_id = '\\x{}'::bytea AND owner_type = {}",
319            hex::encode(owner.into_vec()),
320            OwnerType::Address as i16
321        )
322    );
323
324    if let Some(coin_type) = coin_type {
325        query = filter!(
326            query,
327            "coin_type = {}",
328            coin_type.to_canonical_string(/* with_prefix */ true)
329        );
330    };
331
332    query
333}