// iota_json_rpc/transaction_execution_api.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::{encoding::Base64, traits::ToFromBytes};
9use iota_core::{
10    authority::AuthorityState, authority_client::NetworkAuthorityClient,
11    transaction_orchestrator::TransactionOrchestrator,
12};
13use iota_json::IotaJsonValue;
14use iota_json_rpc_api::{JsonRpcMetrics, WriteApiOpenRpc, WriteApiServer};
15use iota_json_rpc_types::{
16    DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse,
17    ExecuteTransactionRequestType as ExecuteTransactionRequestTypeSchema, IotaExecutionStatus,
18    IotaMoveViewCallResults, IotaTransactionBlock, IotaTransactionBlockEffects,
19    IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
20    IotaTransactionBlockResponseOptions, IotaTypeTag, MoveFunctionName,
21};
22use iota_metrics::spawn_monitored_task;
23use iota_open_rpc::Module;
24use iota_package_resolver::{
25    Package, PackageStore, Resolver, error::Error as PackageResolverError,
26};
27use iota_protocol_config::Chain;
28use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
29use iota_transaction_builder::TransactionBuilder;
30use iota_types::{
31    base_types::{IotaAddress, ObjectID},
32    digests::TransactionDigest,
33    effects::{TransactionEffectsAPI, TransactionEffectsExt},
34    iota_serde::BigInt,
35    quorum_driver_types::{
36        ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
37    },
38    signature::GenericSignature,
39    storage::PostExecutionPackageResolver,
40    transaction::{
41        InputObjectKind, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
42    },
43};
44use jsonrpsee::{RpcModule, core::RpcResult};
45use tracing::{Instrument, instrument};
46
47use crate::{
48    IotaRpcModule, ObjectProviderCache,
49    authority_state::StateRead,
50    error::{Error, IotaRpcInputError},
51    get_balance_changes_from_effect, get_object_changes,
52    logger::FutureWithTracing,
53    transaction_builder_api::AuthorityStateDataReader,
54};
55
56#[derive(Clone)]
57pub struct TransactionExecutionApi {
58    state: Arc<dyn StateRead>,
59    transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
60    metrics: Arc<JsonRpcMetrics>,
61    transaction_builder: TransactionBuilder,
62}
63
64impl TransactionExecutionApi {
65    pub fn new(
66        state: Arc<AuthorityState>,
67        transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
68        metrics: Arc<JsonRpcMetrics>,
69    ) -> Self {
70        let reader = Arc::new(AuthorityStateDataReader::new(state.clone()));
71        Self {
72            state,
73            transaction_orchestrator,
74            metrics,
75            transaction_builder: TransactionBuilder::new(reader),
76        }
77    }
78
79    pub fn convert_bytes<T: serde::de::DeserializeOwned>(
80        &self,
81        tx_bytes: Base64,
82    ) -> Result<T, IotaRpcInputError> {
83        let data: T = bcs::from_bytes(&tx_bytes.to_vec()?)?;
84        Ok(data)
85    }
86
87    #[expect(clippy::type_complexity)]
88    fn prepare_execute_transaction_block(
89        &self,
90        tx_bytes: Base64,
91        signatures: Vec<Base64>,
92        opts: Option<IotaTransactionBlockResponseOptions>,
93    ) -> Result<
94        (
95            ExecuteTransactionRequestV1,
96            IotaTransactionBlockResponseOptions,
97            IotaAddress,
98            Vec<InputObjectKind>,
99            Transaction,
100            Option<IotaTransactionBlock>,
101            Vec<u8>,
102        ),
103        IotaRpcInputError,
104    > {
105        let opts = opts.unwrap_or_default();
106        let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
107        let sender = tx_data.sender();
108        let input_objs = tx_data.input_objects().unwrap_or_default();
109
110        let mut sigs = Vec::new();
111        for sig in signatures {
112            sigs.push(GenericSignature::from_bytes(&sig.to_vec()?)?);
113        }
114        let txn = Transaction::from_generic_sig_data(tx_data, sigs);
115        let raw_transaction = if opts.show_raw_input {
116            bcs::to_bytes(txn.data())?
117        } else {
118            vec![]
119        };
120        let transaction = if opts.show_input {
121            let epoch_store = self.state.load_epoch_store_one_call_per_task();
122
123            Some(IotaTransactionBlock::try_from(
124                txn.data().clone(),
125                epoch_store.module_cache(),
126                *txn.digest(),
127            )?)
128        } else {
129            None
130        };
131
132        let request = ExecuteTransactionRequestV1 {
133            transaction: txn.clone(),
134            include_events: opts.show_events,
135            include_input_objects: opts.show_balance_changes || opts.show_object_changes,
136            include_output_objects: opts.show_balance_changes
137                || opts.show_object_changes
138                // In order to resolve events, we may need access to the newly published packages.
139                || opts.show_events,
140            include_auxiliary_data: false,
141        };
142
143        Ok((
144            request,
145            opts,
146            sender,
147            input_objs,
148            txn,
149            transaction,
150            raw_transaction,
151        ))
152    }
153
154    #[instrument("json_rpc_api_execute_transaction_block", level = "trace", skip_all)]
155    async fn execute_transaction_block(
156        &self,
157        tx_bytes: Base64,
158        signatures: Vec<Base64>,
159        opts: Option<IotaTransactionBlockResponseOptions>,
160        request_type: Option<ExecuteTransactionRequestType>,
161    ) -> Result<IotaTransactionBlockResponse, Error> {
162        let request_type =
163            request_type.unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert);
164        let (request, opts, sender, input_objs, txn, transaction, raw_transaction) =
165            self.prepare_execute_transaction_block(tx_bytes, signatures, opts)?;
166        let digest = *txn.digest();
167
168        let transaction_orchestrator = self.transaction_orchestrator.clone();
169        let orch_timer = self.metrics.orchestrator_latency_ms.start_timer();
170
171        tracing::trace!(
172            "Spawning transaction orchestrator task for transaction: {}",
173            digest
174        );
175        let (response, is_executed_locally) = spawn_monitored_task!(
176            transaction_orchestrator.execute_transaction_block(request, request_type, None)
177        )
178        .await?
179        .map_err(Error::from)?;
180        drop(orch_timer);
181
182        self.handle_post_orchestration(
183            response,
184            is_executed_locally,
185            opts,
186            digest,
187            input_objs,
188            transaction,
189            raw_transaction,
190            sender,
191        )
192        .await
193    }
194
195    #[instrument(level = "trace", skip_all)]
196    async fn handle_post_orchestration(
197        &self,
198        response: ExecuteTransactionResponseV1,
199        is_executed_locally: bool,
200        opts: IotaTransactionBlockResponseOptions,
201        digest: TransactionDigest,
202        input_objs: Vec<InputObjectKind>,
203        transaction: Option<IotaTransactionBlock>,
204        raw_transaction: Vec<u8>,
205        sender: IotaAddress,
206    ) -> Result<IotaTransactionBlockResponse, Error> {
207        let _post_orch_timer = self.metrics.post_orchestrator_latency_ms.start_timer();
208
209        let events = if opts.show_events {
210            tracing::trace!("Resolving events");
211            let epoch_store = self.state.load_epoch_store_one_call_per_task();
212            let backing_package_store = PostExecutionPackageResolver::new(
213                self.state.get_backing_package_store().clone(),
214                &response.output_objects,
215            );
216            let mut layout_resolver = epoch_store
217                .executor()
218                .type_layout_resolver(Box::new(backing_package_store));
219            Some(IotaTransactionBlockEvents::try_from(
220                response.events.unwrap_or_default(),
221                digest,
222                None,
223                layout_resolver.as_mut(),
224            )?)
225        } else {
226            None
227        };
228
229        // Skip cache (and downstream balance/object_changes) when the validator
230        // returned no input/output objects — e.g. the already-executed early-return.
231        // Without this guard, cache misses fall through to a provider lookup that
232        // races with local state and returns "version higher than latest".
233        let object_cache = if (opts.show_balance_changes || opts.show_object_changes)
234            && (response.input_objects.is_some() || response.output_objects.is_some())
235        {
236            let mut object_cache = ObjectProviderCache::new(self.state.clone());
237            if let Some(input_objects) = response.input_objects {
238                object_cache.insert_objects_into_cache(input_objects);
239            }
240            if let Some(output_objects) = response.output_objects {
241                object_cache.insert_objects_into_cache(output_objects);
242            }
243            Some(object_cache)
244        } else {
245            None
246        };
247
248        let balance_changes = match &object_cache {
249            Some(object_cache) if opts.show_balance_changes => Some(
250                get_balance_changes_from_effect(
251                    object_cache,
252                    &response.effects.effects,
253                    input_objs,
254                    None,
255                )
256                .instrument(tracing::trace_span!("resolving balance changes"))
257                .await?,
258            ),
259            _ => None,
260        };
261
262        let object_changes = match &object_cache {
263            Some(object_cache) if opts.show_object_changes => Some(
264                get_object_changes(
265                    object_cache,
266                    sender,
267                    response.effects.effects.modified_at_versions(),
268                    response.effects.effects.all_changed_objects(),
269                    response.effects.effects.all_removed_objects(),
270                )
271                .instrument(tracing::trace_span!("resolving object changes"))
272                .await?,
273            ),
274            _ => None,
275        };
276
277        let raw_effects = if opts.show_raw_effects {
278            bcs::to_bytes(&response.effects.effects)?
279        } else {
280            vec![]
281        };
282        let resolver = Resolver::new(self.clone());
283
284        let effects = if opts.show_effects {
285            Some(
286                IotaTransactionBlockEffects::from_native_with_clever_error(
287                    response.effects.effects,
288                    &resolver,
289                )
290                .await,
291            )
292        } else {
293            None
294        };
295
296        let errors = match effects.as_ref().map(|e| e.status()) {
297            Some(IotaExecutionStatus::Failure { error }) => vec![error.clone()],
298            _ => vec![],
299        };
300
301        Ok(IotaTransactionBlockResponse {
302            digest,
303            transaction,
304            raw_transaction,
305            effects,
306            events,
307            object_changes,
308            balance_changes,
309            timestamp_ms: None,
310            confirmed_local_execution: Some(is_executed_locally),
311            checkpoint: None,
312            errors,
313            raw_effects,
314        })
315    }
316
317    pub fn prepare_dry_run_transaction_block(
318        &self,
319        tx_bytes: Base64,
320    ) -> Result<(TransactionData, TransactionDigest, Vec<InputObjectKind>), IotaRpcInputError> {
321        let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
322        let input_objs = tx_data.input_objects()?;
323        let intent_msg = IntentMessage::new(
324            Intent {
325                version: IntentVersion::V0,
326                scope: IntentScope::TransactionData,
327                app_id: IntentAppId::Iota,
328            },
329            tx_data,
330        );
331        let txn_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
332        Ok((intent_msg.value, txn_digest, input_objs))
333    }
334
335    async fn dry_run_transaction_block(
336        &self,
337        tx_bytes: Base64,
338    ) -> Result<DryRunTransactionBlockResponse, Error> {
339        let (txn_data, txn_digest, input_objs) =
340            self.prepare_dry_run_transaction_block(tx_bytes)?;
341        let sender = txn_data.sender();
342
343        // Use spawn_blocking since dry_exec_transaction is a long-running synchronous
344        // operation
345        let state = self.state.clone();
346        let (resp, written_objects, transaction_effects, mock_gas) =
347            tokio::task::spawn_blocking(move || {
348                state.dry_exec_transaction(txn_data.clone(), txn_digest)
349            })
350            .await
351            .map_err(Error::from)??;
352
353        let object_cache = ObjectProviderCache::new_with_cache(self.state.clone(), written_objects);
354        let balance_changes = get_balance_changes_from_effect(
355            &object_cache,
356            &transaction_effects,
357            input_objs,
358            mock_gas,
359        )
360        .await?;
361        let object_changes = get_object_changes(
362            &object_cache,
363            sender,
364            transaction_effects.modified_at_versions(),
365            transaction_effects.all_changed_objects(),
366            transaction_effects.all_removed_objects(),
367        )
368        .await?;
369
370        let resolver = Resolver::new(self.clone());
371        let effects = IotaTransactionBlockEffects::from_native_with_clever_error(
372            transaction_effects,
373            &resolver,
374        )
375        .await;
376
377        Ok(DryRunTransactionBlockResponse {
378            effects,
379            events: resp.events,
380            object_changes,
381            balance_changes,
382            input: resp.input,
383            suggested_gas_price: resp.suggested_gas_price,
384            execution_error_source: resp.execution_error_source,
385        })
386    }
387}
388
389#[async_trait]
390impl WriteApiServer for TransactionExecutionApi {
391    #[instrument(skip(self))]
392    async fn execute_transaction_block(
393        &self,
394        tx_bytes: Base64,
395        signatures: Vec<Base64>,
396        opts: Option<IotaTransactionBlockResponseOptions>,
397        request_type: Option<ExecuteTransactionRequestTypeSchema>,
398    ) -> RpcResult<IotaTransactionBlockResponse> {
399        self.execute_transaction_block(tx_bytes, signatures, opts, request_type.map(Into::into))
400            .trace_timeout(Duration::from_secs(10))
401            .await
402    }
403
404    /// Calls a move view function.
405    #[instrument(skip(self))]
406    async fn view_function_call(
407        &self,
408        function_name: String,
409        type_args: Option<Vec<IotaTypeTag>>,
410        arguments: Vec<IotaJsonValue>,
411    ) -> RpcResult<IotaMoveViewCallResults> {
412        let chain = self
413            .state
414            .get_chain_identifier()
415            .map_err(Error::from)?
416            .chain();
417        if !matches!(chain, Chain::Unknown) {
418            return Err(Error::UnsupportedFeature(format!(
419                "View function calls not supported yet on {}",
420                chain.as_str()
421            ))
422            .into());
423        }
424        let MoveFunctionName {
425            package,
426            module,
427            function,
428        } = function_name.as_str().parse().map_err(Error::from)?;
429        let sender = IotaAddress::ZERO;
430        let tx_kind = self
431            .transaction_builder
432            .move_view_call_tx_kind(
433                package,
434                &module,
435                &function,
436                type_args.unwrap_or_default(),
437                arguments,
438            )
439            .await
440            .map_err(Error::from)?;
441        let tx_bytes = Base64::from_bytes(&bcs::to_bytes(&tx_kind).map_err(Error::from)?);
442        let dev_inspect_results = self
443            .dev_inspect_transaction_block(sender, tx_bytes, None, None, None)
444            .await?;
445        Ok(
446            IotaMoveViewCallResults::from_dev_inspect_results(self.clone(), dev_inspect_results)
447                .await
448                .map_err(Error::from)?,
449        )
450    }
451
452    #[instrument(skip(self, sender_address), fields(sender_address = %sender_address))]
453    async fn dev_inspect_transaction_block(
454        &self,
455        sender_address: IotaAddress,
456        tx_bytes: Base64,
457        gas_price: Option<BigInt<u64>>,
458        _epoch: Option<BigInt<u64>>,
459        additional_args: Option<DevInspectArgs>,
460    ) -> RpcResult<DevInspectResults> {
461        async move {
462            let DevInspectArgs {
463                gas_sponsor,
464                gas_budget,
465                gas_objects,
466                show_raw_txn_data_and_effects,
467                skip_checks,
468            } = additional_args.unwrap_or_default();
469            let tx_kind: TransactionKind = self.convert_bytes(tx_bytes)?;
470            self.state
471                .dev_inspect_transaction_block(
472                    sender_address,
473                    tx_kind,
474                    gas_price.map(|i| *i),
475                    gas_budget,
476                    gas_sponsor,
477                    gas_objects,
478                    show_raw_txn_data_and_effects,
479                    skip_checks,
480                )
481                .await
482                .map_err(Error::from)
483        }
484        .trace()
485        .await
486    }
487
488    #[instrument(skip(self))]
489    async fn dry_run_transaction_block(
490        &self,
491        tx_bytes: Base64,
492    ) -> RpcResult<DryRunTransactionBlockResponse> {
493        self.dry_run_transaction_block(tx_bytes).trace().await
494    }
495}
496
497impl IotaRpcModule for TransactionExecutionApi {
498    fn rpc(self) -> RpcModule<Self> {
499        self.into_rpc()
500    }
501
502    fn rpc_doc_module() -> Module {
503        WriteApiOpenRpc::module_doc()
504    }
505}
506
507#[async_trait]
508impl PackageStore for TransactionExecutionApi {
509    async fn fetch(&self, id: IotaAddress) -> Result<Arc<Package>, PackageResolverError> {
510        let backing_store = self.state.get_backing_package_store();
511        match backing_store.get_package_object(&ObjectID::new(id.into_bytes())) {
512            Ok(Some(pkg)) => Ok(Arc::new(Package::read_from_package(pkg.move_package())?)),
513            Ok(None) => Err(PackageResolverError::PackageNotFound(id)),
514            Err(e) => Err(PackageResolverError::Store {
515                store: "Node",
516                source: Arc::new(e),
517            }),
518        }
519    }
520}