iota_graphql_rpc/
mutation.rs1use async_graphql::*;
5use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
6use fastcrypto::encoding::Base64;
7use iota_indexer::{
8 models::transactions::{OptimisticTransaction, StoredTransaction},
9 schema::{optimistic_transactions, transactions, tx_digests, tx_global_order},
10};
11use iota_json_rpc_api::WriteApiServer;
12use iota_json_rpc_types::IotaTransactionBlockResponseOptions;
13
14use crate::{
15 data::{Db, DbConnection, QueryExecutor},
16 error::Error,
17 server::builder::get_write_api,
18 types::{
19 execution_result::ExecutionResult, transaction_block::TransactionBlock,
20 transaction_block_effects::TransactionBlockEffects,
21 },
22};
/// Unit struct that carries the `#[Object]` resolver impl defined in this file.
pub struct Mutation;
24
25async fn query_checkpointed_transaction_by_digest(
27 db: &Db,
28 digest_bytes: Vec<u8>,
29) -> Result<StoredTransaction, Error> {
30 db.execute_repeatable(move |conn| {
31 conn.first(move || {
32 transactions::table
33 .inner_join(
34 tx_digests::table
35 .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
36 )
37 .filter(tx_digests::tx_digest.eq(digest_bytes.clone()))
38 .select(StoredTransaction::as_select())
39 })
40 })
41 .await
42 .map_err(|e| Error::Internal(format!("Unable to query checkpointed transaction: {e}")))
43}
44
45async fn query_optimistic_transaction_by_digest(
47 db: &Db,
48 digest_bytes: Vec<u8>,
49) -> Result<OptimisticTransaction, Error> {
50 db.execute_repeatable(move |conn| {
51 conn.first(move || {
52 optimistic_transactions::table
53 .inner_join(
54 tx_global_order::table.on(optimistic_transactions::global_sequence_number
55 .eq(tx_global_order::global_sequence_number)
56 .and(
57 optimistic_transactions::optimistic_sequence_number
58 .eq(tx_global_order::optimistic_sequence_number),
59 )),
60 )
61 .filter(tx_global_order::tx_digest.eq(digest_bytes.clone()))
62 .select(OptimisticTransaction::as_select())
63 })
64 })
65 .await
66 .map_err(|e| Error::Internal(format!("Unable to query optimistic transaction: {e}")))
67}
68
69#[Object]
71impl Mutation {
72 async fn execute_transaction_block(
92 &self,
93 ctx: &Context<'_>,
94 tx_bytes: String,
95 signatures: Vec<String>,
96 ) -> Result<ExecutionResult> {
97 let write_api = get_write_api(ctx).extend()?;
98 let tx_data = Base64::try_from(tx_bytes)
99 .map_err(|e| {
100 Error::Client(format!(
101 "Unable to deserialize transaction bytes from Base64: {e}"
102 ))
103 })
104 .extend()?;
105
106 let mut sigs = Vec::new();
107 for sig in signatures {
108 sigs.push(
109 Base64::try_from(sig.clone())
110 .map_err(|e| {
111 Error::Client(format!(
112 "Unable to deserialize signature bytes {sig} from Base64: {e}"
113 ))
114 })
115 .extend()?,
116 );
117 }
118 let options = IotaTransactionBlockResponseOptions::new()
119 .with_events()
120 .with_raw_input()
121 .with_raw_effects();
122
123 let result = write_api
124 .execute_transaction_block(tx_data, sigs, Some(options), None)
125 .await
126 .map_err(|e| Error::Internal(format!("Unable to execute transaction: {e}")))
127 .extend()?;
128
129 let tx_digest = result.digest;
130 let digest_bytes = tx_digest.inner().to_vec();
131
132 let db: &Db = ctx.data_unchecked();
133 let query_optimistic_tx = query_optimistic_transaction_by_digest(db, digest_bytes.clone());
134 let query_checkpointed_tx = query_checkpointed_transaction_by_digest(db, digest_bytes);
135 tokio::pin!(query_optimistic_tx, query_checkpointed_tx);
136
137 let effects: Result<TransactionBlockEffects, _> = tokio::select! {
138 checkpointed_tx = &mut query_checkpointed_tx => match checkpointed_tx {
139 Ok(checkpointed_tx) => TransactionBlock::try_from(checkpointed_tx)?.try_into(),
140 _ => query_optimistic_tx.await?.try_into()
141 },
142 optimistic_tx = &mut query_optimistic_tx => {
143 match optimistic_tx {
144 Ok(optimistic_tx) => optimistic_tx.try_into(),
145 _ => TransactionBlock::try_from(query_checkpointed_tx.await?)?.try_into(),
146 }
147 }
148 };
149
150 Ok(ExecutionResult {
151 errors: if result.errors.is_empty() {
152 None
153 } else {
154 Some(result.errors)
155 },
156 effects: effects
157 .map_err(|_| Error::Internal("Transaction not indexed after execution".into()))
158 .extend()?,
159 })
160 }
161}