iota_json_rpc/
transaction_execution_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::{encoding::Base64, traits::ToFromBytes};
9use iota_core::{
10    authority::AuthorityState, authority_client::NetworkAuthorityClient,
11    transaction_orchestrator::TransactionOrchestrator,
12};
13use iota_json::IotaJsonValue;
14use iota_json_rpc_api::{JsonRpcMetrics, WriteApiOpenRpc, WriteApiServer};
15use iota_json_rpc_types::{
16    DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse, IotaMoveViewCallResults,
17    IotaTransactionBlock, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
18    IotaTransactionBlockResponseOptions, IotaTypeTag, MoveFunctionName,
19};
20use iota_metrics::spawn_monitored_task;
21use iota_open_rpc::Module;
22use iota_package_resolver::{Package, PackageStore, error::Error as PackageResolverError};
23use iota_protocol_config::Chain;
24use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
25use iota_transaction_builder::TransactionBuilder;
26use iota_types::{
27    base_types::IotaAddress,
28    crypto::default_hash,
29    digests::TransactionDigest,
30    effects::TransactionEffectsAPI,
31    iota_serde::BigInt,
32    quorum_driver_types::{
33        ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
34    },
35    signature::GenericSignature,
36    storage::PostExecutionPackageResolver,
37    transaction::{
38        InputObjectKind, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
39    },
40};
41use jsonrpsee::{RpcModule, core::RpcResult};
42use move_core_types::account_address::AccountAddress;
43use tracing::{Instrument, instrument};
44
45use crate::{
46    IotaRpcModule, ObjectProviderCache,
47    authority_state::StateRead,
48    error::{Error, IotaRpcInputError},
49    get_balance_changes_from_effect, get_object_changes,
50    logger::FutureWithTracing,
51    transaction_builder_api::AuthorityStateDataReader,
52};
53
54#[derive(Clone)]
55pub struct TransactionExecutionApi {
56    state: Arc<dyn StateRead>,
57    transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
58    metrics: Arc<JsonRpcMetrics>,
59    transaction_builder: TransactionBuilder,
60}
61
62impl TransactionExecutionApi {
63    pub fn new(
64        state: Arc<AuthorityState>,
65        transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
66        metrics: Arc<JsonRpcMetrics>,
67    ) -> Self {
68        let reader = Arc::new(AuthorityStateDataReader::new(state.clone()));
69        Self {
70            state,
71            transaction_orchestrator,
72            metrics,
73            transaction_builder: TransactionBuilder::new(reader),
74        }
75    }
76
77    pub fn convert_bytes<T: serde::de::DeserializeOwned>(
78        &self,
79        tx_bytes: Base64,
80    ) -> Result<T, IotaRpcInputError> {
81        let data: T = bcs::from_bytes(&tx_bytes.to_vec()?)?;
82        Ok(data)
83    }
84
85    #[expect(clippy::type_complexity)]
86    fn prepare_execute_transaction_block(
87        &self,
88        tx_bytes: Base64,
89        signatures: Vec<Base64>,
90        opts: Option<IotaTransactionBlockResponseOptions>,
91    ) -> Result<
92        (
93            ExecuteTransactionRequestV1,
94            IotaTransactionBlockResponseOptions,
95            IotaAddress,
96            Vec<InputObjectKind>,
97            Transaction,
98            Option<IotaTransactionBlock>,
99            Vec<u8>,
100        ),
101        IotaRpcInputError,
102    > {
103        let opts = opts.unwrap_or_default();
104        let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
105        let sender = tx_data.sender();
106        let input_objs = tx_data.input_objects().unwrap_or_default();
107
108        let mut sigs = Vec::new();
109        for sig in signatures {
110            sigs.push(GenericSignature::from_bytes(&sig.to_vec()?)?);
111        }
112        let txn = Transaction::from_generic_sig_data(tx_data, sigs);
113        let raw_transaction = if opts.show_raw_input {
114            bcs::to_bytes(txn.data())?
115        } else {
116            vec![]
117        };
118        let transaction = if opts.show_input {
119            let epoch_store = self.state.load_epoch_store_one_call_per_task();
120
121            Some(IotaTransactionBlock::try_from(
122                txn.data().clone(),
123                epoch_store.module_cache(),
124                *txn.digest(),
125            )?)
126        } else {
127            None
128        };
129
130        let request = ExecuteTransactionRequestV1 {
131            transaction: txn.clone(),
132            include_events: opts.show_events,
133            include_input_objects: opts.show_balance_changes || opts.show_object_changes,
134            include_output_objects: opts.show_balance_changes
135                || opts.show_object_changes
136                // In order to resolve events, we may need access to the newly published packages.
137                || opts.show_events,
138            include_auxiliary_data: false,
139        };
140
141        Ok((
142            request,
143            opts,
144            sender,
145            input_objs,
146            txn,
147            transaction,
148            raw_transaction,
149        ))
150    }
151
152    #[instrument("json_rpc_api_execute_transaction_block", level = "trace", skip_all)]
153    async fn execute_transaction_block(
154        &self,
155        tx_bytes: Base64,
156        signatures: Vec<Base64>,
157        opts: Option<IotaTransactionBlockResponseOptions>,
158        request_type: Option<ExecuteTransactionRequestType>,
159    ) -> Result<IotaTransactionBlockResponse, Error> {
160        let request_type =
161            request_type.unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert);
162        let (request, opts, sender, input_objs, txn, transaction, raw_transaction) =
163            self.prepare_execute_transaction_block(tx_bytes, signatures, opts)?;
164        let digest = *txn.digest();
165
166        let transaction_orchestrator = self.transaction_orchestrator.clone();
167        let orch_timer = self.metrics.orchestrator_latency_ms.start_timer();
168
169        tracing::trace!(
170            "Spawning transaction orchestrator task for transaction: {}",
171            digest
172        );
173        let (response, is_executed_locally) = spawn_monitored_task!(
174            transaction_orchestrator.execute_transaction_block(request, request_type, None)
175        )
176        .await?
177        .map_err(Error::from)?;
178        drop(orch_timer);
179
180        self.handle_post_orchestration(
181            response,
182            is_executed_locally,
183            opts,
184            digest,
185            input_objs,
186            transaction,
187            raw_transaction,
188            sender,
189        )
190        .await
191    }
192
193    #[instrument(level = "trace", skip_all)]
194    async fn handle_post_orchestration(
195        &self,
196        response: ExecuteTransactionResponseV1,
197        is_executed_locally: bool,
198        opts: IotaTransactionBlockResponseOptions,
199        digest: TransactionDigest,
200        input_objs: Vec<InputObjectKind>,
201        transaction: Option<IotaTransactionBlock>,
202        raw_transaction: Vec<u8>,
203        sender: IotaAddress,
204    ) -> Result<IotaTransactionBlockResponse, Error> {
205        let _post_orch_timer = self.metrics.post_orchestrator_latency_ms.start_timer();
206
207        let events = if opts.show_events {
208            tracing::trace!("Resolving events");
209            let epoch_store = self.state.load_epoch_store_one_call_per_task();
210            let backing_package_store = PostExecutionPackageResolver::new(
211                self.state.get_backing_package_store().clone(),
212                &response.output_objects,
213            );
214            let mut layout_resolver = epoch_store
215                .executor()
216                .type_layout_resolver(Box::new(backing_package_store));
217            Some(IotaTransactionBlockEvents::try_from(
218                response.events.unwrap_or_default(),
219                digest,
220                None,
221                layout_resolver.as_mut(),
222            )?)
223        } else {
224            None
225        };
226
227        let object_cache = {
228            response.output_objects.map(|output_objects| {
229                ObjectProviderCache::new_with_output_objects(self.state.clone(), output_objects)
230            })
231        };
232
233        let balance_changes = match &object_cache {
234            Some(object_cache) if opts.show_balance_changes => Some(
235                get_balance_changes_from_effect(
236                    object_cache,
237                    &response.effects.effects,
238                    input_objs,
239                    None,
240                )
241                .instrument(tracing::trace_span!("resolving balance changes"))
242                .await?,
243            ),
244            _ => None,
245        };
246
247        let object_changes = match &object_cache {
248            Some(object_cache) if opts.show_object_changes => Some(
249                get_object_changes(
250                    object_cache,
251                    sender,
252                    response.effects.effects.modified_at_versions(),
253                    response.effects.effects.all_changed_objects(),
254                    response.effects.effects.all_removed_objects(),
255                )
256                .instrument(tracing::trace_span!("resolving object changes"))
257                .await?,
258            ),
259            _ => None,
260        };
261
262        let raw_effects = if opts.show_raw_effects {
263            bcs::to_bytes(&response.effects.effects)?
264        } else {
265            vec![]
266        };
267
268        Ok(IotaTransactionBlockResponse {
269            digest,
270            transaction,
271            raw_transaction,
272            effects: opts
273                .show_effects
274                .then_some(response.effects.effects.try_into()?),
275            events,
276            object_changes,
277            balance_changes,
278            timestamp_ms: None,
279            confirmed_local_execution: Some(is_executed_locally),
280            checkpoint: None,
281            errors: vec![],
282            raw_effects,
283        })
284    }
285
286    pub fn prepare_dry_run_transaction_block(
287        &self,
288        tx_bytes: Base64,
289    ) -> Result<(TransactionData, TransactionDigest, Vec<InputObjectKind>), IotaRpcInputError> {
290        let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
291        let input_objs = tx_data.input_objects()?;
292        let intent_msg = IntentMessage::new(
293            Intent {
294                version: IntentVersion::V0,
295                scope: IntentScope::TransactionData,
296                app_id: IntentAppId::Iota,
297            },
298            tx_data,
299        );
300        let txn_digest = TransactionDigest::new(default_hash(&intent_msg.value));
301        Ok((intent_msg.value, txn_digest, input_objs))
302    }
303
304    async fn dry_run_transaction_block(
305        &self,
306        tx_bytes: Base64,
307    ) -> Result<DryRunTransactionBlockResponse, Error> {
308        let (txn_data, txn_digest, input_objs) =
309            self.prepare_dry_run_transaction_block(tx_bytes)?;
310        let sender = txn_data.sender();
311
312        // Use spawn_blocking since dry_exec_transaction is a long-running synchronous
313        // operation
314        let state = self.state.clone();
315        let (resp, written_objects, transaction_effects, mock_gas) =
316            tokio::task::spawn_blocking(move || {
317                state.dry_exec_transaction(txn_data.clone(), txn_digest)
318            })
319            .await
320            .map_err(Error::from)??;
321
322        let object_cache = ObjectProviderCache::new_with_cache(self.state.clone(), written_objects);
323        let balance_changes = get_balance_changes_from_effect(
324            &object_cache,
325            &transaction_effects,
326            input_objs,
327            mock_gas,
328        )
329        .await?;
330        let object_changes = get_object_changes(
331            &object_cache,
332            sender,
333            transaction_effects.modified_at_versions(),
334            transaction_effects.all_changed_objects(),
335            transaction_effects.all_removed_objects(),
336        )
337        .await?;
338
339        Ok(DryRunTransactionBlockResponse {
340            effects: resp.effects,
341            events: resp.events,
342            object_changes,
343            balance_changes,
344            input: resp.input,
345            suggested_gas_price: resp.suggested_gas_price,
346        })
347    }
348}
349
350#[async_trait]
351impl WriteApiServer for TransactionExecutionApi {
352    #[instrument(skip(self))]
353    async fn execute_transaction_block(
354        &self,
355        tx_bytes: Base64,
356        signatures: Vec<Base64>,
357        opts: Option<IotaTransactionBlockResponseOptions>,
358        request_type: Option<ExecuteTransactionRequestType>,
359    ) -> RpcResult<IotaTransactionBlockResponse> {
360        self.execute_transaction_block(tx_bytes, signatures, opts, request_type)
361            .trace_timeout(Duration::from_secs(10))
362            .await
363    }
364
365    /// Calls a move view function.
366    #[instrument(skip(self))]
367    async fn view_function_call(
368        &self,
369        function_name: String,
370        type_args: Option<Vec<IotaTypeTag>>,
371        call_args: Vec<IotaJsonValue>,
372    ) -> RpcResult<IotaMoveViewCallResults> {
373        let chain = self
374            .state
375            .get_chain_identifier()
376            .map_err(Error::from)?
377            .chain();
378        if !matches!(chain, Chain::Unknown) {
379            return Err(Error::UnsupportedFeature(format!(
380                "View function calls not supported yet on {}",
381                chain.as_str()
382            ))
383            .into());
384        }
385        let MoveFunctionName {
386            package,
387            module,
388            function,
389        } = function_name.as_str().parse().map_err(Error::from)?;
390        let sender = IotaAddress::ZERO;
391        let tx_kind = self
392            .transaction_builder
393            .move_view_call_tx_kind(
394                package,
395                &module,
396                &function,
397                type_args.unwrap_or_default(),
398                call_args,
399            )
400            .await
401            .map_err(Error::from)?;
402        let tx_bytes = Base64::from_bytes(&bcs::to_bytes(&tx_kind).map_err(Error::from)?);
403        let dev_inspect_results = self
404            .dev_inspect_transaction_block(sender, tx_bytes, None, None, None)
405            .await?;
406        Ok(
407            IotaMoveViewCallResults::from_dev_inspect_results(self.clone(), dev_inspect_results)
408                .await
409                .map_err(Error::from)?,
410        )
411    }
412
413    #[instrument(skip(self))]
414    async fn dev_inspect_transaction_block(
415        &self,
416        sender_address: IotaAddress,
417        tx_bytes: Base64,
418        gas_price: Option<BigInt<u64>>,
419        _epoch: Option<BigInt<u64>>,
420        additional_args: Option<DevInspectArgs>,
421    ) -> RpcResult<DevInspectResults> {
422        async move {
423            let DevInspectArgs {
424                gas_sponsor,
425                gas_budget,
426                gas_objects,
427                show_raw_txn_data_and_effects,
428                skip_checks,
429            } = additional_args.unwrap_or_default();
430            let tx_kind: TransactionKind = self.convert_bytes(tx_bytes)?;
431            self.state
432                .dev_inspect_transaction_block(
433                    sender_address,
434                    tx_kind,
435                    gas_price.map(|i| *i),
436                    gas_budget.map(|i| *i),
437                    gas_sponsor,
438                    gas_objects,
439                    show_raw_txn_data_and_effects,
440                    skip_checks,
441                )
442                .await
443                .map_err(Error::from)
444        }
445        .trace()
446        .await
447    }
448
449    #[instrument(skip(self))]
450    async fn dry_run_transaction_block(
451        &self,
452        tx_bytes: Base64,
453    ) -> RpcResult<DryRunTransactionBlockResponse> {
454        self.dry_run_transaction_block(tx_bytes).trace().await
455    }
456}
457
458impl IotaRpcModule for TransactionExecutionApi {
459    fn rpc(self) -> RpcModule<Self> {
460        self.into_rpc()
461    }
462
463    fn rpc_doc_module() -> Module {
464        WriteApiOpenRpc::module_doc()
465    }
466}
467
468#[async_trait]
469impl PackageStore for TransactionExecutionApi {
470    async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>, PackageResolverError> {
471        let backing_store = self.state.get_backing_package_store();
472        match backing_store.get_package_object(&(id.into())) {
473            Ok(Some(pkg)) => Ok(Arc::new(Package::read_from_package(pkg.move_package())?)),
474            Ok(None) => Err(PackageResolverError::PackageNotFound(id)),
475            Err(e) => Err(PackageResolverError::Store {
476                store: "Node",
477                source: Arc::new(e),
478            }),
479        }
480    }
481}