1use std::str::FromStr;
6
7use async_graphql::{
8 connection::{Connection, CursorType, Edge},
9 *,
10};
11use diesel::{
12 OptionalExtension, QueryableByName,
13 sql_types::{BigInt as SqlBigInt, Nullable, Text},
14};
15use iota_indexer::types::OwnerType;
16use iota_types::base_types::TypeTag;
17use serde::{Deserialize, Serialize};
18
19use crate::{
20 consistency::Checkpointed,
21 data::{Db, DbConnection, QueryExecutor},
22 error::Error,
23 filter, query,
24 raw_query::RawQuery,
25 types::{
26 available_range::AvailableRange,
27 big_int::BigInt,
28 cursor::{self, Page, RawPaginated, ScanLimited, Target},
29 iota_address::IotaAddress,
30 move_type::MoveType,
31 uint53::UInt53,
32 },
33};
34
35#[derive(Clone, Debug, SimpleObject)]
37pub(crate) struct Balance {
38 pub(crate) coin_type: MoveType,
40 pub(crate) coin_object_count: Option<UInt53>,
42 pub(crate) total_balance: Option<BigInt>,
44}
45
46#[derive(QueryableByName)]
50pub struct StoredBalance {
51 #[diesel(sql_type = Nullable<Text>)]
52 pub balance: Option<String>,
53 #[diesel(sql_type = Nullable<SqlBigInt>)]
54 pub count: Option<i64>,
55 #[diesel(sql_type = Text)]
56 pub coin_type: String,
57}
58
59pub(crate) type Cursor = cursor::JsonCursor<BalanceCursor>;
60
61#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
65pub(crate) struct BalanceCursor {
66 #[serde(rename = "t")]
67 coin_type: String,
68 #[serde(rename = "c")]
70 checkpoint_viewed_at: u64,
71}
72
73impl Balance {
74 pub(crate) async fn query(
79 db: &Db,
80 address: IotaAddress,
81 coin_type: TypeTag,
82 checkpoint_viewed_at: u64,
83 ) -> Result<Option<Balance>, Error> {
84 let max_available_range = db.max_available_range;
85 let stored: Option<StoredBalance> = db
86 .execute_repeatable(move |conn| {
87 if !AvailableRange::is_checkpoint_in_backward_history_range(
88 conn,
89 checkpoint_viewed_at,
90 max_available_range,
91 )? {
92 return Ok::<_, diesel::result::Error>(None);
93 }
94
95 conn.result(move || {
96 balance_query(address, Some(coin_type.clone()), checkpoint_viewed_at)
97 .into_boxed()
98 })
99 .optional()
100 })
101 .await?;
102
103 stored.map(Balance::try_from).transpose()
104 }
105
106 pub(crate) async fn paginate(
110 db: &Db,
111 page: Page<Cursor>,
112 address: IotaAddress,
113 checkpoint_viewed_at: u64,
114 ) -> Result<Connection<String, Balance>, Error> {
115 let cursor_viewed_at = page.validate_cursor_consistency()?;
120 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
121
122 let max_available_range = db.max_available_range;
123
124 let Some((prev, next, results)) = db
125 .execute_repeatable(move |conn| {
126 if !AvailableRange::is_checkpoint_in_backward_history_range(
127 conn,
128 checkpoint_viewed_at,
129 max_available_range,
130 )? {
131 return Ok::<_, diesel::result::Error>(None);
132 }
133
134 let result = page.paginate_raw_query::<StoredBalance>(
135 conn,
136 checkpoint_viewed_at,
137 balance_query(address, None, checkpoint_viewed_at),
138 )?;
139
140 Ok(Some(result))
141 })
142 .await?
143 else {
144 return Err(Error::Client(
145 "Requested data is outside the available range".to_string(),
146 ));
147 };
148
149 let mut conn = Connection::new(prev, next);
150 for stored in results {
151 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
152 let balance = Balance::try_from(stored)?;
153 conn.edges.push(Edge::new(cursor, balance));
154 }
155
156 Ok(conn)
157 }
158}
159
160impl RawPaginated<Cursor> for StoredBalance {
161 fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
162 filter!(query, "coin_type >= {}", cursor.coin_type.clone())
163 }
164
165 fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
166 filter!(query, "coin_type <= {}", cursor.coin_type.clone())
167 }
168
169 fn order(asc: bool, query: RawQuery) -> RawQuery {
170 if asc {
171 return query.order_by("coin_type ASC");
172 }
173 query.order_by("coin_type DESC")
174 }
175}
176
177impl Target<Cursor> for StoredBalance {
178 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
179 Cursor::new(BalanceCursor {
180 coin_type: self.coin_type.clone(),
181 checkpoint_viewed_at,
182 })
183 }
184}
185
186impl Checkpointed for Cursor {
187 fn checkpoint_viewed_at(&self) -> u64 {
188 self.checkpoint_viewed_at
189 }
190}
191
192impl ScanLimited for Cursor {}
193
194impl TryFrom<StoredBalance> for Balance {
195 type Error = Error;
196
197 fn try_from(s: StoredBalance) -> Result<Self, Error> {
198 let StoredBalance {
199 balance,
200 count,
201 coin_type,
202 } = s;
203 let total_balance = balance
204 .map(|b| BigInt::from_str(&b))
205 .transpose()
206 .map_err(|_| Error::Internal("Failed to read balance.".to_string()))?;
207
208 let coin_object_count = count.map(|c| UInt53::from(c as u64));
209
210 let coin_type = TypeTag::from_str(&coin_type)
211 .map_err(|e| Error::Internal(format!("Failed to parse coin type: {e}")))?
212 .into();
213
214 Ok(Balance {
215 coin_type,
216 coin_object_count,
217 total_balance,
218 })
219 }
220}
221
222fn balance_query(
242 address: IotaAddress,
243 coin_type: Option<TypeTag>,
244 checkpoint_viewed_at: u64,
245) -> RawQuery {
246 let checkpoint_viewed_at = checkpoint_viewed_at as i64;
247
248 let checkpointed = filter(
249 query!("SELECT object_id, coin_balance, coin_type FROM checkpointed_objects"),
250 address,
251 coin_type.clone(),
252 );
253 let changed = query!(format!(
254 "SELECT DISTINCT object_id FROM objects_backward_history \
255 WHERE superseded_at_checkpoint > {checkpoint_viewed_at}"
256 ));
257 let source_a = filter!(
258 query!(
259 r#"SELECT candidates.object_id, candidates.coin_balance, candidates.coin_type
260 FROM ({}) candidates
261 LEFT JOIN ({}) changed ON candidates.object_id = changed.object_id"#,
262 checkpointed,
263 changed
264 ),
265 "changed.object_id IS NULL"
266 );
267
268 let mut history = filter(
269 query!(
270 "SELECT object_id, object_version, coin_balance, coin_type \
271 FROM objects_backward_history"
272 ),
273 address,
274 coin_type,
275 );
276 history = filter!(
280 history,
281 format!("superseded_at_checkpoint > {checkpoint_viewed_at}")
282 );
283 let oldest = query!(format!(
284 "SELECT object_id, MIN(object_version) AS min_version \
285 FROM objects_backward_history \
286 WHERE superseded_at_checkpoint > {checkpoint_viewed_at} \
287 GROUP BY object_id"
288 ));
289 let source_b = query!(
290 r#"SELECT candidates.object_id, candidates.coin_balance, candidates.coin_type
291 FROM ({}) candidates
292 JOIN ({}) oldest ON candidates.object_id = oldest.object_id
293 AND candidates.object_version = oldest.min_version"#,
294 history,
295 oldest
296 );
297
298 query!(
299 r#"SELECT
300 CAST(SUM(coin_balance) AS TEXT) as balance,
301 COUNT(*) as count,
302 coin_type
303 FROM (({}) UNION ALL ({})) candidates"#,
304 source_a,
305 source_b
306 )
307 .group_by("coin_type")
308}
309
310fn filter(mut query: RawQuery, owner: IotaAddress, coin_type: Option<TypeTag>) -> RawQuery {
313 query = filter!(query, "coin_type IS NOT NULL");
314
315 query = filter!(
316 query,
317 format!(
318 "owner_id = '\\x{}'::bytea AND owner_type = {}",
319 hex::encode(owner.into_vec()),
320 OwnerType::Address as i16
321 )
322 );
323
324 if let Some(coin_type) = coin_type {
325 query = filter!(
326 query,
327 "coin_type = {}",
328 coin_type.to_canonical_string(true)
329 );
330 };
331
332 query
333}