1use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::{encoding::Base64, traits::ToFromBytes};
9use iota_core::{
10 authority::AuthorityState, authority_client::NetworkAuthorityClient,
11 transaction_orchestrator::TransactionOrchestrator,
12};
13use iota_json::IotaJsonValue;
14use iota_json_rpc_api::{JsonRpcMetrics, WriteApiOpenRpc, WriteApiServer};
15use iota_json_rpc_types::{
16 DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse, IotaMoveViewCallResults,
17 IotaTransactionBlock, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
18 IotaTransactionBlockResponseOptions, IotaTypeTag, MoveFunctionName,
19};
20use iota_metrics::spawn_monitored_task;
21use iota_open_rpc::Module;
22use iota_package_resolver::{Package, PackageStore, error::Error as PackageResolverError};
23use iota_protocol_config::Chain;
24use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
25use iota_transaction_builder::TransactionBuilder;
26use iota_types::{
27 base_types::IotaAddress,
28 crypto::default_hash,
29 digests::TransactionDigest,
30 effects::TransactionEffectsAPI,
31 iota_serde::BigInt,
32 quorum_driver_types::{
33 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
34 },
35 signature::GenericSignature,
36 storage::PostExecutionPackageResolver,
37 transaction::{
38 InputObjectKind, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
39 },
40};
41use jsonrpsee::{RpcModule, core::RpcResult};
42use move_core_types::account_address::AccountAddress;
43use tracing::{Instrument, instrument};
44
45use crate::{
46 IotaRpcModule, ObjectProviderCache,
47 authority_state::StateRead,
48 error::{Error, IotaRpcInputError},
49 get_balance_changes_from_effect, get_object_changes,
50 logger::FutureWithTracing,
51 transaction_builder_api::AuthorityStateDataReader,
52};
53
54#[derive(Clone)]
55pub struct TransactionExecutionApi {
56 state: Arc<dyn StateRead>,
57 transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
58 metrics: Arc<JsonRpcMetrics>,
59 transaction_builder: TransactionBuilder,
60}
61
62impl TransactionExecutionApi {
63 pub fn new(
64 state: Arc<AuthorityState>,
65 transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
66 metrics: Arc<JsonRpcMetrics>,
67 ) -> Self {
68 let reader = Arc::new(AuthorityStateDataReader::new(state.clone()));
69 Self {
70 state,
71 transaction_orchestrator,
72 metrics,
73 transaction_builder: TransactionBuilder::new(reader),
74 }
75 }
76
77 pub fn convert_bytes<T: serde::de::DeserializeOwned>(
78 &self,
79 tx_bytes: Base64,
80 ) -> Result<T, IotaRpcInputError> {
81 let data: T = bcs::from_bytes(&tx_bytes.to_vec()?)?;
82 Ok(data)
83 }
84
85 #[expect(clippy::type_complexity)]
86 fn prepare_execute_transaction_block(
87 &self,
88 tx_bytes: Base64,
89 signatures: Vec<Base64>,
90 opts: Option<IotaTransactionBlockResponseOptions>,
91 ) -> Result<
92 (
93 ExecuteTransactionRequestV1,
94 IotaTransactionBlockResponseOptions,
95 IotaAddress,
96 Vec<InputObjectKind>,
97 Transaction,
98 Option<IotaTransactionBlock>,
99 Vec<u8>,
100 ),
101 IotaRpcInputError,
102 > {
103 let opts = opts.unwrap_or_default();
104 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
105 let sender = tx_data.sender();
106 let input_objs = tx_data.input_objects().unwrap_or_default();
107
108 let mut sigs = Vec::new();
109 for sig in signatures {
110 sigs.push(GenericSignature::from_bytes(&sig.to_vec()?)?);
111 }
112 let txn = Transaction::from_generic_sig_data(tx_data, sigs);
113 let raw_transaction = if opts.show_raw_input {
114 bcs::to_bytes(txn.data())?
115 } else {
116 vec![]
117 };
118 let transaction = if opts.show_input {
119 let epoch_store = self.state.load_epoch_store_one_call_per_task();
120
121 Some(IotaTransactionBlock::try_from(
122 txn.data().clone(),
123 epoch_store.module_cache(),
124 *txn.digest(),
125 )?)
126 } else {
127 None
128 };
129
130 let request = ExecuteTransactionRequestV1 {
131 transaction: txn.clone(),
132 include_events: opts.show_events,
133 include_input_objects: opts.show_balance_changes || opts.show_object_changes,
134 include_output_objects: opts.show_balance_changes
135 || opts.show_object_changes
136 || opts.show_events,
138 include_auxiliary_data: false,
139 };
140
141 Ok((
142 request,
143 opts,
144 sender,
145 input_objs,
146 txn,
147 transaction,
148 raw_transaction,
149 ))
150 }
151
152 #[instrument("json_rpc_api_execute_transaction_block", level = "trace", skip_all)]
153 async fn execute_transaction_block(
154 &self,
155 tx_bytes: Base64,
156 signatures: Vec<Base64>,
157 opts: Option<IotaTransactionBlockResponseOptions>,
158 request_type: Option<ExecuteTransactionRequestType>,
159 ) -> Result<IotaTransactionBlockResponse, Error> {
160 let request_type =
161 request_type.unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert);
162 let (request, opts, sender, input_objs, txn, transaction, raw_transaction) =
163 self.prepare_execute_transaction_block(tx_bytes, signatures, opts)?;
164 let digest = *txn.digest();
165
166 let transaction_orchestrator = self.transaction_orchestrator.clone();
167 let orch_timer = self.metrics.orchestrator_latency_ms.start_timer();
168
169 tracing::trace!(
170 "Spawning transaction orchestrator task for transaction: {}",
171 digest
172 );
173 let (response, is_executed_locally) = spawn_monitored_task!(
174 transaction_orchestrator.execute_transaction_block(request, request_type, None)
175 )
176 .await?
177 .map_err(Error::from)?;
178 drop(orch_timer);
179
180 self.handle_post_orchestration(
181 response,
182 is_executed_locally,
183 opts,
184 digest,
185 input_objs,
186 transaction,
187 raw_transaction,
188 sender,
189 )
190 .await
191 }
192
193 #[instrument(level = "trace", skip_all)]
194 async fn handle_post_orchestration(
195 &self,
196 response: ExecuteTransactionResponseV1,
197 is_executed_locally: bool,
198 opts: IotaTransactionBlockResponseOptions,
199 digest: TransactionDigest,
200 input_objs: Vec<InputObjectKind>,
201 transaction: Option<IotaTransactionBlock>,
202 raw_transaction: Vec<u8>,
203 sender: IotaAddress,
204 ) -> Result<IotaTransactionBlockResponse, Error> {
205 let _post_orch_timer = self.metrics.post_orchestrator_latency_ms.start_timer();
206
207 let events = if opts.show_events {
208 tracing::trace!("Resolving events");
209 let epoch_store = self.state.load_epoch_store_one_call_per_task();
210 let backing_package_store = PostExecutionPackageResolver::new(
211 self.state.get_backing_package_store().clone(),
212 &response.output_objects,
213 );
214 let mut layout_resolver = epoch_store
215 .executor()
216 .type_layout_resolver(Box::new(backing_package_store));
217 Some(IotaTransactionBlockEvents::try_from(
218 response.events.unwrap_or_default(),
219 digest,
220 None,
221 layout_resolver.as_mut(),
222 )?)
223 } else {
224 None
225 };
226
227 let object_cache = {
228 response.output_objects.map(|output_objects| {
229 ObjectProviderCache::new_with_output_objects(self.state.clone(), output_objects)
230 })
231 };
232
233 let balance_changes = match &object_cache {
234 Some(object_cache) if opts.show_balance_changes => Some(
235 get_balance_changes_from_effect(
236 object_cache,
237 &response.effects.effects,
238 input_objs,
239 None,
240 )
241 .instrument(tracing::trace_span!("resolving balance changes"))
242 .await?,
243 ),
244 _ => None,
245 };
246
247 let object_changes = match &object_cache {
248 Some(object_cache) if opts.show_object_changes => Some(
249 get_object_changes(
250 object_cache,
251 sender,
252 response.effects.effects.modified_at_versions(),
253 response.effects.effects.all_changed_objects(),
254 response.effects.effects.all_removed_objects(),
255 )
256 .instrument(tracing::trace_span!("resolving object changes"))
257 .await?,
258 ),
259 _ => None,
260 };
261
262 let raw_effects = if opts.show_raw_effects {
263 bcs::to_bytes(&response.effects.effects)?
264 } else {
265 vec![]
266 };
267
268 Ok(IotaTransactionBlockResponse {
269 digest,
270 transaction,
271 raw_transaction,
272 effects: opts
273 .show_effects
274 .then_some(response.effects.effects.try_into()?),
275 events,
276 object_changes,
277 balance_changes,
278 timestamp_ms: None,
279 confirmed_local_execution: Some(is_executed_locally),
280 checkpoint: None,
281 errors: vec![],
282 raw_effects,
283 })
284 }
285
286 pub fn prepare_dry_run_transaction_block(
287 &self,
288 tx_bytes: Base64,
289 ) -> Result<(TransactionData, TransactionDigest, Vec<InputObjectKind>), IotaRpcInputError> {
290 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
291 let input_objs = tx_data.input_objects()?;
292 let intent_msg = IntentMessage::new(
293 Intent {
294 version: IntentVersion::V0,
295 scope: IntentScope::TransactionData,
296 app_id: IntentAppId::Iota,
297 },
298 tx_data,
299 );
300 let txn_digest = TransactionDigest::new(default_hash(&intent_msg.value));
301 Ok((intent_msg.value, txn_digest, input_objs))
302 }
303
304 async fn dry_run_transaction_block(
305 &self,
306 tx_bytes: Base64,
307 ) -> Result<DryRunTransactionBlockResponse, Error> {
308 let (txn_data, txn_digest, input_objs) =
309 self.prepare_dry_run_transaction_block(tx_bytes)?;
310 let sender = txn_data.sender();
311
312 let state = self.state.clone();
315 let (resp, written_objects, transaction_effects, mock_gas) =
316 tokio::task::spawn_blocking(move || {
317 state.dry_exec_transaction(txn_data.clone(), txn_digest)
318 })
319 .await
320 .map_err(Error::from)??;
321
322 let object_cache = ObjectProviderCache::new_with_cache(self.state.clone(), written_objects);
323 let balance_changes = get_balance_changes_from_effect(
324 &object_cache,
325 &transaction_effects,
326 input_objs,
327 mock_gas,
328 )
329 .await?;
330 let object_changes = get_object_changes(
331 &object_cache,
332 sender,
333 transaction_effects.modified_at_versions(),
334 transaction_effects.all_changed_objects(),
335 transaction_effects.all_removed_objects(),
336 )
337 .await?;
338
339 Ok(DryRunTransactionBlockResponse {
340 effects: resp.effects,
341 events: resp.events,
342 object_changes,
343 balance_changes,
344 input: resp.input,
345 suggested_gas_price: resp.suggested_gas_price,
346 execution_error_source: resp.execution_error_source,
347 })
348 }
349}
350
351#[async_trait]
352impl WriteApiServer for TransactionExecutionApi {
353 #[instrument(skip(self))]
354 async fn execute_transaction_block(
355 &self,
356 tx_bytes: Base64,
357 signatures: Vec<Base64>,
358 opts: Option<IotaTransactionBlockResponseOptions>,
359 request_type: Option<ExecuteTransactionRequestType>,
360 ) -> RpcResult<IotaTransactionBlockResponse> {
361 self.execute_transaction_block(tx_bytes, signatures, opts, request_type)
362 .trace_timeout(Duration::from_secs(10))
363 .await
364 }
365
366 #[instrument(skip(self))]
368 async fn view_function_call(
369 &self,
370 function_name: String,
371 type_args: Option<Vec<IotaTypeTag>>,
372 arguments: Vec<IotaJsonValue>,
373 ) -> RpcResult<IotaMoveViewCallResults> {
374 let chain = self
375 .state
376 .get_chain_identifier()
377 .map_err(Error::from)?
378 .chain();
379 if !matches!(chain, Chain::Unknown) {
380 return Err(Error::UnsupportedFeature(format!(
381 "View function calls not supported yet on {}",
382 chain.as_str()
383 ))
384 .into());
385 }
386 let MoveFunctionName {
387 package,
388 module,
389 function,
390 } = function_name.as_str().parse().map_err(Error::from)?;
391 let sender = IotaAddress::ZERO;
392 let tx_kind = self
393 .transaction_builder
394 .move_view_call_tx_kind(
395 package,
396 &module,
397 &function,
398 type_args.unwrap_or_default(),
399 arguments,
400 )
401 .await
402 .map_err(Error::from)?;
403 let tx_bytes = Base64::from_bytes(&bcs::to_bytes(&tx_kind).map_err(Error::from)?);
404 let dev_inspect_results = self
405 .dev_inspect_transaction_block(sender, tx_bytes, None, None, None)
406 .await?;
407 Ok(
408 IotaMoveViewCallResults::from_dev_inspect_results(self.clone(), dev_inspect_results)
409 .await
410 .map_err(Error::from)?,
411 )
412 }
413
414 #[instrument(skip(self))]
415 async fn dev_inspect_transaction_block(
416 &self,
417 sender_address: IotaAddress,
418 tx_bytes: Base64,
419 gas_price: Option<BigInt<u64>>,
420 _epoch: Option<BigInt<u64>>,
421 additional_args: Option<DevInspectArgs>,
422 ) -> RpcResult<DevInspectResults> {
423 async move {
424 let DevInspectArgs {
425 gas_sponsor,
426 gas_budget,
427 gas_objects,
428 show_raw_txn_data_and_effects,
429 skip_checks,
430 } = additional_args.unwrap_or_default();
431 let tx_kind: TransactionKind = self.convert_bytes(tx_bytes)?;
432 self.state
433 .dev_inspect_transaction_block(
434 sender_address,
435 tx_kind,
436 gas_price.map(|i| *i),
437 gas_budget.map(|i| *i),
438 gas_sponsor,
439 gas_objects,
440 show_raw_txn_data_and_effects,
441 skip_checks,
442 )
443 .await
444 .map_err(Error::from)
445 }
446 .trace()
447 .await
448 }
449
450 #[instrument(skip(self))]
451 async fn dry_run_transaction_block(
452 &self,
453 tx_bytes: Base64,
454 ) -> RpcResult<DryRunTransactionBlockResponse> {
455 self.dry_run_transaction_block(tx_bytes).trace().await
456 }
457}
458
459impl IotaRpcModule for TransactionExecutionApi {
460 fn rpc(self) -> RpcModule<Self> {
461 self.into_rpc()
462 }
463
464 fn rpc_doc_module() -> Module {
465 WriteApiOpenRpc::module_doc()
466 }
467}
468
469#[async_trait]
470impl PackageStore for TransactionExecutionApi {
471 async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>, PackageResolverError> {
472 let backing_store = self.state.get_backing_package_store();
473 match backing_store.get_package_object(&(id.into())) {
474 Ok(Some(pkg)) => Ok(Arc::new(Package::read_from_package(pkg.move_package())?)),
475 Ok(None) => Err(PackageResolverError::PackageNotFound(id)),
476 Err(e) => Err(PackageResolverError::Store {
477 store: "Node",
478 source: Arc::new(e),
479 }),
480 }
481 }
482}