iota_json_rpc/
transaction_execution_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::{encoding::Base64, traits::ToFromBytes};
9use iota_core::{
10    authority::AuthorityState, authority_client::NetworkAuthorityClient,
11    transaction_orchestrator::TransactionOrchestrator,
12};
13use iota_json::IotaJsonValue;
14use iota_json_rpc_api::{JsonRpcMetrics, WriteApiOpenRpc, WriteApiServer};
15use iota_json_rpc_types::{
16    DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse, IotaMoveViewCallResults,
17    IotaTransactionBlock, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
18    IotaTransactionBlockResponseOptions, IotaTypeTag, MoveFunctionName,
19};
20use iota_metrics::spawn_monitored_task;
21use iota_open_rpc::Module;
22use iota_package_resolver::{Package, PackageStore, error::Error as PackageResolverError};
23use iota_protocol_config::Chain;
24use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
25use iota_transaction_builder::TransactionBuilder;
26use iota_types::{
27    base_types::IotaAddress,
28    crypto::default_hash,
29    digests::TransactionDigest,
30    effects::TransactionEffectsAPI,
31    iota_serde::BigInt,
32    quorum_driver_types::{
33        ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
34    },
35    signature::GenericSignature,
36    storage::PostExecutionPackageResolver,
37    transaction::{
38        InputObjectKind, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
39    },
40};
41use jsonrpsee::{RpcModule, core::RpcResult};
42use move_core_types::account_address::AccountAddress;
43use tracing::{Instrument, instrument};
44
45use crate::{
46    IotaRpcModule, ObjectProviderCache,
47    authority_state::StateRead,
48    error::{Error, IotaRpcInputError},
49    get_balance_changes_from_effect, get_object_changes,
50    logger::FutureWithTracing,
51    transaction_builder_api::AuthorityStateDataReader,
52};
53
54#[derive(Clone)]
55pub struct TransactionExecutionApi {
56    state: Arc<dyn StateRead>,
57    transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
58    metrics: Arc<JsonRpcMetrics>,
59    transaction_builder: TransactionBuilder,
60}
61
62impl TransactionExecutionApi {
63    pub fn new(
64        state: Arc<AuthorityState>,
65        transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
66        metrics: Arc<JsonRpcMetrics>,
67    ) -> Self {
68        let reader = Arc::new(AuthorityStateDataReader::new(state.clone()));
69        Self {
70            state,
71            transaction_orchestrator,
72            metrics,
73            transaction_builder: TransactionBuilder::new(reader),
74        }
75    }
76
77    pub fn convert_bytes<T: serde::de::DeserializeOwned>(
78        &self,
79        tx_bytes: Base64,
80    ) -> Result<T, IotaRpcInputError> {
81        let data: T = bcs::from_bytes(&tx_bytes.to_vec()?)?;
82        Ok(data)
83    }
84
85    #[expect(clippy::type_complexity)]
86    fn prepare_execute_transaction_block(
87        &self,
88        tx_bytes: Base64,
89        signatures: Vec<Base64>,
90        opts: Option<IotaTransactionBlockResponseOptions>,
91    ) -> Result<
92        (
93            ExecuteTransactionRequestV1,
94            IotaTransactionBlockResponseOptions,
95            IotaAddress,
96            Vec<InputObjectKind>,
97            Transaction,
98            Option<IotaTransactionBlock>,
99            Vec<u8>,
100        ),
101        IotaRpcInputError,
102    > {
103        let opts = opts.unwrap_or_default();
104        let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
105        let sender = tx_data.sender();
106        let input_objs = tx_data.input_objects().unwrap_or_default();
107
108        let mut sigs = Vec::new();
109        for sig in signatures {
110            sigs.push(GenericSignature::from_bytes(&sig.to_vec()?)?);
111        }
112        let txn = Transaction::from_generic_sig_data(tx_data, sigs);
113        let raw_transaction = if opts.show_raw_input {
114            bcs::to_bytes(txn.data())?
115        } else {
116            vec![]
117        };
118        let transaction = if opts.show_input {
119            let epoch_store = self.state.load_epoch_store_one_call_per_task();
120
121            Some(IotaTransactionBlock::try_from(
122                txn.data().clone(),
123                epoch_store.module_cache(),
124                *txn.digest(),
125            )?)
126        } else {
127            None
128        };
129
130        let request = ExecuteTransactionRequestV1 {
131            transaction: txn.clone(),
132            include_events: opts.show_events,
133            include_input_objects: opts.show_balance_changes || opts.show_object_changes,
134            include_output_objects: opts.show_balance_changes
135                || opts.show_object_changes
136                // In order to resolve events, we may need access to the newly published packages.
137                || opts.show_events,
138            include_auxiliary_data: false,
139        };
140
141        Ok((
142            request,
143            opts,
144            sender,
145            input_objs,
146            txn,
147            transaction,
148            raw_transaction,
149        ))
150    }
151
152    #[instrument("json_rpc_api_execute_transaction_block", level = "trace", skip_all)]
153    async fn execute_transaction_block(
154        &self,
155        tx_bytes: Base64,
156        signatures: Vec<Base64>,
157        opts: Option<IotaTransactionBlockResponseOptions>,
158        request_type: Option<ExecuteTransactionRequestType>,
159    ) -> Result<IotaTransactionBlockResponse, Error> {
160        let request_type =
161            request_type.unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert);
162        let (request, opts, sender, input_objs, txn, transaction, raw_transaction) =
163            self.prepare_execute_transaction_block(tx_bytes, signatures, opts)?;
164        let digest = *txn.digest();
165
166        let transaction_orchestrator = self.transaction_orchestrator.clone();
167        let orch_timer = self.metrics.orchestrator_latency_ms.start_timer();
168
169        tracing::trace!(
170            "Spawning transaction orchestrator task for transaction: {}",
171            digest
172        );
173        let (response, is_executed_locally) = spawn_monitored_task!(
174            transaction_orchestrator.execute_transaction_block(request, request_type, None)
175        )
176        .await?
177        .map_err(Error::from)?;
178        drop(orch_timer);
179
180        self.handle_post_orchestration(
181            response,
182            is_executed_locally,
183            opts,
184            digest,
185            input_objs,
186            transaction,
187            raw_transaction,
188            sender,
189        )
190        .await
191    }
192
193    #[instrument(level = "trace", skip_all)]
194    async fn handle_post_orchestration(
195        &self,
196        response: ExecuteTransactionResponseV1,
197        is_executed_locally: bool,
198        opts: IotaTransactionBlockResponseOptions,
199        digest: TransactionDigest,
200        input_objs: Vec<InputObjectKind>,
201        transaction: Option<IotaTransactionBlock>,
202        raw_transaction: Vec<u8>,
203        sender: IotaAddress,
204    ) -> Result<IotaTransactionBlockResponse, Error> {
205        let _post_orch_timer = self.metrics.post_orchestrator_latency_ms.start_timer();
206
207        let events = if opts.show_events {
208            tracing::trace!("Resolving events");
209            let epoch_store = self.state.load_epoch_store_one_call_per_task();
210            let backing_package_store = PostExecutionPackageResolver::new(
211                self.state.get_backing_package_store().clone(),
212                &response.output_objects,
213            );
214            let mut layout_resolver = epoch_store
215                .executor()
216                .type_layout_resolver(Box::new(backing_package_store));
217            Some(IotaTransactionBlockEvents::try_from(
218                response.events.unwrap_or_default(),
219                digest,
220                None,
221                layout_resolver.as_mut(),
222            )?)
223        } else {
224            None
225        };
226
227        let object_cache = {
228            response.output_objects.map(|output_objects| {
229                ObjectProviderCache::new_with_output_objects(self.state.clone(), output_objects)
230            })
231        };
232
233        let balance_changes = match &object_cache {
234            Some(object_cache) if opts.show_balance_changes => Some(
235                get_balance_changes_from_effect(
236                    object_cache,
237                    &response.effects.effects,
238                    input_objs,
239                    None,
240                )
241                .instrument(tracing::trace_span!("resolving balance changes"))
242                .await?,
243            ),
244            _ => None,
245        };
246
247        let object_changes = match &object_cache {
248            Some(object_cache) if opts.show_object_changes => Some(
249                get_object_changes(
250                    object_cache,
251                    sender,
252                    response.effects.effects.modified_at_versions(),
253                    response.effects.effects.all_changed_objects(),
254                    response.effects.effects.all_removed_objects(),
255                )
256                .instrument(tracing::trace_span!("resolving object changes"))
257                .await?,
258            ),
259            _ => None,
260        };
261
262        let raw_effects = if opts.show_raw_effects {
263            bcs::to_bytes(&response.effects.effects)?
264        } else {
265            vec![]
266        };
267
268        Ok(IotaTransactionBlockResponse {
269            digest,
270            transaction,
271            raw_transaction,
272            effects: opts
273                .show_effects
274                .then_some(response.effects.effects.try_into()?),
275            events,
276            object_changes,
277            balance_changes,
278            timestamp_ms: None,
279            confirmed_local_execution: Some(is_executed_locally),
280            checkpoint: None,
281            errors: vec![],
282            raw_effects,
283        })
284    }
285
286    pub fn prepare_dry_run_transaction_block(
287        &self,
288        tx_bytes: Base64,
289    ) -> Result<(TransactionData, TransactionDigest, Vec<InputObjectKind>), IotaRpcInputError> {
290        let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
291        let input_objs = tx_data.input_objects()?;
292        let intent_msg = IntentMessage::new(
293            Intent {
294                version: IntentVersion::V0,
295                scope: IntentScope::TransactionData,
296                app_id: IntentAppId::Iota,
297            },
298            tx_data,
299        );
300        let txn_digest = TransactionDigest::new(default_hash(&intent_msg.value));
301        Ok((intent_msg.value, txn_digest, input_objs))
302    }
303
304    async fn dry_run_transaction_block(
305        &self,
306        tx_bytes: Base64,
307    ) -> Result<DryRunTransactionBlockResponse, Error> {
308        let (txn_data, txn_digest, input_objs) =
309            self.prepare_dry_run_transaction_block(tx_bytes)?;
310        let sender = txn_data.sender();
311
312        // Use spawn_blocking since dry_exec_transaction is a long-running synchronous
313        // operation
314        let state = self.state.clone();
315        let (resp, written_objects, transaction_effects, mock_gas) =
316            tokio::task::spawn_blocking(move || {
317                state.dry_exec_transaction(txn_data.clone(), txn_digest)
318            })
319            .await
320            .map_err(Error::from)??;
321
322        let object_cache = ObjectProviderCache::new_with_cache(self.state.clone(), written_objects);
323        let balance_changes = get_balance_changes_from_effect(
324            &object_cache,
325            &transaction_effects,
326            input_objs,
327            mock_gas,
328        )
329        .await?;
330        let object_changes = get_object_changes(
331            &object_cache,
332            sender,
333            transaction_effects.modified_at_versions(),
334            transaction_effects.all_changed_objects(),
335            transaction_effects.all_removed_objects(),
336        )
337        .await?;
338
339        Ok(DryRunTransactionBlockResponse {
340            effects: resp.effects,
341            events: resp.events,
342            object_changes,
343            balance_changes,
344            input: resp.input,
345            suggested_gas_price: resp.suggested_gas_price,
346            execution_error_source: resp.execution_error_source,
347        })
348    }
349}
350
351#[async_trait]
352impl WriteApiServer for TransactionExecutionApi {
353    #[instrument(skip(self))]
354    async fn execute_transaction_block(
355        &self,
356        tx_bytes: Base64,
357        signatures: Vec<Base64>,
358        opts: Option<IotaTransactionBlockResponseOptions>,
359        request_type: Option<ExecuteTransactionRequestType>,
360    ) -> RpcResult<IotaTransactionBlockResponse> {
361        self.execute_transaction_block(tx_bytes, signatures, opts, request_type)
362            .trace_timeout(Duration::from_secs(10))
363            .await
364    }
365
366    /// Calls a move view function.
367    #[instrument(skip(self))]
368    async fn view_function_call(
369        &self,
370        function_name: String,
371        type_args: Option<Vec<IotaTypeTag>>,
372        arguments: Vec<IotaJsonValue>,
373    ) -> RpcResult<IotaMoveViewCallResults> {
374        let chain = self
375            .state
376            .get_chain_identifier()
377            .map_err(Error::from)?
378            .chain();
379        if !matches!(chain, Chain::Unknown) {
380            return Err(Error::UnsupportedFeature(format!(
381                "View function calls not supported yet on {}",
382                chain.as_str()
383            ))
384            .into());
385        }
386        let MoveFunctionName {
387            package,
388            module,
389            function,
390        } = function_name.as_str().parse().map_err(Error::from)?;
391        let sender = IotaAddress::ZERO;
392        let tx_kind = self
393            .transaction_builder
394            .move_view_call_tx_kind(
395                package,
396                &module,
397                &function,
398                type_args.unwrap_or_default(),
399                arguments,
400            )
401            .await
402            .map_err(Error::from)?;
403        let tx_bytes = Base64::from_bytes(&bcs::to_bytes(&tx_kind).map_err(Error::from)?);
404        let dev_inspect_results = self
405            .dev_inspect_transaction_block(sender, tx_bytes, None, None, None)
406            .await?;
407        Ok(
408            IotaMoveViewCallResults::from_dev_inspect_results(self.clone(), dev_inspect_results)
409                .await
410                .map_err(Error::from)?,
411        )
412    }
413
414    #[instrument(skip(self))]
415    async fn dev_inspect_transaction_block(
416        &self,
417        sender_address: IotaAddress,
418        tx_bytes: Base64,
419        gas_price: Option<BigInt<u64>>,
420        _epoch: Option<BigInt<u64>>,
421        additional_args: Option<DevInspectArgs>,
422    ) -> RpcResult<DevInspectResults> {
423        async move {
424            let DevInspectArgs {
425                gas_sponsor,
426                gas_budget,
427                gas_objects,
428                show_raw_txn_data_and_effects,
429                skip_checks,
430            } = additional_args.unwrap_or_default();
431            let tx_kind: TransactionKind = self.convert_bytes(tx_bytes)?;
432            self.state
433                .dev_inspect_transaction_block(
434                    sender_address,
435                    tx_kind,
436                    gas_price.map(|i| *i),
437                    gas_budget.map(|i| *i),
438                    gas_sponsor,
439                    gas_objects,
440                    show_raw_txn_data_and_effects,
441                    skip_checks,
442                )
443                .await
444                .map_err(Error::from)
445        }
446        .trace()
447        .await
448    }
449
450    #[instrument(skip(self))]
451    async fn dry_run_transaction_block(
452        &self,
453        tx_bytes: Base64,
454    ) -> RpcResult<DryRunTransactionBlockResponse> {
455        self.dry_run_transaction_block(tx_bytes).trace().await
456    }
457}
458
459impl IotaRpcModule for TransactionExecutionApi {
460    fn rpc(self) -> RpcModule<Self> {
461        self.into_rpc()
462    }
463
464    fn rpc_doc_module() -> Module {
465        WriteApiOpenRpc::module_doc()
466    }
467}
468
469#[async_trait]
470impl PackageStore for TransactionExecutionApi {
471    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>, PackageResolverError> {
472        let backing_store = self.state.get_backing_package_store();
473        match backing_store.get_package_object(&(id.into())) {
474            Ok(Some(pkg)) => Ok(Arc::new(Package::read_from_package(pkg.move_package())?)),
475            Ok(None) => Err(PackageResolverError::PackageNotFound(id)),
476            Err(e) => Err(PackageResolverError::Store {
477                store: "Node",
478                source: Arc::new(e),
479            }),
480        }
481    }
482}