iota_graphql_rpc/types/event/
mod.rs1use std::str::FromStr;
6
7use async_graphql::{
8 connection::{Connection, CursorType, Edge},
9 *,
10};
11use cursor::EvLookup;
12use diesel::{ExpressionMethods, QueryDsl};
13use iota_indexer::{
14 models::{events::StoredEvent, transactions::StoredTransaction},
15 schema::{checkpoints, events},
16};
17use iota_types::{
18 Identifier,
19 base_types::{IotaAddress as NativeIotaAddress, ObjectID},
20 event::Event as NativeEvent,
21 parse_iota_struct_tag,
22};
23use lookups::{add_bounds, select_emit_module, select_event_type, select_sender};
24
25use crate::{
26 data::{self, Db, DbConnection, QueryExecutor},
27 error::Error,
28 query,
29 types::{
30 address::Address,
31 base64::Base64,
32 cursor::{Page, Target},
33 date_time::DateTime,
34 move_module::MoveModule,
35 move_value::MoveValue,
36 },
37};
38
39mod cursor;
40mod filter;
41mod lookups;
42pub(crate) use cursor::Cursor;
43pub(crate) use filter::EventFilter;
44
45#[derive(Clone, Debug)]
53pub(crate) struct Event {
54 pub stored: Option<StoredEvent>,
55 pub native: NativeEvent,
56 pub checkpoint_viewed_at: u64,
58}
59
60type Query<ST, GB> = data::Query<ST, events::table, GB>;
61
62#[Object]
63impl Event {
64 async fn sending_module(&self, ctx: &Context<'_>) -> Result<Option<MoveModule>> {
70 MoveModule::query(
71 ctx,
72 self.native.package_id.into(),
73 &self.native.transaction_module.to_string(),
74 self.checkpoint_viewed_at,
75 )
76 .await
77 .extend()
78 }
79
80 async fn sender(&self) -> Result<Option<Address>> {
82 if self.native.sender == NativeIotaAddress::ZERO {
83 return Ok(None);
84 }
85
86 Ok(Some(Address {
87 address: self.native.sender.into(),
88 checkpoint_viewed_at: self.checkpoint_viewed_at,
89 }))
90 }
91
92 async fn timestamp(&self) -> Result<Option<DateTime>, Error> {
94 if let Some(stored) = &self.stored {
95 Ok(Some(DateTime::from_ms(stored.timestamp_ms)?))
96 } else {
97 Ok(None)
98 }
99 }
100
101 #[graphql(flatten)]
102 async fn move_value(&self) -> Result<MoveValue> {
103 Ok(MoveValue::new(
104 self.native.type_.clone().into(),
105 Base64::from(self.native.contents.clone()),
106 ))
107 }
108}
109
110impl Event {
111 pub(crate) async fn paginate(
125 db: &Db,
126 page: Page<Cursor>,
127 filter: EventFilter,
128 checkpoint_viewed_at: u64,
129 ) -> Result<Connection<String, Event>, Error> {
130 let cursor_viewed_at = page.validate_cursor_consistency()?;
131 let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
132
133 let query_constraint = match (filter.sender, &filter.emitting_module, &filter.event_type) {
139 (None, None, None) => None,
140 (Some(sender), None, None) => Some(select_sender(sender)),
141 (sender, None, Some(event_type)) => Some(select_event_type(event_type, sender)),
142 (sender, Some(module), None) => Some(select_emit_module(module, sender)),
143 (_, Some(_), Some(_)) => {
144 return Err(Error::Client(
145 "Filtering by both emitting module and event type is not supported".to_string(),
146 ));
147 }
148 };
149
150 use checkpoints::dsl;
151 let (prev, next, results) = db
152 .execute(move |conn| {
153 let tx_hi: i64 = conn.first(move || {
154 dsl::checkpoints.select(dsl::network_total_transactions)
155 .filter(dsl::sequence_number.eq(checkpoint_viewed_at as i64))
156 })?;
157
158 let (prev, next, mut events): (bool, bool, Vec<StoredEvent>) =
159 if let Some(filter_query) = query_constraint {
160 let query = add_bounds(filter_query, &filter.transaction_digest, &page, tx_hi);
161
162 let (prev, next, results) =
163 page.paginate_raw_query::<EvLookup>(conn, checkpoint_viewed_at, query)?;
164
165 let ev_lookups = results
166 .into_iter()
167 .map(|x| (x.tx, x.ev))
168 .collect::<Vec<(i64, i64)>>();
169
170 if ev_lookups.is_empty() {
171 return Ok::<_, diesel::result::Error>((prev, next, vec![]));
172 }
173
174 let events = conn.results(move || {
179 let query_string = ev_lookups.iter()
182 .map(|&(tx, ev)| {
183 format!("SELECT * FROM events WHERE tx_sequence_number = {} AND event_sequence_number = {}", tx, ev)
184 })
185 .collect::<Vec<String>>()
186 .join(" UNION ALL ");
187
188 query!(query_string).into_boxed()
189 })?;
190 (prev, next, events)
191 } else {
192 let query = add_bounds(query!("SELECT * FROM events"), &filter.transaction_digest, &page, tx_hi);
195 let (prev, next, events_iter) = page.paginate_raw_query::<StoredEvent>(conn, checkpoint_viewed_at, query)?;
196 let events = events_iter.collect::<Vec<StoredEvent>>();
197 (prev, next, events)
198 };
199
200 events.sort_by(|a, b| {
203 a.tx_sequence_number.cmp(&b.tx_sequence_number)
204 .then_with(|| a.event_sequence_number.cmp(&b.event_sequence_number))
205 });
206
207
208 Ok::<_, diesel::result::Error>((prev, next, events))
209 })
210 .await?;
211
212 let mut conn = Connection::new(prev, next);
213
214 for stored in results {
217 let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
218 conn.edges.push(Edge::new(
219 cursor,
220 Event::try_from_stored_event(stored, checkpoint_viewed_at)?,
221 ));
222 }
223
224 Ok(conn)
225 }
226
227 pub(crate) fn try_from_stored_transaction(
228 stored_tx: &StoredTransaction,
229 idx: usize,
230 checkpoint_viewed_at: u64,
231 ) -> Result<Self, Error> {
232 let Some(serialized_event) = &stored_tx.get_event_at_idx(idx) else {
233 return Err(Error::Internal(format!(
234 "Could not find event with event_sequence_number {} at transaction {}",
235 idx, stored_tx.tx_sequence_number
236 )));
237 };
238
239 let native_event: NativeEvent = bcs::from_bytes(serialized_event).map_err(|_| {
240 Error::Internal(format!(
241 "Failed to deserialize event with {} at transaction {}",
242 idx, stored_tx.tx_sequence_number
243 ))
244 })?;
245
246 let stored_event = StoredEvent {
247 tx_sequence_number: stored_tx.tx_sequence_number,
248 event_sequence_number: idx as i64,
249 transaction_digest: stored_tx.transaction_digest.clone(),
250 senders: vec![Some(native_event.sender.to_vec())],
251 package: native_event.package_id.to_vec(),
252 module: native_event.transaction_module.to_string(),
253 event_type: native_event
254 .type_
255 .to_canonical_string(true),
256 bcs: native_event.contents.clone(),
257 timestamp_ms: stored_tx.timestamp_ms,
258 };
259
260 Ok(Self {
261 stored: Some(stored_event),
262 native: native_event,
263 checkpoint_viewed_at,
264 })
265 }
266
267 fn try_from_stored_event(
268 stored: StoredEvent,
269 checkpoint_viewed_at: u64,
270 ) -> Result<Self, Error> {
271 let Some(Some(sender_bytes)) = stored.senders.first() else {
272 return Err(Error::Internal("No senders found for event".to_string()));
273 };
274 let sender = NativeIotaAddress::from_bytes(sender_bytes)
275 .map_err(|e| Error::Internal(e.to_string()))?;
276 let package_id =
277 ObjectID::from_bytes(&stored.package).map_err(|e| Error::Internal(e.to_string()))?;
278 let type_ = parse_iota_struct_tag(&stored.event_type)
279 .map_err(|e| Error::Internal(e.to_string()))?;
280 let transaction_module =
281 Identifier::from_str(&stored.module).map_err(|e| Error::Internal(e.to_string()))?;
282 let contents = stored.bcs.clone();
283 Ok(Event {
284 stored: Some(stored),
285 native: NativeEvent {
286 sender,
287 package_id,
288 transaction_module,
289 type_,
290 contents,
291 },
292 checkpoint_viewed_at,
293 })
294 }
295}