iota_json_rpc/
transaction_execution_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::{encoding::Base64, traits::ToFromBytes};
9use iota_core::{
10    authority::AuthorityState, authority_client::NetworkAuthorityClient,
11    transaction_orchestrator::TransactionOrchestrator,
12};
13use iota_json_rpc_api::{JsonRpcMetrics, WriteApiOpenRpc, WriteApiServer};
14use iota_json_rpc_types::{
15    DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse, IotaTransactionBlock,
16    IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
17};
18use iota_metrics::spawn_monitored_task;
19use iota_open_rpc::Module;
20use iota_types::{
21    base_types::IotaAddress,
22    crypto::default_hash,
23    digests::TransactionDigest,
24    effects::TransactionEffectsAPI,
25    iota_serde::BigInt,
26    quorum_driver_types::{
27        ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
28    },
29    signature::GenericSignature,
30    storage::PostExecutionPackageResolver,
31    transaction::{
32        InputObjectKind, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
33    },
34};
35use jsonrpsee::{RpcModule, core::RpcResult};
36use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
37use tracing::{Instrument, instrument};
38
39use crate::{
40    IotaRpcModule, ObjectProviderCache,
41    authority_state::StateRead,
42    error::{Error, IotaRpcInputError},
43    get_balance_changes_from_effect, get_object_changes,
44    logger::FutureWithTracing,
45};
46
47pub struct TransactionExecutionApi {
48    state: Arc<dyn StateRead>,
49    transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
50    metrics: Arc<JsonRpcMetrics>,
51}
52
53impl TransactionExecutionApi {
54    pub fn new(
55        state: Arc<AuthorityState>,
56        transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
57        metrics: Arc<JsonRpcMetrics>,
58    ) -> Self {
59        Self {
60            state,
61            transaction_orchestrator,
62            metrics,
63        }
64    }
65
66    pub fn convert_bytes<T: serde::de::DeserializeOwned>(
67        &self,
68        tx_bytes: Base64,
69    ) -> Result<T, IotaRpcInputError> {
70        let data: T = bcs::from_bytes(&tx_bytes.to_vec()?)?;
71        Ok(data)
72    }
73
74    #[expect(clippy::type_complexity)]
75    fn prepare_execute_transaction_block(
76        &self,
77        tx_bytes: Base64,
78        signatures: Vec<Base64>,
79        opts: Option<IotaTransactionBlockResponseOptions>,
80    ) -> Result<
81        (
82            ExecuteTransactionRequestV1,
83            IotaTransactionBlockResponseOptions,
84            IotaAddress,
85            Vec<InputObjectKind>,
86            Transaction,
87            Option<IotaTransactionBlock>,
88            Vec<u8>,
89        ),
90        IotaRpcInputError,
91    > {
92        let opts = opts.unwrap_or_default();
93        let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
94        let sender = tx_data.sender();
95        let input_objs = tx_data.input_objects().unwrap_or_default();
96
97        let mut sigs = Vec::new();
98        for sig in signatures {
99            sigs.push(GenericSignature::from_bytes(&sig.to_vec()?)?);
100        }
101        let txn = Transaction::from_generic_sig_data(tx_data, sigs);
102        let raw_transaction = if opts.show_raw_input {
103            bcs::to_bytes(txn.data())?
104        } else {
105            vec![]
106        };
107        let transaction = if opts.show_input {
108            let epoch_store = self.state.load_epoch_store_one_call_per_task();
109
110            Some(IotaTransactionBlock::try_from(
111                txn.data().clone(),
112                epoch_store.module_cache(),
113                *txn.digest(),
114            )?)
115        } else {
116            None
117        };
118
119        let request = ExecuteTransactionRequestV1 {
120            transaction: txn.clone(),
121            include_events: opts.show_events,
122            include_input_objects: opts.show_balance_changes || opts.show_object_changes,
123            include_output_objects: opts.show_balance_changes
124                || opts.show_object_changes
125                // In order to resolve events, we may need access to the newly published packages.
126                || opts.show_events,
127            include_auxiliary_data: false,
128        };
129
130        Ok((
131            request,
132            opts,
133            sender,
134            input_objs,
135            txn,
136            transaction,
137            raw_transaction,
138        ))
139    }
140
141    #[instrument("json_rpc_api_execute_transaction_block", level = "trace", skip_all)]
142    async fn execute_transaction_block(
143        &self,
144        tx_bytes: Base64,
145        signatures: Vec<Base64>,
146        opts: Option<IotaTransactionBlockResponseOptions>,
147        request_type: Option<ExecuteTransactionRequestType>,
148    ) -> Result<IotaTransactionBlockResponse, Error> {
149        let request_type =
150            request_type.unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert);
151        let (request, opts, sender, input_objs, txn, transaction, raw_transaction) =
152            self.prepare_execute_transaction_block(tx_bytes, signatures, opts)?;
153        let digest = *txn.digest();
154
155        let transaction_orchestrator = self.transaction_orchestrator.clone();
156        let orch_timer = self.metrics.orchestrator_latency_ms.start_timer();
157
158        tracing::trace!(
159            "Spawning transaction orchestrator task for transaction: {}",
160            digest
161        );
162        let (response, is_executed_locally) = spawn_monitored_task!(
163            transaction_orchestrator.execute_transaction_block(request, request_type, None)
164        )
165        .await?
166        .map_err(Error::from)?;
167        drop(orch_timer);
168
169        self.handle_post_orchestration(
170            response,
171            is_executed_locally,
172            opts,
173            digest,
174            input_objs,
175            transaction,
176            raw_transaction,
177            sender,
178        )
179        .await
180    }
181
182    #[instrument(level = "trace", skip_all)]
183    async fn handle_post_orchestration(
184        &self,
185        response: ExecuteTransactionResponseV1,
186        is_executed_locally: bool,
187        opts: IotaTransactionBlockResponseOptions,
188        digest: TransactionDigest,
189        input_objs: Vec<InputObjectKind>,
190        transaction: Option<IotaTransactionBlock>,
191        raw_transaction: Vec<u8>,
192        sender: IotaAddress,
193    ) -> Result<IotaTransactionBlockResponse, Error> {
194        let _post_orch_timer = self.metrics.post_orchestrator_latency_ms.start_timer();
195
196        let events = if opts.show_events {
197            tracing::trace!("Resolving events");
198            let epoch_store = self.state.load_epoch_store_one_call_per_task();
199            let backing_package_store = PostExecutionPackageResolver::new(
200                self.state.get_backing_package_store().clone(),
201                &response.output_objects,
202            );
203            let mut layout_resolver = epoch_store
204                .executor()
205                .type_layout_resolver(Box::new(backing_package_store));
206            Some(IotaTransactionBlockEvents::try_from(
207                response.events.unwrap_or_default(),
208                digest,
209                None,
210                layout_resolver.as_mut(),
211            )?)
212        } else {
213            None
214        };
215
216        let object_cache = {
217            response.output_objects.map(|output_objects| {
218                ObjectProviderCache::new_with_output_objects(self.state.clone(), output_objects)
219            })
220        };
221
222        let balance_changes = match &object_cache {
223            Some(object_cache) if opts.show_balance_changes => Some(
224                get_balance_changes_from_effect(
225                    object_cache,
226                    &response.effects.effects,
227                    input_objs,
228                    None,
229                )
230                .instrument(tracing::trace_span!("resolving balance changes"))
231                .await?,
232            ),
233            _ => None,
234        };
235
236        let object_changes = match &object_cache {
237            Some(object_cache) if opts.show_object_changes => Some(
238                get_object_changes(
239                    object_cache,
240                    sender,
241                    response.effects.effects.modified_at_versions(),
242                    response.effects.effects.all_changed_objects(),
243                    response.effects.effects.all_removed_objects(),
244                )
245                .instrument(tracing::trace_span!("resolving object changes"))
246                .await?,
247            ),
248            _ => None,
249        };
250
251        let raw_effects = if opts.show_raw_effects {
252            bcs::to_bytes(&response.effects.effects)?
253        } else {
254            vec![]
255        };
256
257        Ok(IotaTransactionBlockResponse {
258            digest,
259            transaction,
260            raw_transaction,
261            effects: opts
262                .show_effects
263                .then_some(response.effects.effects.try_into()?),
264            events,
265            object_changes,
266            balance_changes,
267            timestamp_ms: None,
268            confirmed_local_execution: Some(is_executed_locally),
269            checkpoint: None,
270            errors: vec![],
271            raw_effects,
272        })
273    }
274
275    pub fn prepare_dry_run_transaction_block(
276        &self,
277        tx_bytes: Base64,
278    ) -> Result<(TransactionData, TransactionDigest, Vec<InputObjectKind>), IotaRpcInputError> {
279        let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
280        let input_objs = tx_data.input_objects()?;
281        let intent_msg = IntentMessage::new(
282            Intent {
283                version: IntentVersion::V0,
284                scope: IntentScope::TransactionData,
285                app_id: AppId::Iota,
286            },
287            tx_data,
288        );
289        let txn_digest = TransactionDigest::new(default_hash(&intent_msg.value));
290        Ok((intent_msg.value, txn_digest, input_objs))
291    }
292
293    async fn dry_run_transaction_block(
294        &self,
295        tx_bytes: Base64,
296    ) -> Result<DryRunTransactionBlockResponse, Error> {
297        let (txn_data, txn_digest, input_objs) =
298            self.prepare_dry_run_transaction_block(tx_bytes)?;
299        let sender = txn_data.sender();
300
301        // Use spawn_blocking since dry_exec_transaction is a long-running synchronous
302        // operation
303        let state = self.state.clone();
304        let (resp, written_objects, transaction_effects, mock_gas) =
305            tokio::task::spawn_blocking(move || {
306                state.dry_exec_transaction(txn_data.clone(), txn_digest)
307            })
308            .await
309            .map_err(Error::from)??;
310
311        let object_cache = ObjectProviderCache::new_with_cache(self.state.clone(), written_objects);
312        let balance_changes = get_balance_changes_from_effect(
313            &object_cache,
314            &transaction_effects,
315            input_objs,
316            mock_gas,
317        )
318        .await?;
319        let object_changes = get_object_changes(
320            &object_cache,
321            sender,
322            transaction_effects.modified_at_versions(),
323            transaction_effects.all_changed_objects(),
324            transaction_effects.all_removed_objects(),
325        )
326        .await?;
327
328        Ok(DryRunTransactionBlockResponse {
329            effects: resp.effects,
330            events: resp.events,
331            object_changes,
332            balance_changes,
333            input: resp.input,
334            suggested_gas_price: resp.suggested_gas_price,
335        })
336    }
337}
338
339#[async_trait]
340impl WriteApiServer for TransactionExecutionApi {
341    #[instrument(skip(self))]
342    async fn execute_transaction_block(
343        &self,
344        tx_bytes: Base64,
345        signatures: Vec<Base64>,
346        opts: Option<IotaTransactionBlockResponseOptions>,
347        request_type: Option<ExecuteTransactionRequestType>,
348    ) -> RpcResult<IotaTransactionBlockResponse> {
349        self.execute_transaction_block(tx_bytes, signatures, opts, request_type)
350            .trace_timeout(Duration::from_secs(10))
351            .await
352    }
353
354    #[instrument(skip(self))]
355    async fn dev_inspect_transaction_block(
356        &self,
357        sender_address: IotaAddress,
358        tx_bytes: Base64,
359        gas_price: Option<BigInt<u64>>,
360        _epoch: Option<BigInt<u64>>,
361        additional_args: Option<DevInspectArgs>,
362    ) -> RpcResult<DevInspectResults> {
363        async move {
364            let DevInspectArgs {
365                gas_sponsor,
366                gas_budget,
367                gas_objects,
368                show_raw_txn_data_and_effects,
369                skip_checks,
370            } = additional_args.unwrap_or_default();
371            let tx_kind: TransactionKind = self.convert_bytes(tx_bytes)?;
372            self.state
373                .dev_inspect_transaction_block(
374                    sender_address,
375                    tx_kind,
376                    gas_price.map(|i| *i),
377                    gas_budget.map(|i| *i),
378                    gas_sponsor,
379                    gas_objects,
380                    show_raw_txn_data_and_effects,
381                    skip_checks,
382                )
383                .await
384                .map_err(Error::from)
385        }
386        .trace()
387        .await
388    }
389
390    #[instrument(skip(self))]
391    async fn dry_run_transaction_block(
392        &self,
393        tx_bytes: Base64,
394    ) -> RpcResult<DryRunTransactionBlockResponse> {
395        self.dry_run_transaction_block(tx_bytes).trace().await
396    }
397}
398
399impl IotaRpcModule for TransactionExecutionApi {
400    fn rpc(self) -> RpcModule<Self> {
401        self.into_rpc()
402    }
403
404    fn rpc_doc_module() -> Module {
405        WriteApiOpenRpc::module_doc()
406    }
407}