iota_indexer/models/
events.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{str::FromStr, sync::Arc};
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{BcsEvent, IotaEvent, type_and_fields_from_move_event_data};
9use iota_package_resolver::{PackageStore, Resolver};
10use iota_types::{
11    base_types::{IotaAddress, ObjectID},
12    digests::TransactionDigest,
13    event::EventID,
14    object::bounded_visitor::BoundedVisitor,
15    parse_iota_struct_tag,
16};
17use move_core_types::identifier::Identifier;
18
19use crate::{
20    errors::IndexerError,
21    schema::{events, optimistic_events},
22    types::IndexedEvent,
23};
24
25#[derive(Queryable, QueryableByName, Selectable, Insertable, Debug, Clone)]
26#[diesel(table_name = events)]
27pub struct StoredEvent {
28    #[diesel(sql_type = diesel::sql_types::BigInt)]
29    pub tx_sequence_number: i64,
30
31    #[diesel(sql_type = diesel::sql_types::BigInt)]
32    pub event_sequence_number: i64,
33
34    #[diesel(sql_type = diesel::sql_types::Binary)]
35    pub transaction_digest: Vec<u8>,
36
37    #[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<diesel::pg::sql_types::Bytea>>)]
38    pub senders: Vec<Option<Vec<u8>>>,
39
40    #[diesel(sql_type = diesel::sql_types::Binary)]
41    pub package: Vec<u8>,
42
43    #[diesel(sql_type = diesel::sql_types::Text)]
44    pub module: String,
45
46    #[diesel(sql_type = diesel::sql_types::Text)]
47    pub event_type: String,
48
49    #[diesel(sql_type = diesel::sql_types::BigInt)]
50    pub timestamp_ms: i64,
51
52    #[diesel(sql_type = diesel::sql_types::Binary)]
53    pub bcs: Vec<u8>,
54}
55
56#[derive(Queryable, QueryableByName, Selectable, Insertable, Debug, Clone)]
57#[diesel(table_name = optimistic_events)]
58pub struct OptimisticEvent {
59    #[diesel(sql_type = diesel::sql_types::BigInt)]
60    pub tx_insertion_order: i64,
61
62    #[diesel(sql_type = diesel::sql_types::BigInt)]
63    pub event_sequence_number: i64,
64
65    #[diesel(sql_type = diesel::sql_types::Binary)]
66    pub transaction_digest: Vec<u8>,
67
68    #[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<diesel::pg::sql_types::Bytea>>)]
69    pub senders: Vec<Option<Vec<u8>>>,
70
71    #[diesel(sql_type = diesel::sql_types::Binary)]
72    pub package: Vec<u8>,
73
74    #[diesel(sql_type = diesel::sql_types::Text)]
75    pub module: String,
76
77    #[diesel(sql_type = diesel::sql_types::Text)]
78    pub event_type: String,
79
80    #[diesel(sql_type = diesel::sql_types::Binary)]
81    pub bcs: Vec<u8>,
82}
83
/// Rust-side representation of a `senders` column: a list of nullable byte
/// strings, matching the type of the `senders` field on `StoredEvent` and
/// `OptimisticEvent`.
pub type SendersType = Vec<Option<Vec<u8>>>;
85
86impl From<IndexedEvent> for StoredEvent {
87    fn from(event: IndexedEvent) -> Self {
88        Self {
89            tx_sequence_number: event.tx_sequence_number as i64,
90            event_sequence_number: event.event_sequence_number as i64,
91            transaction_digest: event.transaction_digest.into_inner().to_vec(),
92            senders: event
93                .senders
94                .into_iter()
95                .map(|sender| Some(sender.to_vec()))
96                .collect(),
97            package: event.package.to_vec(),
98            module: event.module.clone(),
99            event_type: event.event_type.clone(),
100            bcs: event.bcs.clone(),
101            timestamp_ms: event.timestamp_ms as i64,
102        }
103    }
104}
105
106impl From<OptimisticEvent> for StoredEvent {
107    fn from(event: OptimisticEvent) -> Self {
108        Self {
109            tx_sequence_number: event.tx_insertion_order,
110            event_sequence_number: event.event_sequence_number,
111            transaction_digest: event.transaction_digest,
112            senders: event.senders,
113            package: event.package,
114            module: event.module,
115            event_type: event.event_type,
116            bcs: event.bcs,
117            timestamp_ms: -1,
118        }
119    }
120}
121
122impl From<StoredEvent> for OptimisticEvent {
123    fn from(event: StoredEvent) -> Self {
124        Self {
125            tx_insertion_order: event.tx_sequence_number,
126            event_sequence_number: event.event_sequence_number,
127            transaction_digest: event.transaction_digest,
128            senders: event.senders,
129            package: event.package,
130            module: event.module,
131            event_type: event.event_type,
132            bcs: event.bcs,
133        }
134    }
135}
136
137impl StoredEvent {
138    pub async fn try_into_iota_event(
139        self,
140        package_resolver: Arc<Resolver<impl PackageStore>>,
141    ) -> Result<IotaEvent, IndexerError> {
142        let package_id = ObjectID::from_bytes(self.package.clone()).map_err(|_e| {
143            IndexerError::PersistentStorageDataCorruption(format!(
144                "Failed to parse event package ID: {:?}",
145                self.package
146            ))
147        })?;
148        // Note: IotaEvent only has one sender today, so we always use the first one.
149        let sender = {
150            {
151                self.senders.first().ok_or_else(|| {
152                    IndexerError::PersistentStorageDataCorruption(
153                        "Event senders should contain at least one address".to_string(),
154                    )
155                })?
156            }
157        };
158        let sender = match sender {
159            Some(ref s) => IotaAddress::from_bytes(s).map_err(|_e| {
160                IndexerError::PersistentStorageDataCorruption(format!(
161                    "Failed to parse event sender address: {:?}",
162                    sender
163                ))
164            })?,
165            None => {
166                return Err(IndexerError::PersistentStorageDataCorruption(
167                    "Event senders element should not be null".to_string(),
168                ));
169            }
170        };
171
172        let type_ = parse_iota_struct_tag(&self.event_type)?;
173        let move_type_layout = package_resolver
174            .type_layout(type_.clone().into())
175            .await
176            .map_err(|e| {
177                IndexerError::ResolveMoveStruct(format!(
178                    "Failed to convert to iota event with Error: {e}",
179                ))
180            })?;
181        let move_object = BoundedVisitor::deserialize_value(&self.bcs, &move_type_layout)
182            .map_err(|e| IndexerError::Serde(e.to_string()))?;
183        let (_, parsed_json) = type_and_fields_from_move_event_data(move_object)
184            .map_err(|e| IndexerError::Serde(e.to_string()))?;
185        let tx_digest =
186            TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
187                IndexerError::Serde(format!(
188                    "Failed to parse transaction digest: {:?}, error: {}",
189                    self.transaction_digest, e
190                ))
191            })?;
192        Ok(IotaEvent {
193            id: EventID {
194                tx_digest,
195                event_seq: self.event_sequence_number as u64,
196            },
197            package_id,
198            transaction_module: Identifier::from_str(&self.module)?,
199            sender,
200            type_,
201            bcs: BcsEvent::new(self.bcs),
202            parsed_json,
203            timestamp_ms: Some(self.timestamp_ms as u64),
204        })
205    }
206}
207
#[cfg(test)]
mod tests {
    use iota_types::event::Event;
    use move_core_types::{account_address::AccountAddress, language_storage::StructTag};

    use super::*;

    /// The `event_type` column must hold the struct tag with its address
    /// written out as a full-length, `0x`-prefixed hex string.
    #[test]
    fn test_canonical_string_of_event_type() {
        const EXPECTED_EVENT_TYPE: &str =
            "0x0000000000000000000000000000000000000000000000000000000000000002::test::test";

        let test_ident = || Identifier::new("test").unwrap();

        let struct_tag = StructTag {
            address: AccountAddress::TWO,
            module: test_ident(),
            name: test_ident(),
            type_params: Vec::new(),
        };
        let event = Event {
            package_id: ObjectID::random(),
            transaction_module: test_ident(),
            sender: AccountAddress::random().into(),
            type_: struct_tag,
            contents: Vec::new(),
        };

        let stored_event: StoredEvent =
            IndexedEvent::from_event(1, 1, 1, TransactionDigest::default(), &event, 100).into();

        assert_eq!(stored_event.event_type, EXPECTED_EVENT_TYPE);
    }
}