1use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::{encoding::Base64, traits::ToFromBytes};
9use iota_core::{
10 authority::AuthorityState, authority_client::NetworkAuthorityClient,
11 transaction_orchestrator::TransactionOrchestrator,
12};
13use iota_json::IotaJsonValue;
14use iota_json_rpc_api::{JsonRpcMetrics, WriteApiOpenRpc, WriteApiServer};
15use iota_json_rpc_types::{
16 DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse, IotaMoveViewCallResults,
17 IotaTransactionBlock, IotaTransactionBlockEvents, IotaTransactionBlockResponse,
18 IotaTransactionBlockResponseOptions, IotaTypeTag, MoveFunctionName,
19};
20use iota_metrics::spawn_monitored_task;
21use iota_open_rpc::Module;
22use iota_package_resolver::{Package, PackageStore, error::Error as PackageResolverError};
23use iota_protocol_config::Chain;
24use iota_sdk_types::crypto::{Intent, IntentAppId, IntentMessage, IntentScope, IntentVersion};
25use iota_transaction_builder::TransactionBuilder;
26use iota_types::{
27 base_types::IotaAddress,
28 crypto::default_hash,
29 digests::TransactionDigest,
30 effects::TransactionEffectsAPI,
31 iota_serde::BigInt,
32 quorum_driver_types::{
33 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
34 },
35 signature::GenericSignature,
36 storage::PostExecutionPackageResolver,
37 transaction::{
38 InputObjectKind, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
39 },
40};
41use jsonrpsee::{RpcModule, core::RpcResult};
42use move_core_types::account_address::AccountAddress;
43use tracing::{Instrument, instrument};
44
45use crate::{
46 IotaRpcModule, ObjectProviderCache,
47 authority_state::StateRead,
48 error::{Error, IotaRpcInputError},
49 get_balance_changes_from_effect, get_object_changes,
50 logger::FutureWithTracing,
51 transaction_builder_api::AuthorityStateDataReader,
52};
53
/// JSON-RPC handler backing the write API (`WriteApiServer`): transaction
/// execution, dry runs, dev-inspect and Move view function calls.
#[derive(Clone)]
pub struct TransactionExecutionApi {
    /// Read access to node state (epoch store, backing package store,
    /// dry-run and dev-inspect execution).
    state: Arc<dyn StateRead>,
    /// Orchestrator that signed transactions are handed to for execution.
    transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
    /// Metrics handle; provides the orchestrator and post-orchestrator
    /// latency timers.
    metrics: Arc<JsonRpcMetrics>,
    /// Builds the transaction kind used for Move view function calls.
    transaction_builder: TransactionBuilder,
}
61
62impl TransactionExecutionApi {
63 pub fn new(
64 state: Arc<AuthorityState>,
65 transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
66 metrics: Arc<JsonRpcMetrics>,
67 ) -> Self {
68 let reader = Arc::new(AuthorityStateDataReader::new(state.clone()));
69 Self {
70 state,
71 transaction_orchestrator,
72 metrics,
73 transaction_builder: TransactionBuilder::new(reader),
74 }
75 }
76
77 pub fn convert_bytes<T: serde::de::DeserializeOwned>(
78 &self,
79 tx_bytes: Base64,
80 ) -> Result<T, IotaRpcInputError> {
81 let data: T = bcs::from_bytes(&tx_bytes.to_vec()?)?;
82 Ok(data)
83 }
84
85 #[expect(clippy::type_complexity)]
86 fn prepare_execute_transaction_block(
87 &self,
88 tx_bytes: Base64,
89 signatures: Vec<Base64>,
90 opts: Option<IotaTransactionBlockResponseOptions>,
91 ) -> Result<
92 (
93 ExecuteTransactionRequestV1,
94 IotaTransactionBlockResponseOptions,
95 IotaAddress,
96 Vec<InputObjectKind>,
97 Transaction,
98 Option<IotaTransactionBlock>,
99 Vec<u8>,
100 ),
101 IotaRpcInputError,
102 > {
103 let opts = opts.unwrap_or_default();
104 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
105 let sender = tx_data.sender();
106 let input_objs = tx_data.input_objects().unwrap_or_default();
107
108 let mut sigs = Vec::new();
109 for sig in signatures {
110 sigs.push(GenericSignature::from_bytes(&sig.to_vec()?)?);
111 }
112 let txn = Transaction::from_generic_sig_data(tx_data, sigs);
113 let raw_transaction = if opts.show_raw_input {
114 bcs::to_bytes(txn.data())?
115 } else {
116 vec![]
117 };
118 let transaction = if opts.show_input {
119 let epoch_store = self.state.load_epoch_store_one_call_per_task();
120
121 Some(IotaTransactionBlock::try_from(
122 txn.data().clone(),
123 epoch_store.module_cache(),
124 *txn.digest(),
125 )?)
126 } else {
127 None
128 };
129
130 let request = ExecuteTransactionRequestV1 {
131 transaction: txn.clone(),
132 include_events: opts.show_events,
133 include_input_objects: opts.show_balance_changes || opts.show_object_changes,
134 include_output_objects: opts.show_balance_changes
135 || opts.show_object_changes
136 || opts.show_events,
138 include_auxiliary_data: false,
139 };
140
141 Ok((
142 request,
143 opts,
144 sender,
145 input_objs,
146 txn,
147 transaction,
148 raw_transaction,
149 ))
150 }
151
152 #[instrument("json_rpc_api_execute_transaction_block", level = "trace", skip_all)]
153 async fn execute_transaction_block(
154 &self,
155 tx_bytes: Base64,
156 signatures: Vec<Base64>,
157 opts: Option<IotaTransactionBlockResponseOptions>,
158 request_type: Option<ExecuteTransactionRequestType>,
159 ) -> Result<IotaTransactionBlockResponse, Error> {
160 let request_type =
161 request_type.unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert);
162 let (request, opts, sender, input_objs, txn, transaction, raw_transaction) =
163 self.prepare_execute_transaction_block(tx_bytes, signatures, opts)?;
164 let digest = *txn.digest();
165
166 let transaction_orchestrator = self.transaction_orchestrator.clone();
167 let orch_timer = self.metrics.orchestrator_latency_ms.start_timer();
168
169 tracing::trace!(
170 "Spawning transaction orchestrator task for transaction: {}",
171 digest
172 );
173 let (response, is_executed_locally) = spawn_monitored_task!(
174 transaction_orchestrator.execute_transaction_block(request, request_type, None)
175 )
176 .await?
177 .map_err(Error::from)?;
178 drop(orch_timer);
179
180 self.handle_post_orchestration(
181 response,
182 is_executed_locally,
183 opts,
184 digest,
185 input_objs,
186 transaction,
187 raw_transaction,
188 sender,
189 )
190 .await
191 }
192
193 #[instrument(level = "trace", skip_all)]
194 async fn handle_post_orchestration(
195 &self,
196 response: ExecuteTransactionResponseV1,
197 is_executed_locally: bool,
198 opts: IotaTransactionBlockResponseOptions,
199 digest: TransactionDigest,
200 input_objs: Vec<InputObjectKind>,
201 transaction: Option<IotaTransactionBlock>,
202 raw_transaction: Vec<u8>,
203 sender: IotaAddress,
204 ) -> Result<IotaTransactionBlockResponse, Error> {
205 let _post_orch_timer = self.metrics.post_orchestrator_latency_ms.start_timer();
206
207 let events = if opts.show_events {
208 tracing::trace!("Resolving events");
209 let epoch_store = self.state.load_epoch_store_one_call_per_task();
210 let backing_package_store = PostExecutionPackageResolver::new(
211 self.state.get_backing_package_store().clone(),
212 &response.output_objects,
213 );
214 let mut layout_resolver = epoch_store
215 .executor()
216 .type_layout_resolver(Box::new(backing_package_store));
217 Some(IotaTransactionBlockEvents::try_from(
218 response.events.unwrap_or_default(),
219 digest,
220 None,
221 layout_resolver.as_mut(),
222 )?)
223 } else {
224 None
225 };
226
227 let object_cache = {
228 response.output_objects.map(|output_objects| {
229 ObjectProviderCache::new_with_output_objects(self.state.clone(), output_objects)
230 })
231 };
232
233 let balance_changes = match &object_cache {
234 Some(object_cache) if opts.show_balance_changes => Some(
235 get_balance_changes_from_effect(
236 object_cache,
237 &response.effects.effects,
238 input_objs,
239 None,
240 )
241 .instrument(tracing::trace_span!("resolving balance changes"))
242 .await?,
243 ),
244 _ => None,
245 };
246
247 let object_changes = match &object_cache {
248 Some(object_cache) if opts.show_object_changes => Some(
249 get_object_changes(
250 object_cache,
251 sender,
252 response.effects.effects.modified_at_versions(),
253 response.effects.effects.all_changed_objects(),
254 response.effects.effects.all_removed_objects(),
255 )
256 .instrument(tracing::trace_span!("resolving object changes"))
257 .await?,
258 ),
259 _ => None,
260 };
261
262 let raw_effects = if opts.show_raw_effects {
263 bcs::to_bytes(&response.effects.effects)?
264 } else {
265 vec![]
266 };
267
268 Ok(IotaTransactionBlockResponse {
269 digest,
270 transaction,
271 raw_transaction,
272 effects: opts
273 .show_effects
274 .then_some(response.effects.effects.try_into()?),
275 events,
276 object_changes,
277 balance_changes,
278 timestamp_ms: None,
279 confirmed_local_execution: Some(is_executed_locally),
280 checkpoint: None,
281 errors: vec![],
282 raw_effects,
283 })
284 }
285
286 pub fn prepare_dry_run_transaction_block(
287 &self,
288 tx_bytes: Base64,
289 ) -> Result<(TransactionData, TransactionDigest, Vec<InputObjectKind>), IotaRpcInputError> {
290 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
291 let input_objs = tx_data.input_objects()?;
292 let intent_msg = IntentMessage::new(
293 Intent {
294 version: IntentVersion::V0,
295 scope: IntentScope::TransactionData,
296 app_id: IntentAppId::Iota,
297 },
298 tx_data,
299 );
300 let txn_digest = TransactionDigest::new(default_hash(&intent_msg.value));
301 Ok((intent_msg.value, txn_digest, input_objs))
302 }
303
304 async fn dry_run_transaction_block(
305 &self,
306 tx_bytes: Base64,
307 ) -> Result<DryRunTransactionBlockResponse, Error> {
308 let (txn_data, txn_digest, input_objs) =
309 self.prepare_dry_run_transaction_block(tx_bytes)?;
310 let sender = txn_data.sender();
311
312 let state = self.state.clone();
315 let (resp, written_objects, transaction_effects, mock_gas) =
316 tokio::task::spawn_blocking(move || {
317 state.dry_exec_transaction(txn_data.clone(), txn_digest)
318 })
319 .await
320 .map_err(Error::from)??;
321
322 let object_cache = ObjectProviderCache::new_with_cache(self.state.clone(), written_objects);
323 let balance_changes = get_balance_changes_from_effect(
324 &object_cache,
325 &transaction_effects,
326 input_objs,
327 mock_gas,
328 )
329 .await?;
330 let object_changes = get_object_changes(
331 &object_cache,
332 sender,
333 transaction_effects.modified_at_versions(),
334 transaction_effects.all_changed_objects(),
335 transaction_effects.all_removed_objects(),
336 )
337 .await?;
338
339 Ok(DryRunTransactionBlockResponse {
340 effects: resp.effects,
341 events: resp.events,
342 object_changes,
343 balance_changes,
344 input: resp.input,
345 suggested_gas_price: resp.suggested_gas_price,
346 })
347 }
348}
349
350#[async_trait]
351impl WriteApiServer for TransactionExecutionApi {
352 #[instrument(skip(self))]
353 async fn execute_transaction_block(
354 &self,
355 tx_bytes: Base64,
356 signatures: Vec<Base64>,
357 opts: Option<IotaTransactionBlockResponseOptions>,
358 request_type: Option<ExecuteTransactionRequestType>,
359 ) -> RpcResult<IotaTransactionBlockResponse> {
360 self.execute_transaction_block(tx_bytes, signatures, opts, request_type)
361 .trace_timeout(Duration::from_secs(10))
362 .await
363 }
364
365 #[instrument(skip(self))]
367 async fn view_function_call(
368 &self,
369 function_name: String,
370 type_args: Option<Vec<IotaTypeTag>>,
371 call_args: Vec<IotaJsonValue>,
372 ) -> RpcResult<IotaMoveViewCallResults> {
373 let chain = self
374 .state
375 .get_chain_identifier()
376 .map_err(Error::from)?
377 .chain();
378 if !matches!(chain, Chain::Unknown) {
379 return Err(Error::UnsupportedFeature(format!(
380 "View function calls not supported yet on {}",
381 chain.as_str()
382 ))
383 .into());
384 }
385 let MoveFunctionName {
386 package,
387 module,
388 function,
389 } = function_name.as_str().parse().map_err(Error::from)?;
390 let sender = IotaAddress::ZERO;
391 let tx_kind = self
392 .transaction_builder
393 .move_view_call_tx_kind(
394 package,
395 &module,
396 &function,
397 type_args.unwrap_or_default(),
398 call_args,
399 )
400 .await
401 .map_err(Error::from)?;
402 let tx_bytes = Base64::from_bytes(&bcs::to_bytes(&tx_kind).map_err(Error::from)?);
403 let dev_inspect_results = self
404 .dev_inspect_transaction_block(sender, tx_bytes, None, None, None)
405 .await?;
406 Ok(
407 IotaMoveViewCallResults::from_dev_inspect_results(self.clone(), dev_inspect_results)
408 .await
409 .map_err(Error::from)?,
410 )
411 }
412
413 #[instrument(skip(self))]
414 async fn dev_inspect_transaction_block(
415 &self,
416 sender_address: IotaAddress,
417 tx_bytes: Base64,
418 gas_price: Option<BigInt<u64>>,
419 _epoch: Option<BigInt<u64>>,
420 additional_args: Option<DevInspectArgs>,
421 ) -> RpcResult<DevInspectResults> {
422 async move {
423 let DevInspectArgs {
424 gas_sponsor,
425 gas_budget,
426 gas_objects,
427 show_raw_txn_data_and_effects,
428 skip_checks,
429 } = additional_args.unwrap_or_default();
430 let tx_kind: TransactionKind = self.convert_bytes(tx_bytes)?;
431 self.state
432 .dev_inspect_transaction_block(
433 sender_address,
434 tx_kind,
435 gas_price.map(|i| *i),
436 gas_budget.map(|i| *i),
437 gas_sponsor,
438 gas_objects,
439 show_raw_txn_data_and_effects,
440 skip_checks,
441 )
442 .await
443 .map_err(Error::from)
444 }
445 .trace()
446 .await
447 }
448
449 #[instrument(skip(self))]
450 async fn dry_run_transaction_block(
451 &self,
452 tx_bytes: Base64,
453 ) -> RpcResult<DryRunTransactionBlockResponse> {
454 self.dry_run_transaction_block(tx_bytes).trace().await
455 }
456}
457
/// Registers the write API with the JSON-RPC server.
impl IotaRpcModule for TransactionExecutionApi {
    /// Converts this handler into its `RpcModule` via `into_rpc`.
    fn rpc(self) -> RpcModule<Self> {
        self.into_rpc()
    }

    /// Returns the OpenRPC documentation module provided by
    /// `WriteApiOpenRpc`.
    fn rpc_doc_module() -> Module {
        WriteApiOpenRpc::module_doc()
    }
}
467
468#[async_trait]
469impl PackageStore for TransactionExecutionApi {
470 async fn fetch(&self, id: AccountAddress) -> Result<Arc<Package>, PackageResolverError> {
471 let backing_store = self.state.get_backing_package_store();
472 match backing_store.get_package_object(&(id.into())) {
473 Ok(Some(pkg)) => Ok(Arc::new(Package::read_from_package(pkg.move_package())?)),
474 Ok(None) => Err(PackageResolverError::PackageNotFound(id)),
475 Err(e) => Err(PackageResolverError::Store {
476 store: "Node",
477 source: Arc::new(e),
478 }),
479 }
480 }
481}