1use std::{str::FromStr, sync::Arc};
6
7use diesel::prelude::*;
8use iota_json_rpc_types::{BcsEvent, IotaEvent, type_and_fields_from_move_event_data};
9use iota_package_resolver::{PackageStore, Resolver};
10use iota_types::{
11 base_types::{IotaAddress, ObjectID},
12 digests::TransactionDigest,
13 event::EventID,
14 object::bounded_visitor::BoundedVisitor,
15 parse_iota_struct_tag,
16};
17use move_core_types::identifier::Identifier;
18
19use crate::{
20 errors::IndexerError,
21 schema::{events, optimistic_events},
22 types::IndexedEvent,
23};
24
/// Row model for the `events` table.
///
/// The Diesel derives make this type usable for typed queries (`Queryable`,
/// `Selectable`), raw SQL queries (`QueryableByName`, driven by the per-field
/// `sql_type` annotations) and inserts (`Insertable`).
///
/// NOTE(review): `Queryable` maps columns positionally, so the field order
/// presumably has to match the column order of the `events` schema — confirm
/// against `crate::schema` before reordering anything here.
#[derive(Queryable, QueryableByName, Selectable, Insertable, Debug, Clone)]
#[diesel(table_name = events)]
pub struct StoredEvent {
    /// Transaction sequence number. Filled from
    /// `IndexedEvent::tx_sequence_number`, or from
    /// `OptimisticEvent::tx_insertion_order` when the row is derived from an
    /// optimistic event.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub tx_sequence_number: i64,

    /// Event sequence number; surfaced as `EventID::event_seq` by
    /// `try_into_iota_event`.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub event_sequence_number: i64,

    /// Raw bytes of the transaction digest; parsed back with
    /// `TransactionDigest::try_from` when building an `IotaEvent`.
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub transaction_digest: Vec<u8>,

    /// Sender addresses as a nullable-element byte array. The conversion from
    /// `IndexedEvent` always writes `Some(..)` entries, and
    /// `try_into_iota_event` reads only the first element.
    #[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<diesel::pg::sql_types::Bytea>>)]
    pub senders: Vec<Option<Vec<u8>>>,

    /// Raw bytes of the package `ObjectID`.
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub package: Vec<u8>,

    /// Module name; parsed as a Move `Identifier` when building an
    /// `IotaEvent`.
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub module: String,

    /// Event struct tag as a string, parseable by `parse_iota_struct_tag`
    /// (e.g. `0x…02::test::test` with a full-length address, as pinned by the
    /// unit test in this file).
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub event_type: String,

    /// Event timestamp (presumably Unix milliseconds — confirm with the
    /// producer of `IndexedEvent`). Set to the placeholder `-1` when the row
    /// is derived from an `OptimisticEvent`, which carries no timestamp.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub timestamp_ms: i64,

    /// BCS-encoded event contents; decoded against the resolved Move type
    /// layout in `try_into_iota_event`.
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub bcs: Vec<u8>,
}
55
/// Row model for the `optimistic_events` table.
///
/// Mirrors [`StoredEvent`] field for field, except that the transaction is
/// identified by `tx_insertion_order` instead of `tx_sequence_number` and
/// there is no `timestamp_ms` column.
///
/// NOTE(review): `Queryable` maps columns positionally, so the field order
/// presumably has to match the column order of the `optimistic_events`
/// schema — confirm against `crate::schema` before reordering anything here.
#[derive(Queryable, QueryableByName, Selectable, Insertable, Debug, Clone)]
#[diesel(table_name = optimistic_events)]
pub struct OptimisticEvent {
    /// Insertion order of the owning transaction; becomes
    /// `StoredEvent::tx_sequence_number` when converted to a `StoredEvent`.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub tx_insertion_order: i64,

    /// Event sequence number, copied unchanged to and from `StoredEvent`.
    #[diesel(sql_type = diesel::sql_types::BigInt)]
    pub event_sequence_number: i64,

    /// Raw bytes of the transaction digest.
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub transaction_digest: Vec<u8>,

    /// Sender addresses as a nullable-element byte array.
    #[diesel(sql_type = diesel::sql_types::Array<diesel::sql_types::Nullable<diesel::pg::sql_types::Bytea>>)]
    pub senders: Vec<Option<Vec<u8>>>,

    /// Raw bytes of the package `ObjectID`.
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub package: Vec<u8>,

    /// Module name.
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub module: String,

    /// Event struct tag as a string.
    #[diesel(sql_type = diesel::sql_types::Text)]
    pub event_type: String,

