iota_graphql_rpc/
mutation.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4use async_graphql::*;
5use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper};
6use fastcrypto::encoding::Base64;
7use iota_indexer::{
8    models::transactions::{OptimisticTransaction, StoredTransaction},
9    schema::{optimistic_transactions, transactions, tx_digests, tx_global_order},
10};
11use iota_json_rpc_api::WriteApiServer;
12use iota_json_rpc_types::IotaTransactionBlockResponseOptions;
13
14use crate::{
15    data::{Db, DbConnection, QueryExecutor},
16    error::Error,
17    server::builder::get_write_api,
18    types::{
19        execution_result::ExecutionResult, transaction_block::TransactionBlock,
20        transaction_block_effects::TransactionBlockEffects,
21    },
22};
23pub struct Mutation;
24
25/// Query checkpointed transaction by digest from the database
26async fn query_checkpointed_transaction_by_digest(
27    db: &Db,
28    digest_bytes: Vec<u8>,
29) -> Result<StoredTransaction, Error> {
30    db.execute_repeatable(move |conn| {
31        conn.first(move || {
32            transactions::table
33                .inner_join(
34                    tx_digests::table
35                        .on(transactions::tx_sequence_number.eq(tx_digests::tx_sequence_number)),
36                )
37                .filter(tx_digests::tx_digest.eq(digest_bytes.clone()))
38                .select(StoredTransaction::as_select())
39        })
40    })
41    .await
42    .map_err(|e| Error::Internal(format!("Unable to query checkpointed transaction: {e}")))
43}
44
45/// Query optimistic transaction by digest from the database
46async fn query_optimistic_transaction_by_digest(
47    db: &Db,
48    digest_bytes: Vec<u8>,
49) -> Result<OptimisticTransaction, Error> {
50    db.execute_repeatable(move |conn| {
51        conn.first(move || {
52            optimistic_transactions::table
53                .inner_join(
54                    tx_global_order::table.on(optimistic_transactions::global_sequence_number
55                        .eq(tx_global_order::global_sequence_number)
56                        .and(
57                            optimistic_transactions::optimistic_sequence_number
58                                .eq(tx_global_order::optimistic_sequence_number),
59                        )),
60                )
61                .filter(tx_global_order::tx_digest.eq(digest_bytes.clone()))
62                .select(OptimisticTransaction::as_select())
63        })
64    })
65    .await
66    .map_err(|e| Error::Internal(format!("Unable to query optimistic transaction: {e}")))
67}
68
69/// Mutations are used to write to the IOTA network.
70#[Object]
71impl Mutation {
72    /// Execute a transaction, committing its effects on chain.
73    ///
74    /// - `txBytes` is a `TransactionData` struct that has been BCS-encoded and
75    ///   then Base64-encoded.
76    /// - `signatures` are a list of `flag || signature || pubkey` bytes,
77    ///   Base64-encoded.
78    ///
79    /// Waits until the transaction has reached finality on chain to return its
80    /// transaction digest, or returns the error that prevented finality if
81    /// that was not possible. A transaction is final when its effects are
82    /// guaranteed on chain (it cannot be revoked).
83    ///
84    /// Transaction effects are now available immediately after execution
85    /// through `Query.transactionBlock`. However, other queries that depend
86    /// on the chain’s indexed state (e.g., address-level balance updates)
87    /// may still lag until the transaction has been checkpointed.
88    /// To confirm that a transaction has been included in a checkpoint, query
89    /// `Query.transactionBlock` and check whether the `effects.checkpoint`
90    /// field is set (or `null` if not yet checkpointed).
91    async fn execute_transaction_block(
92        &self,
93        ctx: &Context<'_>,
94        tx_bytes: String,
95        signatures: Vec<String>,
96    ) -> Result<ExecutionResult> {
97        let write_api = get_write_api(ctx).extend()?;
98        let tx_data = Base64::try_from(tx_bytes)
99            .map_err(|e| {
100                Error::Client(format!(
101                    "Unable to deserialize transaction bytes from Base64: {e}"
102                ))
103            })
104            .extend()?;
105
106        let mut sigs = Vec::new();
107        for sig in signatures {
108            sigs.push(
109                Base64::try_from(sig.clone())
110                    .map_err(|e| {
111                        Error::Client(format!(
112                            "Unable to deserialize signature bytes {sig} from Base64: {e}"
113                        ))
114                    })
115                    .extend()?,
116            );
117        }
118        let options = IotaTransactionBlockResponseOptions::new()
119            .with_events()
120            .with_raw_input()
121            .with_raw_effects();
122
123        let result = write_api
124            .execute_transaction_block(tx_data, sigs, Some(options), None)
125            .await
126            .map_err(|e| Error::Internal(format!("Unable to execute transaction: {e}")))
127            .extend()?;
128
129        let tx_digest = result.digest;
130        let digest_bytes = tx_digest.inner().to_vec();
131
132        let db: &Db = ctx.data_unchecked();
133        let query_optimistic_tx = query_optimistic_transaction_by_digest(db, digest_bytes.clone());
134        let query_checkpointed_tx = query_checkpointed_transaction_by_digest(db, digest_bytes);
135        tokio::pin!(query_optimistic_tx, query_checkpointed_tx);
136
137        let effects: Result<TransactionBlockEffects, _> = tokio::select! {
138                checkpointed_tx = &mut query_checkpointed_tx => match checkpointed_tx {
139                    Ok(checkpointed_tx) => TransactionBlock::try_from(checkpointed_tx)?.try_into(),
140                    _ => query_optimistic_tx.await?.try_into()
141                },
142                optimistic_tx = &mut query_optimistic_tx => {
143                    match optimistic_tx {
144                        Ok(optimistic_tx) => optimistic_tx.try_into(),
145                        _ => TransactionBlock::try_from(query_checkpointed_tx.await?)?.try_into(),
146                    }
147                }
148        };
149
150        Ok(ExecutionResult {
151            errors: if result.errors.is_empty() {
152                None
153            } else {
154                Some(result.errors)
155            },
156            effects: effects
157                .map_err(|_| Error::Internal("Transaction not indexed after execution".into()))
158                .extend()?,
159        })
160    }
161}