iota_graphql_rpc/types/event/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::str::FromStr;
6
7use async_graphql::{
8    connection::{Connection, CursorType, Edge},
9    *,
10};
11use cursor::EvLookup;
12use diesel::{ExpressionMethods, QueryDsl};
13use iota_indexer::{
14    models::{events::StoredEvent, transactions::StoredTransaction},
15    schema::{checkpoints, events},
16};
17use iota_types::{
18    Identifier,
19    base_types::{IotaAddress as NativeIotaAddress, ObjectID},
20    event::Event as NativeEvent,
21    parse_iota_struct_tag,
22};
23use lookups::{add_bounds, select_emit_module, select_event_type, select_sender};
24
25use crate::{
26    data::{self, Db, DbConnection, QueryExecutor},
27    error::Error,
28    query,
29    types::{
30        address::Address,
31        base64::Base64,
32        cursor::{Page, Target},
33        date_time::DateTime,
34        move_module::MoveModule,
35        move_value::MoveValue,
36    },
37};
38
39mod cursor;
40mod filter;
41mod lookups;
42pub(crate) use cursor::Cursor;
43pub(crate) use filter::EventFilter;
44
/// An IOTA node emits one of the following events:
///
/// - Move event
/// - Publish event
/// - Transfer object event
/// - Delete object event
/// - New object event
/// - Epoch change event
#[derive(Clone, Debug)]
pub(crate) struct Event {
    /// The indexer's stored row for this event, if there is one. Both
    /// constructors in this file set it to `Some`; the `timestamp` resolver
    /// yields `None` when it is absent.
    pub stored: Option<StoredEvent>,
    /// The event in its native `iota_types` form. The resolvers read the
    /// sender, package, module, type and contents from here.
    pub native: NativeEvent,
    /// The checkpoint sequence number this was viewed at.
    pub checkpoint_viewed_at: u64,
}
59
/// Shorthand for `data::Query` with the queried table fixed to
/// `events::table`; `ST` and `GB` are passed through unchanged.
type Query<ST, GB> = data::Query<ST, events::table, GB>;
61
62#[Object]
63impl Event {
64    /// The Move module containing some function that when called by
65    /// a programmable transaction block (PTB) emitted this event.
66    /// For example, if a PTB invokes A::m1::foo, which internally
67    /// calls A::m2::emit_event to emit an event,
68    /// the sending module would be A::m1.
69    async fn sending_module(&self, ctx: &Context<'_>) -> Result<Option<MoveModule>> {
70        MoveModule::query(
71            ctx,
72            self.native.package_id.into(),
73            &self.native.transaction_module.to_string(),
74            self.checkpoint_viewed_at,
75        )
76        .await
77        .extend()
78    }
79
80    /// Address of the sender of the event
81    async fn sender(&self) -> Result<Option<Address>> {
82        if self.native.sender == NativeIotaAddress::ZERO {
83            return Ok(None);
84        }
85
86        Ok(Some(Address {
87            address: self.native.sender.into(),
88            checkpoint_viewed_at: self.checkpoint_viewed_at,
89        }))
90    }
91
92    /// UTC timestamp in milliseconds since epoch (1/1/1970)
93    async fn timestamp(&self) -> Result<Option<DateTime>, Error> {
94        if let Some(stored) = &self.stored {
95            Ok(Some(DateTime::from_ms(stored.timestamp_ms)?))
96        } else {
97            Ok(None)
98        }
99    }
100
101    #[graphql(flatten)]
102    async fn move_value(&self) -> Result<MoveValue> {
103        Ok(MoveValue::new(
104            self.native.type_.clone().into(),
105            Base64::from(self.native.contents.clone()),
106        ))
107    }
108}
109
110impl Event {
111    /// Query the database for a `page` of events. The Page uses the
112    /// transaction, event, and checkpoint sequence numbers as the cursor to
113    /// determine the correct page of results. The query can optionally be
114    /// further `filter`-ed by the `EventFilter`.
115    ///
116    /// The `checkpoint_viewed_at` parameter represents the checkpoint sequence
117    /// number at which this page was queried. Each entity returned in the
118    /// connection inherits this checkpoint, so that when viewing that
119    /// entity's state, it's as if it's being viewed at this checkpoint.
120    ///
121    /// The cursors in `page` might also include checkpoint viewed at fields. If
122    /// these are set, they take precedence over the checkpoint that
123    /// pagination is being conducted in.
124    pub(crate) async fn paginate(
125        db: &Db,
126        page: Page<Cursor>,
127        filter: EventFilter,
128        checkpoint_viewed_at: u64,
129    ) -> Result<Connection<String, Event>, Error> {
130        let cursor_viewed_at = page.validate_cursor_consistency()?;
131        let checkpoint_viewed_at = cursor_viewed_at.unwrap_or(checkpoint_viewed_at);
132
133        // Construct tx and ev sequence number query with table-relevant filters, if
134        // they exist. The resulting query will look something like `SELECT
135        // tx_sequence_number, event_sequence_number FROM lookup_table WHERE
136        // ...`. If no filter is provided we don't need to use any lookup tables
137        // and can just query `events` table, as can be seen in the code below.
138        let query_constraint = match (filter.sender, &filter.emitting_module, &filter.event_type) {
139            (None, None, None) => None,
140            (Some(sender), None, None) => Some(select_sender(sender)),
141            (sender, None, Some(event_type)) => Some(select_event_type(event_type, sender)),
142            (sender, Some(module), None) => Some(select_emit_module(module, sender)),
143            (_, Some(_), Some(_)) => {
144                return Err(Error::Client(
145                    "Filtering by both emitting module and event type is not supported".to_string(),
146                ));
147            }
148        };
149
150        use checkpoints::dsl;
151        let (prev, next, results) = db
152            .execute(move |conn| {
153                let tx_hi: i64 = conn.first(move || {
154                    dsl::checkpoints.select(dsl::network_total_transactions)
155                        .filter(dsl::sequence_number.eq(checkpoint_viewed_at as i64))
156                })?;
157
158                let (prev, next, mut events): (bool, bool, Vec<StoredEvent>) =
159                    if let Some(filter_query) =  query_constraint {
160                        let query = add_bounds(filter_query, &filter.transaction_digest, &page, tx_hi);
161
162                        let (prev, next, results) =
163                            page.paginate_raw_query::<EvLookup>(conn, checkpoint_viewed_at, query)?;
164
165                        let ev_lookups = results
166                            .into_iter()
167                            .map(|x| (x.tx, x.ev))
168                            .collect::<Vec<(i64, i64)>>();
169
170                        if ev_lookups.is_empty() {
171                            return Ok::<_, diesel::result::Error>((prev, next, vec![]));
172                        }
173
174                        // Unlike a multi-get on a single column which can be serviced by a query `IN
175                        // (...)`, because events have a composite primary key, the query planner tends
176                        // to perform a sequential scan when given a list of tuples to lookup. A query
177                        // using `UNION ALL` allows us to leverage the index on the composite key.
178                        let events = conn.results(move || {
179                            // Diesel's DSL does not current support chained `UNION ALL`, so we have to turn
180                            // to `RawQuery` here.
181                            let query_string = ev_lookups.iter()
182                                .map(|&(tx, ev)| {
183                                    format!("SELECT * FROM events WHERE tx_sequence_number = {} AND event_sequence_number = {}", tx, ev)
184                                })
185                                .collect::<Vec<String>>()
186                                .join(" UNION ALL ");
187
188                            query!(query_string).into_boxed()
189                        })?;
190                        (prev, next, events)
191                    } else {
192                        // No filter is provided so we add bounds to the basic `SELECT * FROM
193                        // events` query and call it a day.
194                        let query = add_bounds(query!("SELECT * FROM events"), &filter.transaction_digest, &page, tx_hi);
195                        let (prev, next, events_iter) = page.paginate_raw_query::<StoredEvent>(conn, checkpoint_viewed_at, query)?;
196                        let events = events_iter.collect::<Vec<StoredEvent>>();
197                        (prev, next, events)
198                    };
199
200                // UNION ALL does not guarantee order, so we need to sort the results. Whether
201                // `first` or `last, the result set is always sorted in ascending order.
202                events.sort_by(|a, b| {
203                        a.tx_sequence_number.cmp(&b.tx_sequence_number)
204                            .then_with(|| a.event_sequence_number.cmp(&b.event_sequence_number))
205                });
206
207
208                Ok::<_, diesel::result::Error>((prev, next, events))
209            })
210            .await?;
211
212        let mut conn = Connection::new(prev, next);
213
214        // The "checkpoint viewed at" sets a consistent upper bound for the nested
215        // queries.
216        for stored in results {
217            let cursor = stored.cursor(checkpoint_viewed_at).encode_cursor();
218            conn.edges.push(Edge::new(
219                cursor,
220                Event::try_from_stored_event(stored, checkpoint_viewed_at)?,
221            ));
222        }
223
224        Ok(conn)
225    }
226
227    pub(crate) fn try_from_stored_transaction(
228        stored_tx: &StoredTransaction,
229        idx: usize,
230        checkpoint_viewed_at: u64,
231    ) -> Result<Self, Error> {
232        let Some(serialized_event) = &stored_tx.get_event_at_idx(idx) else {
233            return Err(Error::Internal(format!(
234                "Could not find event with event_sequence_number {} at transaction {}",
235                idx, stored_tx.tx_sequence_number
236            )));
237        };
238
239        let native_event: NativeEvent = bcs::from_bytes(serialized_event).map_err(|_| {
240            Error::Internal(format!(
241                "Failed to deserialize event with {} at transaction {}",
242                idx, stored_tx.tx_sequence_number
243            ))
244        })?;
245
246        let stored_event = StoredEvent {
247            tx_sequence_number: stored_tx.tx_sequence_number,
248            event_sequence_number: idx as i64,
249            transaction_digest: stored_tx.transaction_digest.clone(),
250            senders: vec![Some(native_event.sender.to_vec())],
251            package: native_event.package_id.to_vec(),
252            module: native_event.transaction_module.to_string(),
253            event_type: native_event
254                .type_
255                .to_canonical_string(/* with_prefix */ true),
256            bcs: native_event.contents.clone(),
257            timestamp_ms: stored_tx.timestamp_ms,
258        };
259
260        Ok(Self {
261            stored: Some(stored_event),
262            native: native_event,
263            checkpoint_viewed_at,
264        })
265    }
266
267    fn try_from_stored_event(
268        stored: StoredEvent,
269        checkpoint_viewed_at: u64,
270    ) -> Result<Self, Error> {
271        let Some(Some(sender_bytes)) = stored.senders.first() else {
272            return Err(Error::Internal("No senders found for event".to_string()));
273        };
274        let sender = NativeIotaAddress::from_bytes(sender_bytes)
275            .map_err(|e| Error::Internal(e.to_string()))?;
276        let package_id =
277            ObjectID::from_bytes(&stored.package).map_err(|e| Error::Internal(e.to_string()))?;
278        let type_ = parse_iota_struct_tag(&stored.event_type)
279            .map_err(|e| Error::Internal(e.to_string()))?;
280        let transaction_module =
281            Identifier::from_str(&stored.module).map_err(|e| Error::Internal(e.to_string()))?;
282        let contents = stored.bcs.clone();
283        Ok(Event {
284            stored: Some(stored),
285            native: NativeEvent {
286                sender,
287                package_id,
288                transaction_module,
289                type_,
290                contents,
291            },
292            checkpoint_viewed_at,
293        })
294    }
295}