1use std::str::FromStr;
6
7use async_graphql::{
8 connection::{Connection, CursorType, Edge},
9 *,
10};
11use diesel::{
12 OptionalExtension, QueryableByName,
13 sql_types::{BigInt as SqlBigInt, Nullable, Text},
14};
15use iota_indexer::types::OwnerType;
16use iota_types::TypeTag;
17use serde::{Deserialize, Serialize};
18
19use crate::{
20 consistency::Checkpointed,
21 data::{Db, DbConnection, QueryExecutor},
22 error::Error,
23 filter, query,
24 raw_query::RawQuery,
25 types::{
26 available_range::AvailableRange,
27 big_int::BigInt,
28 cursor::{self, Page, RawPaginated, ScanLimited, Target},
29 iota_address::IotaAddress,
30 move_type::MoveType,
31 uint53::UInt53,
32 },
33};
34
35#[derive(Clone, Debug, SimpleObject)]
37pub(crate) struct Balance {
38 pub(crate) coin_type: MoveType,
40 pub(crate) coin_object_count: Option<UInt53>,
42 pub(crate) total_balance: Option<BigInt>,
44}
45
46#[derive(QueryableByName)]
50pub struct StoredBalance {
51 #[diesel(sql_type = Nullable<Text>)]
52 pub balance: Option<String>,
53 #[diesel(sql_type = Nullable<SqlBigInt>)]
54 pub count: Option<i64>,
55 #[diesel(sql_type = Text)]
56 pub coin_type: String,
57}
58
59pub(crate) type Cursor = cursor::JsonCursor<BalanceCursor>;
60
61#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)]
65pub(crate) struct BalanceCursor {
66 #[serde(rename = "t")]
67 coin_type: String,
68 #[serde(rename = "c")]
70 checkpoint_viewed_at: u64,
71}
72
73impl Balance {
74 pub(crate) async fn query(
79 db: &Db,
80 address: IotaAddress,
81 coin_type: TypeTag,
82 checkpoint_viewed_at: u64,
83 ) -> Result<Option<Balance>, Error> {
84 let stored: Option<StoredBalance> = db
85 .execute_repeatable(move |conn| {
86 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
87 return Ok::<_, diesel::result::Error>(None);
88 };
89
90 conn.result(move || {
91 balance_query(address, Some(coin_type.clone()), range).into_boxed()
92 })
93 .optional()
94 })
95 .await?;
96
97 stored.map(Balance::try_from).transpose()
98 }
99
100 pub(crate) async fn paginate(
104 db: &Db,
105 page: Page<Cursor>,
106 address: IotaAddress,
107 checkpoint_viewed_at: u64,
108 ) -> Result<Connection<String, Balance>, Error> {
109 let cursor_viewed_at = page.validate_cursor_consistency()?;
114 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
115
116 let Some((prev, next, results)) = db
117 .execute_repeatable(move |conn| {
118 let Some(range) = AvailableRange::result(conn, checkpoint_viewed_at)? else {
119 return Ok::<_, diesel::result::Error>(None);
120 };
121
122 let result = page.paginate_raw_query::<StoredBalance>(
123 conn,
124 checkpoint_viewed_at,
125 balance_query(address, None, range),
126 )?;
127
128 Ok(Some(result))
129 })
130 .await?
131 else {
132 return Err(Error::Client(
133 "Requested data is outside the available range".to_string(),
134 ));
135 };
136
137 let mut conn = Connection::new(prev, next);
138 for stored in results {
139 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
140 let balance = Balance::try_from(stored)?;
141 conn.edges.push(Edge::new(cursor, balance));
142 }
143
144 Ok(conn)
145 }
146}
147
148impl RawPaginated<Cursor> for StoredBalance {
149 fn filter_ge(cursor: &Cursor, query: RawQuery) -> RawQuery {
150 filter!(query, "coin_type >= {}", cursor.coin_type.clone())
151 }
152
153 fn filter_le(cursor: &Cursor, query: RawQuery) -> RawQuery {
154 filter!(query, "coin_type <= {}", cursor.coin_type.clone())
155 }
156
157 fn order(asc: bool, query: RawQuery) -> RawQuery {
158 if asc {
159 return query.order_by("coin_type ASC");
160 }
161 query.order_by("coin_type DESC")
162 }
163}
164
165impl Target<Cursor> for StoredBalance {
166 fn cursor(&self, checkpoint_viewed_at: u64) -> Cursor {
167 Cursor::new(BalanceCursor {
168 coin_type: self.coin_type.clone(),
169 checkpoint_viewed_at,
170 })
171 }
172}
173
174impl Checkpointed for Cursor {
175 fn checkpoint_viewed_at(&self) -> u64 {
176 self.checkpoint_viewed_at
177 }
178}
179
180impl ScanLimited for Cursor {}
181
182impl TryFrom<StoredBalance> for Balance {
183 type Error = Error;
184
185 fn try_from(s: StoredBalance) -> Result<Self, Error> {
186 let StoredBalance {
187 balance,
188 count,
189 coin_type,
190 } = s;
191 let total_balance = balance
192 .map(|b| BigInt::from_str(&b))
193 .transpose()
194 .map_err(|_| Error::Internal("Failed to read balance.".to_string()))?;
195
196 let coin_object_count = count.map(|c| UInt53::from(c as u64));
197
198 let coin_type = TypeTag::from_str(&coin_type)
199 .map_err(|e| Error::Internal(format!("Failed to parse coin type: {e}")))?
200 .into();
201
202 Ok(Balance {
203 coin_type,
204 coin_object_count,
205 total_balance,
206 })
207 }
208}
209
210fn balance_query(
215 address: IotaAddress,
216 coin_type: Option<TypeTag>,
217 range: AvailableRange,
218) -> RawQuery {
219 let mut snapshot_objs = query!("SELECT * FROM objects_snapshot");
222 snapshot_objs = filter(snapshot_objs, address, coin_type.clone());
223
224 let mut history_objs = query!("SELECT * FROM objects_history");
227 history_objs = filter(history_objs, address, coin_type.clone());
228 history_objs = filter!(
229 history_objs,
230 format!(
231 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
232 range.first, range.last
233 )
234 );
235
236 let candidates = query!(
238 r#"SELECT DISTINCT ON (object_id) * FROM (({}) UNION ALL ({})) o"#,
239 snapshot_objs,
240 history_objs
241 )
242 .order_by("object_id")
243 .order_by("object_version DESC");
244
245 let mut newer = query!("SELECT object_id, object_version FROM objects_history");
249 newer = filter!(
250 newer,
251 format!(
252 r#"checkpoint_sequence_number BETWEEN {} AND {}"#,
253 range.first, range.last
254 )
255 );
256 let final_ = query!(
257 r#"SELECT
258 CAST(SUM(coin_balance) AS TEXT) as balance,
259 COUNT(*) as count,
260 coin_type
261 FROM ({}) candidates
262 LEFT JOIN ({}) newer
263 ON (
264 candidates.object_id = newer.object_id
265 AND candidates.object_version < newer.object_version
266 )"#,
267 candidates,
268 newer
269 );
270
271 filter!(final_, "newer.object_version IS NULL").group_by("coin_type")
273}
274
275fn filter(mut query: RawQuery, owner: IotaAddress, coin_type: Option<TypeTag>) -> RawQuery {
278 query = filter!(query, "coin_type IS NOT NULL");
279
280 query = filter!(
281 query,
282 format!(
283 "owner_id = '\\x{}'::bytea AND owner_type = {}",
284 hex::encode(owner.into_vec()),
285 OwnerType::Address as i16
286 )
287 );
288
289 if let Some(coin_type) = coin_type {
290 query = filter!(
291 query,
292 "coin_type = {}",
293 coin_type.to_canonical_display(true)
294 );
295 };
296
297 query
298}