1use std::str::FromStr;
6
7use async_graphql::{
8 connection::{Connection, CursorType, Edge},
9 *,
10};
11use diesel::{
12 OptionalExtension, QueryableByName,
13 sql_types::{BigInt as SqlBigInt, Nullable, Text},
14};
15use iota_indexer::types::OwnerType;
16use iota_types::{TypeTag, parse_iota_type_tag};
17use serde::{Deserialize, Serialize};
18
19use crate::{
20 consistency::Checkpointed,
21 data::{Db, DbConnection, QueryExecutor},
22 error::Error,
23 filter, query,
24 raw_query::RawQuery,
25 types::{
26 available_range::AvailableRange,
27 big_int::BigInt,
28 cursor::{self, Page, RawPaginated, ScanLimited, Target},
29 iota_address::IotaAddress,
30 move_type::MoveType,
31 uint53::UInt53,
32 },
33};
34
// GraphQL-facing balance of a single coin type, built from a `StoredBalance`
// row via `TryFrom<StoredBalance>`.
//
// NOTE(review): plain `//` comments are used on purpose. With
// `#[derive(SimpleObject)]`, `///` doc comments are exported as descriptions in
// the GraphQL schema, so adding them here would change the served schema.
#[derive(Clone, Debug, SimpleObject)]
pub(crate) struct Balance {
    // Coin type this balance refers to, parsed from the stored `coin_type` string.
    pub(crate) coin_type: MoveType,
    // Number of rows aggregated into this balance (`COUNT(*)` in `balance_query`).
    pub(crate) coin_object_count: Option<UInt53>,
    // Sum of `coin_balance` over those rows, parsed from its textual form.
    pub(crate) total_balance: Option<BigInt>,
}
45
/// One row of the result set produced by `balance_query`, loaded by column name.
///
/// The columns mirror the final `SELECT` of that query: `balance`, `count` and
/// `coin_type`.
#[derive(QueryableByName)]
pub struct StoredBalance {
    /// `SUM(coin_balance)` cast to text; nullable.
    #[diesel(sql_type = Nullable<Text>)]
    pub balance: Option<String>,
    /// `COUNT(*)` of the rows grouped under this coin type; nullable.
    #[diesel(sql_type = Nullable<SqlBigInt>)]
    pub count: Option<i64>,
    /// The coin type the rows were grouped by, in textual form.
    #[diesel(sql_type = Text)]
    pub coin_type: String,
}
58
/// Cursor type accepted by `Balance::paginate`: a `BalanceCursor` wrapped in
/// `cursor::JsonCursor`.
pub(crate) type Cursor = cursor::JsonCursor<BalanceCursor>;
60
/// Payload of a balance pagination cursor.
///
/// The serde renames keep the serialized form short: `coin_type` is written as
/// `"t"` and `checkpoint_viewed_at` as `"c"`.
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
pub(crate) struct BalanceCursor {
    /// Coin type of the row the cursor points at; pagination filters and orders
    /// on this value.
    #[serde(rename = "t")]
    coin_type: String,
    /// Checkpoint the page containing this cursor was produced at.
    #[serde(rename = "c")]
    checkpoint_viewed_at: u64,
}
72
73impl Balance {
74 pub(crate) async fn query(
79 db: &Db,
80 address: IotaAddress,
81 coin_type: TypeTag,
82 checkpoint_viewed_at: u64,
83 ) -> Result<Option<Balance>, Error> {
84 let stored: Option<StoredBalance> = db
85 .execute_repeatable(move |conn| {
86 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
87 return Ok::<_, diesel::result::Error>(None);
88 };
89
90 conn.result(move || {
91 balance_query(address, Some(coin_type.clone()), range).into_boxed()
92 })
93 .optional()
94 })
95 .await?;
96
97 stored.map(Balance::try_from).transpose()
98 }
99
100 pub(crate) async fn paginate(
104 db: &Db,
105 page: Page<Cursor>,
106 address: IotaAddress,
107 checkpoint_viewed_at: u64,
108 ) -> Result<Connection<String, Balance>, Error> {
109 let cursor_viewed_at = page.validate_cursor_consistency()?;
114 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
115
116 let Some((prev, next, results)) = db
117 .execute_repeatable(move |conn| {
118 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
119 return Ok::<_, diesel::result::Error>(None);
120 };
121
122 let result = page.paginate_raw_query::<StoredBalance>(
123 conn,
124 checkpoint_viewed_at,
125 balance_query(address, None, range),
126 )?;
127
128 Ok(Some(result))
129 })
130 .await?
131 else {
132 return Err(Error::Client(
133 "Requested data is outside the available range".to_string(),
134 ));
135 };
136
137 let mut conn = Connection::new(prev, next);
138 for stored in results {
139 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
140 let balance = Balance::try_from(stored)?;
141 conn.edges.push(Edge::new(cursor, balance));
142 }
143
144 Ok(conn)
145 }
146}
147
148impl RawPaginated<Cursor> for StoredBalance {
149 fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
150 filter!(query, "coin_type >= {}", cursor.coin_type.clone())
151 }
152
153 fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
154 filter!(query, "coin_type <= {}", cursor.coin_type.clone())
155 }
156
157 fn order(asc: bool, query: RawQuery) -> RawQuery {
158 if asc {
159 return query.order_by("coin_type ASC");
160 }
161 query.order_by("coin_type DESC")
162 }
163}
164
165impl Target<Cursor> for StoredBalance {
166 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
167 Cursor::new(BalanceCursor {
168 coin_type: self.coin_type.clone(),
169 checkpoint_viewed_at,
170 })
171 }
172}
173
impl Checkpointed for Cursor {
    /// Returns the `checkpoint_viewed_at` value recorded in the cursor.
    fn checkpoint_viewed_at(&self) -> u64 {
        self.checkpoint_viewed_at
    }
}
179
// Nothing is overridden here, so `Cursor` uses whatever defaults `ScanLimited`
// declares.
impl ScanLimited for Cursor {}
181
182impl TryFrom<StoredBalance> for Balance {
183 type Error = Error;
184
185 fn try_from(s: StoredBalance) -> Result<Self, Error> {
186 let StoredBalance {
187 balance,
188 count,
189 coin_type,
190 } = s;
191 let total_balance = balance
192 .map(|b| BigInt::from_str(&b))
193 .transpose()
194 .map_err(|_| Error::Internal("Failed to read balance.".to_string()))?;
195
196 let coin_object_count = count.map(|c| UInt53::from(c as u64));
197
198 let coin_type = MoveType::new(
199 parse_iota_type_tag(&coin_type)
200 .map_err(|e| Error::Internal(format!("Failed to parse coin type: {e}")))?,
201 );
202
203 Ok(Balance {
204 coin_type,
205 coin_object_count,
206 total_balance,
207 })
208 }
209}
210
211fn balance_query(
216 address: IotaAddress,
217 coin_type: Option<TypeTag>,
218 range: AvailableRange,
219) -> RawQuery {
220 let mut snapshot_objs = query!("SELECT * FROM objects_snapshot");
223 snapshot_objs = filter(snapshot_objs, address, coin_type.clone());
224
225 let mut history_objs = query!("SELECT * FROM objects_history");
228 history_objs = filter(history_objs, address, coin_type.clone());
229 history_objs = filter!(
230 history_objs,
231 format!(
232 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
233 range.first, range.last
234 )
235 );
236
237 let candidates = query!(
239 r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) o"#,
240 snapshot_objs,
241 history_objs
242 )
243 .order_by("object_id")
244 .order_by("object_version DESC");
245
246 let mut newer = query!("SELECT object_id, object_version FROM objects_history");
250 newer = filter!(
251 newer,
252 format!(
253 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
254 range.first, range.last
255 )
256 );
257 let final_ = query!(
258 r#"SELECT
259 CAST(SUM(coin_balance) AS TEXT) as balance,
260 COUNT(*) as count,
261 coin_type
262 FROM ({}) candidates
263 LEFT JOIN ({}) newer
264 ON (
265 candidates.object_id = newer.object_id
266 AND candidates.object_version < newer.object_version
267 )"#,
268 candidates,
269 newer
270 );
271
272 filter!(final_, "newer.object_version IS NULL").group_by("coin_type")
274}
275
276fn filter(mut query: RawQuery, owner: IotaAddress, coin_type: Option<TypeTag>) -> RawQuery {
279 query = filter!(query, "coin_type IS NOT NULL");
280
281 query = filter!(
282 query,
283 format!(
284 "owner_id = '\\x{}'::bytea AND owner_type = {}",
285 hex::encode(owner.into_vec()),
286 OwnerType::Address as i16
287 )
288 );
289
290 if let Some(coin_type) = coin_type {
291 query = filter!(
292 query,
293 "coin_type = {}",
294 coin_type.to_canonical_display(true)
295 );
296 };
297
298 query
299}