1use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::{encoding::Base64, traits::ToFromBytes};
9use iota_core::{
10 authority::AuthorityState, authority_client::NetworkAuthorityClient,
11 transaction_orchestrator::TransactionOrchestrator,
12};
13use iota_json_rpc_api::{JsonRpcMetrics, WriteApiOpenRpc, WriteApiServer};
14use iota_json_rpc_types::{
15 DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse, IotaTransactionBlock,
16 IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
17};
18use iota_metrics::spawn_monitored_task;
19use iota_open_rpc::Module;
20use iota_types::{
21 base_types::IotaAddress,
22 crypto::default_hash,
23 digests::TransactionDigest,
24 effects::TransactionEffectsAPI,
25 iota_serde::BigInt,
26 quorum_driver_types::{
27 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
28 },
29 signature::GenericSignature,
30 storage::PostExecutionPackageResolver,
31 transaction::{
32 InputObjectKind, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
33 },
34};
35use jsonrpsee::{RpcModule, core::RpcResult};
36use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
37use tracing::{Instrument, instrument};
38
39use crate::{
40 IotaRpcModule, ObjectProviderCache,
41 authority_state::StateRead,
42 error::{Error, IotaRpcInputError},
43 get_balance_changes_from_effect, get_object_changes,
44 logger::FutureWithTracing,
45};
46
47pub struct TransactionExecutionApi {
48 state: Arc<dyn StateRead>,
49 transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
50 metrics: Arc<JsonRpcMetrics>,
51}
52
53impl TransactionExecutionApi {
54 pub fn new(
55 state: Arc<AuthorityState>,
56 transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
57 metrics: Arc<JsonRpcMetrics>,
58 ) -> Self {
59 Self {
60 state,
61 transaction_orchestrator,
62 metrics,
63 }
64 }
65
66 pub fn convert_bytes<T: serde::de::DeserializeOwned>(
67 &self,
68 tx_bytes: Base64,
69 ) -> Result<T, IotaRpcInputError> {
70 let data: T = bcs::from_bytes(&tx_bytes.to_vec()?)?;
71 Ok(data)
72 }
73
74 #[expect(clippy::type_complexity)]
75 fn prepare_execute_transaction_block(
76 &self,
77 tx_bytes: Base64,
78 signatures: Vec<Base64>,
79 opts: Option<IotaTransactionBlockResponseOptions>,
80 ) -> Result<
81 (
82 ExecuteTransactionRequestV1,
83 IotaTransactionBlockResponseOptions,
84 IotaAddress,
85 Vec<InputObjectKind>,
86 Transaction,
87 Option<IotaTransactionBlock>,
88 Vec<u8>,
89 ),
90 IotaRpcInputError,
91 > {
92 let opts = opts.unwrap_or_default();
93 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
94 let sender = tx_data.sender();
95 let input_objs = tx_data.input_objects().unwrap_or_default();
96
97 let mut sigs = Vec::new();
98 for sig in signatures {
99 sigs.push(GenericSignature::from_bytes(&sig.to_vec()?)?);
100 }
101 let txn = Transaction::from_generic_sig_data(tx_data, sigs);
102 let raw_transaction = if opts.show_raw_input {
103 bcs::to_bytes(txn.data())?
104 } else {
105 vec![]
106 };
107 let transaction = if opts.show_input {
108 let epoch_store = self.state.load_epoch_store_one_call_per_task();
109
110 Some(IotaTransactionBlock::try_from(
111 txn.data().clone(),
112 epoch_store.module_cache(),
113 *txn.digest(),
114 )?)
115 } else {
116 None
117 };
118
119 let request = ExecuteTransactionRequestV1 {
120 transaction: txn.clone(),
121 include_events: opts.show_events,
122 include_input_objects: opts.show_balance_changes || opts.show_object_changes,
123 include_output_objects: opts.show_balance_changes
124 || opts.show_object_changes
125 || opts.show_events,
127 include_auxiliary_data: false,
128 };
129
130 Ok((
131 request,
132 opts,
133 sender,
134 input_objs,
135 txn,
136 transaction,
137 raw_transaction,
138 ))
139 }
140
141 #[instrument("json_rpc_api_execute_transaction_block", level = "trace", skip_all)]
142 async fn execute_transaction_block(
143 &self,
144 tx_bytes: Base64,
145 signatures: Vec<Base64>,
146 opts: Option<IotaTransactionBlockResponseOptions>,
147 request_type: Option<ExecuteTransactionRequestType>,
148 ) -> Result<IotaTransactionBlockResponse, Error> {
149 let request_type =
150 request_type.unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert);
151 let (request, opts, sender, input_objs, txn, transaction, raw_transaction) =
152 self.prepare_execute_transaction_block(tx_bytes, signatures, opts)?;
153 let digest = *txn.digest();
154
155 let transaction_orchestrator = self.transaction_orchestrator.clone();
156 let orch_timer = self.metrics.orchestrator_latency_ms.start_timer();
157
158 tracing::trace!(
159 "Spawning transaction orchestrator task for transaction: {}",
160 digest
161 );
162 let (response, is_executed_locally) = spawn_monitored_task!(
163 transaction_orchestrator.execute_transaction_block(request, request_type, None)
164 )
165 .await?
166 .map_err(Error::from)?;
167 drop(orch_timer);
168
169 self.handle_post_orchestration(
170 response,
171 is_executed_locally,
172 opts,
173 digest,
174 input_objs,
175 transaction,
176 raw_transaction,
177 sender,
178 )
179 .await
180 }
181
182 #[instrument(level = "trace", skip_all)]
183 async fn handle_post_orchestration(
184 &self,
185 response: ExecuteTransactionResponseV1,
186 is_executed_locally: bool,
187 opts: IotaTransactionBlockResponseOptions,
188 digest: TransactionDigest,
189 input_objs: Vec<InputObjectKind>,
190 transaction: Option<IotaTransactionBlock>,
191 raw_transaction: Vec<u8>,
192 sender: IotaAddress,
193 ) -> Result<IotaTransactionBlockResponse, Error> {
194 let _post_orch_timer = self.metrics.post_orchestrator_latency_ms.start_timer();
195
196 let events = if opts.show_events {
197 tracing::trace!("Resolving events");
198 let epoch_store = self.state.load_epoch_store_one_call_per_task();
199 let backing_package_store = PostExecutionPackageResolver::new(
200 self.state.get_backing_package_store().clone(),
201 &response.output_objects,
202 );
203 let mut layout_resolver = epoch_store
204 .executor()
205 .type_layout_resolver(Box::new(backing_package_store));
206 Some(IotaTransactionBlockEvents::try_from(
207 response.events.unwrap_or_default(),
208 digest,
209 None,
210 layout_resolver.as_mut(),
211 )?)
212 } else {
213 None
214 };
215
216 let object_cache = {
217 response.output_objects.map(|output_objects| {
218 ObjectProviderCache::new_with_output_objects(self.state.clone(), output_objects)
219 })
220 };
221
222 let balance_changes = match &object_cache {
223 Some(object_cache) if opts.show_balance_changes => Some(
224 get_balance_changes_from_effect(
225 object_cache,
226 &response.effects.effects,
227 input_objs,
228 None,
229 )
230 .instrument(tracing::trace_span!("resolving balance changes"))
231 .await?,
232 ),
233 _ => None,
234 };
235
236 let object_changes = match &object_cache {
237 Some(object_cache) if opts.show_object_changes => Some(
238 get_object_changes(
239 object_cache,
240 sender,
241 response.effects.effects.modified_at_versions(),
242 response.effects.effects.all_changed_objects(),
243 response.effects.effects.all_removed_objects(),
244 )
245 .instrument(tracing::trace_span!("resolving object changes"))
246 .await?,
247 ),
248 _ => None,
249 };
250
251 let raw_effects = if opts.show_raw_effects {
252 bcs::to_bytes(&response.effects.effects)?
253 } else {
254 vec![]
255 };
256
257 Ok(IotaTransactionBlockResponse {
258 digest,
259 transaction,
260 raw_transaction,
261 effects: opts
262 .show_effects
263 .then_some(response.effects.effects.try_into()?),
264 events,
265 object_changes,
266 balance_changes,
267 timestamp_ms: None,
268 confirmed_local_execution: Some(is_executed_locally),
269 checkpoint: None,
270 errors: vec![],
271 raw_effects,
272 })
273 }
274
275 pub fn prepare_dry_run_transaction_block(
276 &self,
277 tx_bytes: Base64,
278 ) -> Result<(TransactionData, TransactionDigest, Vec<InputObjectKind>), IotaRpcInputError> {
279 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
280 let input_objs = tx_data.input_objects()?;
281 let intent_msg = IntentMessage::new(
282 Intent {
283 version: IntentVersion::V0,
284 scope: IntentScope::TransactionData,
285 app_id: AppId::Iota,
286 },
287 tx_data,
288 );
289 let txn_digest = TransactionDigest::new(default_hash(&intent_msg.value));
290 Ok((intent_msg.value, txn_digest, input_objs))
291 }
292
293 async fn dry_run_transaction_block(
294 &self,
295 tx_bytes: Base64,
296 ) -> Result<DryRunTransactionBlockResponse, Error> {
297 let (txn_data, txn_digest, input_objs) =
298 self.prepare_dry_run_transaction_block(tx_bytes)?;
299 let sender = txn_data.sender();
300
301 let state = self.state.clone();
304 let (resp, written_objects, transaction_effects, mock_gas) =
305 tokio::task::spawn_blocking(move || {
306 state.dry_exec_transaction(txn_data.clone(), txn_digest)
307 })
308 .await
309 .map_err(Error::from)??;
310
311 let object_cache = ObjectProviderCache::new_with_cache(self.state.clone(), written_objects);
312 let balance_changes = get_balance_changes_from_effect(
313 &object_cache,
314 &transaction_effects,
315 input_objs,
316 mock_gas,
317 )
318 .await?;
319 let object_changes = get_object_changes(
320 &object_cache,
321 sender,
322 transaction_effects.modified_at_versions(),
323 transaction_effects.all_changed_objects(),
324 transaction_effects.all_removed_objects(),
325 )
326 .await?;
327
328 Ok(DryRunTransactionBlockResponse {
329 effects: resp.effects,
330 events: resp.events,
331 object_changes,
332 balance_changes,
333 input: resp.input,
334 suggested_gas_price: resp.suggested_gas_price,
335 })
336 }
337}
338
339#[async_trait]
340impl WriteApiServer for TransactionExecutionApi {
341 #[instrument(skip(self))]
342 async fn execute_transaction_block(
343 &self,
344 tx_bytes: Base64,
345 signatures: Vec<Base64>,
346 opts: Option<IotaTransactionBlockResponseOptions>,
347 request_type: Option<ExecuteTransactionRequestType>,
348 ) -> RpcResult<IotaTransactionBlockResponse> {
349 self.execute_transaction_block(tx_bytes, signatures, opts, request_type)
350 .trace_timeout(Duration::from_secs(10))
351 .await
352 }
353
354 #[instrument(skip(self))]
355 async fn dev_inspect_transaction_block(
356 &self,
357 sender_address: IotaAddress,
358 tx_bytes: Base64,
359 gas_price: Option<BigInt<u64>>,
360 _epoch: Option<BigInt<u64>>,
361 additional_args: Option<DevInspectArgs>,
362 ) -> RpcResult<DevInspectResults> {
363 async move {
364 let DevInspectArgs {
365 gas_sponsor,
366 gas_budget,
367 gas_objects,
368 show_raw_txn_data_and_effects,
369 skip_checks,
370 } = additional_args.unwrap_or_default();
371 let tx_kind: TransactionKind = self.convert_bytes(tx_bytes)?;
372 self.state
373 .dev_inspect_transaction_block(
374 sender_address,
375 tx_kind,
376 gas_price.map(|i| *i),
377 gas_budget.map(|i| *i),
378 gas_sponsor,
379 gas_objects,
380 show_raw_txn_data_and_effects,
381 skip_checks,
382 )
383 .await
384 .map_err(Error::from)
385 }
386 .trace()
387 .await
388 }
389
390 #[instrument(skip(self))]
391 async fn dry_run_transaction_block(
392 &self,
393 tx_bytes: Base64,
394 ) -> RpcResult<DryRunTransactionBlockResponse> {
395 self.dry_run_transaction_block(tx_bytes).trace().await
396 }
397}
398
399impl IotaRpcModule for TransactionExecutionApi {
400 fn rpc(self) -> RpcModule<Self> {
401 self.into_rpc()
402 }
403
404 fn rpc_doc_module() -> Module {
405 WriteApiOpenRpc::module_doc()
406 }
407}