iota_graphql_rpc/
mutation.rs1use async_graphql::*;
5use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
6use fastcrypto::encoding::Base64;
7use iota_indexer::{
8 models::transactions::{OptimisticTransaction, StoredTransaction},
9 optimistic_indexing::OptimisticTransactionExecutor,
10 schema::{optimistic_transactions, transactions, tx_digests, tx_global_order},
11};
12use iota_json_rpc_types::IotaTransactionBlockResponseOptions;
13
14use crate::{
15 data::{Db, DbConnection, QueryExecutor},
16 error::Error,
17 types::{
18 execution_result::ExecutionResult, transaction_block::TransactionBlock,
19 transaction_block_effects::TransactionBlockEffects,
20 },
21};
22pub struct Mutation;
23
24async fn query_checkpointed_transaction_by_digest(
26 db: &Db,
27 digest_bytes: Vec<u8>,
28) -> Result<StoredTransaction, Error> {
29 db.execute_repeatable(move |conn| {
30 conn.first(move || {
31 transactions::table
32 .inner_join(
33 tx_digests::table
34 .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
35 )
36 .filter(tx_digests::tx_digest.eq(digest_bytes.clone()))
37 .select(StoredTransaction::as_select())
38 })
39 })
40 .await
41 .map_err(|e| Error::Internal(format!("Unable to query checkpointed transaction: {e}")))
42}
43
44async fn query_optimistic_transaction_by_digest(
46 db: &Db,
47 digest_bytes: Vec<u8>,
48) -> Result<OptimisticTransaction, Error> {
49 db.execute_repeatable(move |conn| {
50 conn.first(move || {
51 optimistic_transactions::table
52 .inner_join(
53 tx_global_order::table.on(optimistic_transactions::global_sequence_number
54 .eq(tx_global_order::global_sequence_number)
55 .and(
56 optimistic_transactions::optimistic_sequence_number
57 .eq(tx_global_order::optimistic_sequence_number),
58 )),
59 )
60 .filter(tx_global_order::tx_digest.eq(digest_bytes.clone()))
61 .select(OptimisticTransaction::as_select())
62 })
63 })
64 .await
65 .map_err(|e| Error::Internal(format!("Unable to query optimistic transaction: {e}")))
66}
67
68#[Object]
70impl Mutation {
71 async fn execute_transaction_block(
91 &self,
92 ctx: &Context<'_>,
93 tx_bytes: String,
94 signatures: Vec<String>,
95 ) -> Result<ExecutionResult> {
96 let optimistic_tx_executor: &Option<OptimisticTransactionExecutor> = ctx
97 .data()
98 .map_err(|_| {
99 Error::Internal("Unable to fetch OptimisticTransactionExecutor".to_string())
100 })
101 .extend()?;
102 let optimistic_tx_executor = optimistic_tx_executor
103 .as_ref()
104 .ok_or_else(|| {
105 Error::Internal("OptimisticTransactionExecutor not initialized".to_string())
106 })
107 .extend()?;
108 let tx_data = Base64::try_from(tx_bytes)
109 .map_err(|e| {
110 Error::Client(format!(
111 "Unable to deserialize transaction bytes from Base64: {e}"
112 ))
113 })
114 .extend()?;
115
116 let mut sigs = Vec::new();
117 for sig in signatures {
118 sigs.push(
119 Base64::try_from(sig.clone())
120 .map_err(|e| {
121 Error::Client(format!(
122 "Unable to deserialize signature bytes {sig} from Base64: {e}"
123 ))
124 })
125 .extend()?,
126 );
127 }
128 let options = IotaTransactionBlockResponseOptions::new()
129 .with_events()
130 .with_raw_input()
131 .with_raw_effects();
132
133 let result = optimistic_tx_executor
134 .execute_and_index_transaction(tx_data, sigs, Some(options))
135 .await
136 .map_err(|e| Error::Internal(format!("Unable to execute transaction: {e}")))
137 .extend()?;
138
139 let tx_digest = result.digest;
140 let digest_bytes = tx_digest.inner().to_vec();
141
142 let db: &Db = ctx.data_unchecked();
143 let query_optimistic_tx = query_optimistic_transaction_by_digest(db, digest_bytes.clone());
144 let query_checkpointed_tx = query_checkpointed_transaction_by_digest(db, digest_bytes);
145 tokio::pin!(query_optimistic_tx, query_checkpointed_tx);
146
147 let effects: Result<TransactionBlockEffects, _> = tokio::select! {
148 checkpointed_tx = &mut query_checkpointed_tx => match checkpointed_tx {
149 Ok(checkpointed_tx) => TransactionBlock::try_from(checkpointed_tx)?.try_into(),
150 _ => query_optimistic_tx.await?.try_into()
151 },
152 optimistic_tx = &mut query_optimistic_tx => {
153 match optimistic_tx {
154 Ok(optimistic_tx) => optimistic_tx.try_into(),
155 _ => TransactionBlock::try_from(query_checkpointed_tx.await?)?.try_into(),
156 }
157 }
158 };
159
160 Ok(ExecutionResult {
161 errors: if result.errors.is_empty() {
162 None
163 } else {
164 Some(result.errors)
165 },
166 effects: effects
167 .map_err(|_| Error::Internal("Transaction not indexed after execution".into()))
168 .extend()?,
169 })
170 }
171}