    /// BCS-encoded event contents.
    #[diesel(sql_type = diesel::sql_types::Binary)]
    pub bcs: Vec<u8>,
}
83
/// Rust-side representation of a `senders` column value: an array whose
/// elements are nullable byte strings. Matches the type of the `senders` field
/// on both [`StoredEvent`] and [`OptimisticEvent`].
pub type SendersType = Vec<Option<Vec<u8>>>;
85
86impl From<IndexedEvent> for StoredEvent {
87 fn from(event: IndexedEvent) -> Self {
88 Self {
89 tx_sequence_number: event.tx_sequence_number as i64,
90 event_sequence_number: event.event_sequence_number as i64,
91 transaction_digest: event.transaction_digest.into_inner().to_vec(),
92 senders: event
93 .senders
94 .into_iter()
95 .map(|sender| Some(sender.to_vec()))
96 .collect(),
97 package: event.package.to_vec(),
98 module: event.module.clone(),
99 event_type: event.event_type.clone(),
100 bcs: event.bcs.clone(),
101 timestamp_ms: event.timestamp_ms as i64,
102 }
103 }
104}
105
106impl From<OptimisticEvent> for StoredEvent {
107 fn from(event: OptimisticEvent) -> Self {
108 Self {
109 tx_sequence_number: event.tx_insertion_order,
110 event_sequence_number: event.event_sequence_number,
111 transaction_digest: event.transaction_digest,
112 senders: event.senders,
113 package: event.package,
114 module: event.module,
115 event_type: event.event_type,
116 bcs: event.bcs,
117 timestamp_ms: -1,
118 }
119 }
120}
121
122impl From<StoredEvent> for OptimisticEvent {
123 fn from(event: StoredEvent) -> Self {
124 Self {
125 tx_insertion_order: event.tx_sequence_number,
126 event_sequence_number: event.event_sequence_number,
127 transaction_digest: event.transaction_digest,
128 senders: event.senders,
129 package: event.package,
130 module: event.module,
131 event_type: event.event_type,
132 bcs: event.bcs,
133 }
134 }
135}
136
137impl StoredEvent {
138 pub async fn try_into_iota_event(
139 self,
140 package_resolver: Arc<Resolver<impl PackageStore>>,
141 ) -> Result<IotaEvent, IndexerError> {
142 let package_id = ObjectID::from_bytes(self.package.clone()).map_err(|_e| {
143 IndexerError::PersistentStorageDataCorruption(format!(
144 "Failed to parse event package ID: {:?}",
145 self.package
146 ))
147 })?;
148 let sender = {
150 {
151 self.senders.first().ok_or_else(|| {
152 IndexerError::PersistentStorageDataCorruption(
153 "Event senders should contain at least one address".to_string(),
154 )
155 })?
156 }
157 };
158 let sender = match sender {
159 Some(ref s) => IotaAddress::from_bytes(s).map_err(|_e| {
160 IndexerError::PersistentStorageDataCorruption(format!(
161 "Failed to parse event sender address: {:?}",
162 sender
163 ))
164 })?,
165 None => {
166 return Err(IndexerError::PersistentStorageDataCorruption(
167 "Event senders element should not be null".to_string(),
168 ));
169 }
170 };
171
172 let type_ = parse_iota_struct_tag(&self.event_type)?;
173 let move_type_layout = package_resolver
174 .type_layout(type_.clone().into())
175 .await
176 .map_err(|e| {
177 IndexerError::ResolveMoveStruct(format!(
178 "Failed to convert to iota event with Error: {e}",
179 ))
180 })?;
181 let move_object = BoundedVisitor::deserialize_value(&self.bcs, &move_type_layout)
182 .map_err(|e| IndexerError::Serde(e.to_string()))?;
183 let (_, parsed_json) = type_and_fields_from_move_event_data(move_object)
184 .map_err(|e| IndexerError::Serde(e.to_string()))?;
185 let tx_digest =
186 TransactionDigest::try_from(self.transaction_digest.as_slice()).map_err(|e| {
187 IndexerError::Serde(format!(
188 "Failed to parse transaction digest: {:?}, error: {}",
189 self.transaction_digest, e
190 ))
191 })?;
192 Ok(IotaEvent {
193 id: EventID {
194 tx_digest,
195 event_seq: self.event_sequence_number as u64,
196 },
197 package_id,
198 transaction_module: Identifier::from_str(&self.module)?,
199 sender,
200 type_,
201 bcs: BcsEvent::new(self.bcs),
202 parsed_json,
203 timestamp_ms: Some(self.timestamp_ms as u64),
204 })
205 }
206}
207
#[cfg(test)]
mod tests {
    use iota_types::event::Event;
    use move_core_types::{account_address::AccountAddress, language_storage::StructTag};

    use super::*;

    /// The stored `event_type` must be the canonical struct tag string, i.e.
    /// with the address written out at full length.
    #[test]
    fn test_canonical_string_of_event_type() {
        let event = Event {
            package_id: ObjectID::random(),
            transaction_module: Identifier::new("test").unwrap(),
            sender: AccountAddress::random().into(),
            type_: StructTag {
                address: AccountAddress::TWO,
                module: Identifier::new("test").unwrap(),
                name: Identifier::new("test").unwrap(),
                type_params: vec![],
            },
            contents: vec![],
        };

        let stored_event: StoredEvent =
            IndexedEvent::from_event(1, 1, 1, TransactionDigest::default(), &event, 100).into();

        let expected =
            "0x0000000000000000000000000000000000000000000000000000000000000002::test::test";
        assert_eq!(stored_event.event_type, expected);
    }
}