iota_graphql_rpc/
mutation.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4use async_graphql::*;
5use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
6use fastcrypto::encoding::Base64;
7use iota_indexer::{
8    models::transactions::{OptimisticTransaction, StoredTransaction},
9    optimistic_indexing::OptimisticTransactionExecutor,
10    schema::{optimistic_transactions, transactions, tx_digests, tx_global_order},
11};
12use iota_json_rpc_types::IotaTransactionBlockResponseOptions;
13
14use crate::{
15    data::{Db, DbConnection, QueryExecutor},
16    error::Error,
17    types::{
18        execution_result::ExecutionResult, transaction_block::TransactionBlock,
19        transaction_block_effects::TransactionBlockEffects,
20    },
21};
22pub struct Mutation;
23
24/// Query checkpointed transaction by digest from the database
25async fn query_checkpointed_transaction_by_digest(
26    db: &Db,
27    digest_bytes: Vec<u8>,
28) -> Result<StoredTransaction, Error> {
29    db.execute_repeatable(move |conn| {
30        conn.first(move || {
31            transactions::table
32                .inner_join(
33                    tx_digests::table
34                        .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
35                )
36                .filter(tx_digests::tx_digest.eq(digest_bytes.clone()))
37                .select(StoredTransaction::as_select())
38        })
39    })
40    .await
41    .map_err(|e| Error::Internal(format!("Unable to query checkpointed transaction: {e}")))
42}
43
44/// Query optimistic transaction by digest from the database
45async fn query_optimistic_transaction_by_digest(
46    db: &Db,
47    digest_bytes: Vec<u8>,
48) -> Result<OptimisticTransaction, Error> {
49    db.execute_repeatable(move |conn| {
50        conn.first(move || {
51            optimistic_transactions::table
52                .inner_join(
53                    tx_global_order::table.on(optimistic_transactions::global_sequence_number
54                        .eq(tx_global_order::global_sequence_number)
55                        .and(
56                            optimistic_transactions::optimistic_sequence_number
57                                .eq(tx_global_order::optimistic_sequence_number),
58                        )),
59                )
60                .filter(tx_global_order::tx_digest.eq(digest_bytes.clone()))
61                .select(OptimisticTransaction::as_select())
62        })
63    })
64    .await
65    .map_err(|e| Error::Internal(format!("Unable to query optimistic transaction: {e}")))
66}
67
68/// Mutations are used to write to the IOTA network.
69#[Object]
70impl Mutation {
71    /// Execute a transaction, committing its effects on chain.
72    ///
73    /// - `txBytes` is a `TransactionData` struct that has been BCS-encoded and
74    ///   then Base64-encoded.
75    /// - `signatures` are a list of `flag || signature || pubkey` bytes,
76    ///   Base64-encoded.
77    ///
78    /// Waits until the transaction has reached finality on chain to return its
79    /// transaction digest, or returns the error that prevented finality if
80    /// that was not possible. A transaction is final when its effects are
81    /// guaranteed on chain (it cannot be revoked).
82    ///
83    /// Transaction effects are now available immediately after execution
84    /// through `Query.transactionBlock`. However, other queries that depend
85    /// on the chain’s indexed state (e.g., address-level balance updates)
86    /// may still lag until the transaction has been checkpointed.
87    /// To confirm that a transaction has been included in a checkpoint, query
88    /// `Query.transactionBlock` and check whether the `effects.checkpoint`
89    /// field is set (or `null` if not yet checkpointed).
90    async fn execute_transaction_block(
91        &self,
92        ctx: &Context<'_>,
93        tx_bytes: String,
94        signatures: Vec<String>,
95    ) -> Result<ExecutionResult> {
96        let optimistic_tx_executor: &Option<OptimisticTransactionExecutor> = ctx
97            .data()
98            .map_err(|_| {
99                Error::Internal("Unable to fetch OptimisticTransactionExecutor".to_string())
100            })
101            .extend()?;
102        let optimistic_tx_executor = optimistic_tx_executor
103            .as_ref()
104            .ok_or_else(|| {
105                Error::Internal("OptimisticTransactionExecutor not initialized".to_string())
106            })
107            .extend()?;
108        let tx_data = Base64::try_from(tx_bytes)
109            .map_err(|e| {
110                Error::Client(format!(
111                    "Unable to deserialize transaction bytes from Base64: {e}"
112                ))
113            })
114            .extend()?;
115
116        let mut sigs = Vec::new();
117        for sig in signatures {
118            sigs.push(
119                Base64::try_from(sig.clone())
120                    .map_err(|e| {
121                        Error::Client(format!(
122                            "Unable to deserialize signature bytes {sig} from Base64: {e}"
123                        ))
124                    })
125                    .extend()?,
126            );
127        }
128        let options = IotaTransactionBlockResponseOptions::new()
129            .with_events()
130            .with_raw_input()
131            .with_raw_effects();
132
133        let result = optimistic_tx_executor
134            .execute_and_index_transaction(tx_data, sigs, Some(options))
135            .await
136            .map_err(|e| Error::Internal(format!("Unable to execute transaction: {e}")))
137            .extend()?;
138
139        let tx_digest = result.digest;
140        let digest_bytes = tx_digest.inner().to_vec();
141
142        let db: &Db = ctx.data_unchecked();
143        let query_optimistic_tx = query_optimistic_transaction_by_digest(db, digest_bytes.clone());
144        let query_checkpointed_tx = query_checkpointed_transaction_by_digest(db, digest_bytes);
145        tokio::pin!(query_optimistic_tx, query_checkpointed_tx);
146
147        let effects: Result<TransactionBlockEffects, _> = tokio::select! {
148                checkpointed_tx = &mut query_checkpointed_tx => match checkpointed_tx {
149                    Ok(checkpointed_tx) => TransactionBlock::try_from(checkpointed_tx)?.try_into(),
150                    _ => query_optimistic_tx.await?.try_into()
151                },
152                optimistic_tx = &mut query_optimistic_tx => {
153                    match optimistic_tx {
154                        Ok(optimistic_tx) => optimistic_tx.try_into(),
155                        _ => TransactionBlock::try_from(query_checkpointed_tx.await?)?.try_into(),
156                    }
157                }
158        };
159
160        Ok(ExecutionResult {
161            errors: if result.errors.is_empty() {
162                None
163            } else {
164                Some(result.errors)
165            },
166            effects: effects
167                .map_err(|_| Error::Internal("Transaction not indexed after execution".into()))
168                .extend()?,
169        })
170    }
171}