1use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::{encoding::Base64, traits::ToFromBytes};
9use iota_core::{
10 authority::AuthorityState, authority_client::NetworkAuthorityClient,
11 transaction_orchestrator::TransactionOrchestrator,
12};
13use iota_json::IotaJsonValue;
14use iota_json_rpc_api::{JsonRpcMetrics, WriteApiOpenRpc, WriteApiServer};
15use iota_json_rpc_types::{
16 DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse,
17 ExecuteTransactionRequestType as ExecuteTransactionRequestTypeSchema, IotaExecutionStatus,
18 IotaMoveViewCallResults, IotaTransactionBlock, IotaTransactionBlockEffects,
19 IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
20 IotaTransactionBlockResponseOptions, IotaTypeTag, MoveFunctionName,
21};
22use iota_metrics::spawn_monitored_task;
23use iota_open_rpc::Module;
24use iota_package_resolver::{
25 Package, PackageStore, Resolver, error::Error as PackageResolverError,
26};
27use iota_protocol_config::Chain;
28use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
29use iota_transaction_builder::TransactionBuilder;
30use iota_types::{
31 base_types::{IotaAddress, ObjectID},
32 digests::TransactionDigest,
33 effects::{TransactionEffectsAPI, TransactionEffectsExt},
34 iota_serde::BigInt,
35 quorum_driver_types::{
36 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
37 },
38 signature::GenericSignature,
39 storage::PostExecutionPackageResolver,
40 transaction::{
41 InputObjectKind, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
42 },
43};
44use jsonrpsee::{RpcModule, core::RpcResult};
45use tracing::{Instrument, instrument};
46
47use crate::{
48 IotaRpcModule, ObjectProviderCache,
49 authority_state::StateRead,
50 error::{Error, IotaRpcInputError},
51 get_balance_changes_from_effect, get_object_changes,
52 logger::FutureWithTracing,
53 transaction_builder_api::AuthorityStateDataReader,
54};
55
56#[derive(Clone)]
57pub struct TransactionExecutionApi {
58 state: Arc<dyn StateRead>,
59 transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
60 metrics: Arc<JsonRpcMetrics>,
61 transaction_builder: TransactionBuilder,
62}
63
64impl TransactionExecutionApi {
65 pub fn new(
66 state: Arc<AuthorityState>,
67 transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
68 metrics: Arc<JsonRpcMetrics>,
69 ) -> Self {
70 let reader = Arc::new(AuthorityStateDataReader::new(state.clone()));
71 Self {
72 state,
73 transaction_orchestrator,
74 metrics,
75 transaction_builder: TransactionBuilder::new(reader),
76 }
77 }
78
79 pub fn convert_bytes<T: serde::de::DeserializeOwned>(
80 &self,
81 tx_bytes: Base64,
82 ) -> Result<T, IotaRpcInputError> {
83 let data: T = bcs::from_bytes(&tx_bytes.to_vec()?)?;
84 Ok(data)
85 }
86
87 #[expect(clippy::type_complexity)]
88 fn prepare_execute_transaction_block(
89 &self,
90 tx_bytes: Base64,
91 signatures: Vec<Base64>,
92 opts: Option<IotaTransactionBlockResponseOptions>,
93 ) -> Result<
94 (
95 ExecuteTransactionRequestV1,
96 IotaTransactionBlockResponseOptions,
97 IotaAddress,
98 Vec<InputObjectKind>,
99 Transaction,
100 Option<IotaTransactionBlock>,
101 Vec<u8>,
102 ),
103 IotaRpcInputError,
104 > {
105 let opts = opts.unwrap_or_default();
106 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
107 let sender = tx_data.sender();
108 let input_objs = tx_data.input_objects().unwrap_or_default();
109
110 let mut sigs = Vec::new();
111 for sig in signatures {
112 sigs.push(GenericSignature::from_bytes(&sig.to_vec()?)?);
113 }
114 let txn = Transaction::from_generic_sig_data(tx_data, sigs);
115 let raw_transaction = if opts.show_raw_input {
116 bcs::to_bytes(txn.data())?
117 } else {
118 vec![]
119 };
120 let transaction = if opts.show_input {
121 let epoch_store = self.state.load_epoch_store_one_call_per_task();
122
123 Some(IotaTransactionBlock::try_from(
124 txn.data().clone(),
125 epoch_store.module_cache(),
126 *txn.digest(),
127 )?)
128 } else {
129 None
130 };
131
132 let request = ExecuteTransactionRequestV1 {
133 transaction: txn.clone(),
134 include_events: opts.show_events,
135 include_input_objects: opts.show_balance_changes || opts.show_object_changes,
136 include_output_objects: opts.show_balance_changes
137 || opts.show_object_changes
138 || opts.show_events,
140 include_auxiliary_data: false,
141 };
142
143 Ok((
144 request,
145 opts,
146 sender,
147 input_objs,
148 txn,
149 transaction,
150 raw_transaction,
151 ))
152 }
153
154 #[instrument("json_rpc_api_execute_transaction_block", level = "trace", skip_all)]
155 async fn execute_transaction_block(
156 &self,
157 tx_bytes: Base64,
158 signatures: Vec<Base64>,
159 opts: Option<IotaTransactionBlockResponseOptions>,
160 request_type: Option<ExecuteTransactionRequestType>,
161 ) -> Result<IotaTransactionBlockResponse, Error> {
162 let request_type =
163 request_type.unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert);
164 let (request, opts, sender, input_objs, txn, transaction, raw_transaction) =
165 self.prepare_execute_transaction_block(tx_bytes, signatures, opts)?;
166 let digest = *txn.digest();
167
168 let transaction_orchestrator = self.transaction_orchestrator.clone();
169 let orch_timer = self.metrics.orchestrator_latency_ms.start_timer();
170
171 tracing::trace!(
172 "Spawning transaction orchestrator task for transaction: {}",
173 digest
174 );
175 let (response, is_executed_locally) = spawn_monitored_task!(
176 transaction_orchestrator.execute_transaction_block(request, request_type, None)
177 )
178 .await?
179 .map_err(Error::from)?;
180 drop(orch_timer);
181
182 self.handle_post_orchestration(
183 response,
184 is_executed_locally,
185 opts,
186 digest,
187 input_objs,
188 transaction,
189 raw_transaction,
190 sender,
191 )
192 .await
193 }
194
195 #[instrument(level = "trace", skip_all)]
196 async fn handle_post_orchestration(
197 &self,
198 response: ExecuteTransactionResponseV1,
199 is_executed_locally: bool,
200 opts: IotaTransactionBlockResponseOptions,
201 digest: TransactionDigest,
202 input_objs: Vec<InputObjectKind>,
203 transaction: Option<IotaTransactionBlock>,
204 raw_transaction: Vec<u8>,
205 sender: IotaAddress,
206 ) -> Result<IotaTransactionBlockResponse, Error> {
207 let _post_orch_timer = self.metrics.post_orchestrator_latency_ms.start_timer();
208
209 let events = if opts.show_events {
210 tracing::trace!("Resolving events");
211 let epoch_store = self.state.load_epoch_store_one_call_per_task();
212 let backing_package_store = PostExecutionPackageResolver::new(
213 self.state.get_backing_package_store().clone(),
214 &response.output_objects,
215 );
216 let mut layout_resolver = epoch_store
217 .executor()
218 .type_layout_resolver(Box::new(backing_package_store));
219 Some(IotaTransactionBlockEvents::try_from(
220 response.events.unwrap_or_default(),
221 digest,
222 None,
223 layout_resolver.as_mut(),
224 )?)
225 } else {
226 None
227 };
228
229 let object_cache = if (opts.show_balance_changes || opts.show_object_changes)
234 && (response.input_objects.is_some() || response.output_objects.is_some())
235 {
236 let mut object_cache = ObjectProviderCache::new(self.state.clone());
237 if let Some(input_objects) = response.input_objects {
238 object_cache.insert_objects_into_cache(input_objects);
239 }
240 if let Some(output_objects) = response.output_objects {
241 object_cache.insert_objects_into_cache(output_objects);
242 }
243 Some(object_cache)
244 } else {
245 None
246 };
247
248 let balance_changes = match &object_cache {
249 Some(object_cache) if opts.show_balance_changes => Some(
250 get_balance_changes_from_effect(
251 object_cache,
252 &response.effects.effects,
253 input_objs,
254 None,
255 )
256 .instrument(tracing::trace_span!("resolving balance changes"))
257 .await?,
258 ),
259 _ => None,
260 };
261
262 let object_changes = match &object_cache {
263 Some(object_cache) if opts.show_object_changes => Some(
264 get_object_changes(
265 object_cache,
266 sender,
267 response.effects.effects.modified_at_versions(),
268 response.effects.effects.all_changed_objects(),
269 response.effects.effects.all_removed_objects(),
270 )
271 .instrument(tracing::trace_span!("resolving object changes"))
272 .await?,
273 ),
274 _ => None,
275 };
276
277 let raw_effects = if opts.show_raw_effects {
278 bcs::to_bytes(&response.effects.effects)?
279 } else {
280 vec![]
281 };
282 let resolver = Resolver::new(self.clone());
283
284 let effects = if opts.show_effects {
285 Some(
286 IotaTransactionBlockEffects::from_native_with_clever_error(
287 response.effects.effects,
288 &resolver,
289 )
290 .await,
291 )
292 } else {
293 None
294 };
295
296 let errors = match effects.as_ref().map(|e| e.status()) {
297 Some(IotaExecutionStatus::Failure { error }) => vec![error.clone()],
298 _ => vec![],
299 };
300
301 Ok(IotaTransactionBlockResponse {
302 digest,
303 transaction,
304 raw_transaction,
305 effects,
306 events,
307 object_changes,
308 balance_changes,
309 timestamp_ms: None,
310 confirmed_local_execution: Some(is_executed_locally),
311 checkpoint: None,
312 errors,
313 raw_effects,
314 })
315 }
316
317 pub fn prepare_dry_run_transaction_block(
318 &self,
319 tx_bytes: Base64,
320 ) -> Result<(TransactionData, TransactionDigest, Vec<InputObjectKind>), IotaRpcInputError> {
321 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
322 let input_objs = tx_data.input_objects()?;
323 let intent_msg = IntentMessage::new(
324 Intent {
325 version: IntentVersion::V0,
326 scope: IntentScope::TransactionData,
327 app_id: IntentAppId::Iota,
328 },
329 tx_data,
330 );
331 let txn_digest = TransactionDigest::new(intent_msg.value.digest().into_inner());
332 Ok((intent_msg.value, txn_digest, input_objs))
333 }
334
335 async fn dry_run_transaction_block(
336 &self,
337 tx_bytes: Base64,
338 ) -> Result<DryRunTransactionBlockResponse, Error> {
339 let (txn_data, txn_digest, input_objs) =
340 self.prepare_dry_run_transaction_block(tx_bytes)?;
341 let sender = txn_data.sender();
342
343 let state = self.state.clone();
346 let (resp, written_objects, transaction_effects, mock_gas) =
347 tokio::task::spawn_blocking(move || {
348 state.dry_exec_transaction(txn_data.clone(), txn_digest)
349 })
350 .await
351 .map_err(Error::from)??;
352
353 let object_cache = ObjectProviderCache::new_with_cache(self.state.clone(), written_objects);
354 let balance_changes = get_balance_changes_from_effect(
355 &object_cache,
356 &transaction_effects,
357 input_objs,
358 mock_gas,
359 )
360 .await?;
361 let object_changes = get_object_changes(
362 &object_cache,
363 sender,
364 transaction_effects.modified_at_versions(),
365 transaction_effects.all_changed_objects(),
366 transaction_effects.all_removed_objects(),
367 )
368 .await?;
369
370 let resolver = Resolver::new(self.clone());
371 let effects = IotaTransactionBlockEffects::from_native_with_clever_error(
372 transaction_effects,
373 &resolver,
374 )
375 .await;
376
377 Ok(DryRunTransactionBlockResponse {
378 effects,
379 events: resp.events,
380 object_changes,
381 balance_changes,
382 input: resp.input,
383 suggested_gas_price: resp.suggested_gas_price,
384 execution_error_source: resp.execution_error_source,
385 })
386 }
387}
388
389#[async_trait]
390impl WriteApiServer for TransactionExecutionApi {
391 #[instrument(skip(self))]
392 async fn execute_transaction_block(
393 &self,
394 tx_bytes: Base64,
395 signatures: Vec<Base64>,
396 opts: Option<IotaTransactionBlockResponseOptions>,
397 request_type: Option<ExecuteTransactionRequestTypeSchema>,
398 ) -> RpcResult<IotaTransactionBlockResponse> {
399 self.execute_transaction_block(tx_bytes, signatures, opts, request_type.map(Into::into))
400 .trace_timeout(Duration::from_secs(10))
401 .await
402 }
403
404 #[instrument(skip(self))]
406 async fn view_function_call(
407 &self,
408 function_name: String,
409 type_args: Option<Vec<IotaTypeTag>>,
410 arguments: Vec<IotaJsonValue>,
411 ) -> RpcResult<IotaMoveViewCallResults> {
412 let chain = self
413 .state
414 .get_chain_identifier()
415 .map_err(Error::from)?
416 .chain();
417 if !matches!(chain, Chain::Unknown) {
418 return Err(Error::UnsupportedFeature(format!(
419 "View function calls not supported yet on {}",
420 chain.as_str()
421 ))
422 .into());
423 }
424 let MoveFunctionName {
425 package,
426 module,
427 function,
428 } = function_name.as_str().parse().map_err(Error::from)?;
429 let sender = IotaAddress::ZERO;
430 let tx_kind = self
431 .transaction_builder
432 .move_view_call_tx_kind(
433 package,
434 &module,
435 &function,
436 type_args.unwrap_or_default(),
437 arguments,
438 )
439 .await
440 .map_err(Error::from)?;
441 let tx_bytes = Base64::from_bytes(&bcs::to_bytes(&tx_kind).map_err(Error::from)?);
442 let dev_inspect_results = self
443 .dev_inspect_transaction_block(sender, tx_bytes, None, None, None)
444 .await?;
445 Ok(
446 IotaMoveViewCallResults::from_dev_inspect_results(self.clone(), dev_inspect_results)
447 .await
448 .map_err(Error::from)?,
449 )
450 }
451
452 #[instrument(skip(self, sender_address), fields(sender_address = %sender_address))]
453 async fn dev_inspect_transaction_block(
454 &self,
455 sender_address: IotaAddress,
456 tx_bytes: Base64,
457 gas_price: Option<BigInt<u64>>,
458 _epoch: Option<BigInt<u64>>,
459 additional_args: Option<DevInspectArgs>,
460 ) -> RpcResult<DevInspectResults> {
461 async move {
462 let DevInspectArgs {
463 gas_sponsor,
464 gas_budget,
465 gas_objects,
466 show_raw_txn_data_and_effects,
467 skip_checks,
468 } = additional_args.unwrap_or_default();
469 let tx_kind: TransactionKind = self.convert_bytes(tx_bytes)?;
470 self.state
471 .dev_inspect_transaction_block(
472 sender_address,
473 tx_kind,
474 gas_price.map(|i| *i),
475 gas_budget,
476 gas_sponsor,
477 gas_objects,
478 show_raw_txn_data_and_effects,
479 skip_checks,
480 )
481 .await
482 .map_err(Error::from)
483 }
484 .trace()
485 .await
486 }
487
488 #[instrument(skip(self))]
489 async fn dry_run_transaction_block(
490 &self,
491 tx_bytes: Base64,
492 ) -> RpcResult<DryRunTransactionBlockResponse> {
493 self.dry_run_transaction_block(tx_bytes).trace().await
494 }
495}
496
497impl IotaRpcModule for TransactionExecutionApi {
498 fn rpc(self) -> RpcModule<Self> {
499 self.into_rpc()
500 }
501
502 fn rpc_doc_module() -> Module {
503 WriteApiOpenRpc::module_doc()
504 }
505}
506
507#[async_trait]
508impl PackageStore for TransactionExecutionApi {
509 async fn fetch(&self, id: IotaAddress) -> Result<Arc<Package>, PackageResolverError> {
510 let backing_store = self.state.get_backing_package_store();
511 match backing_store.get_package_object(&ObjectID::new(id.into_bytes())) {
512 Ok(Some(pkg)) => Ok(Arc::new(Package::read_from_package(pkg.move_package())?)),
513 Ok(None) => Err(PackageResolverError::PackageNotFound(id)),
514 Err(e) => Err(PackageResolverError::Store {
515 store: "Node",
516 source: Arc::new(e),
517 }),
518 }
519 }
520}