1use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::{encoding::Base64, traits::ToFromBytes};
9use iota_core::{
10 authority::AuthorityState, authority_client::NetworkAuthorityClient,
11 transaction_orchestrator::TransactionOrchestrator,
12};
13use iota_json_rpc_api::{JsonRpcMetrics, WriteApiOpenRpc, WriteApiServer};
14use iota_json_rpc_types::{
15 DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse, IotaTransactionBlock,
16 IotaTransactionBlockEvents, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
17};
18use iota_metrics::spawn_monitored_task;
19use iota_open_rpc::Module;
20use iota_types::{
21 base_types::IotaAddress,
22 crypto::default_hash,
23 digests::TransactionDigest,
24 effects::TransactionEffectsAPI,
25 iota_serde::BigInt,
26 quorum_driver_types::{
27 ExecuteTransactionRequestType, ExecuteTransactionRequestV1, ExecuteTransactionResponseV1,
28 },
29 signature::GenericSignature,
30 storage::PostExecutionPackageResolver,
31 transaction::{
32 InputObjectKind, Transaction, TransactionData, TransactionDataAPI, TransactionKind,
33 },
34};
35use jsonrpsee::{RpcModule, core::RpcResult};
36use shared_crypto::intent::{AppId, Intent, IntentMessage, IntentScope, IntentVersion};
37use tracing::instrument;
38
39use crate::{
40 IotaRpcModule, ObjectProviderCache,
41 authority_state::StateRead,
42 error::{Error, IotaRpcInputError},
43 get_balance_changes_from_effect, get_object_changes,
44 logger::FutureWithTracing,
45};
46
47pub struct TransactionExecutionApi {
48 state: Arc<dyn StateRead>,
49 transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
50 metrics: Arc<JsonRpcMetrics>,
51}
52
53impl TransactionExecutionApi {
54 pub fn new(
55 state: Arc<AuthorityState>,
56 transaction_orchestrator: Arc<TransactionOrchestrator<NetworkAuthorityClient>>,
57 metrics: Arc<JsonRpcMetrics>,
58 ) -> Self {
59 Self {
60 state,
61 transaction_orchestrator,
62 metrics,
63 }
64 }
65
66 pub fn convert_bytes<T: serde::de::DeserializeOwned>(
67 &self,
68 tx_bytes: Base64,
69 ) -> Result<T, IotaRpcInputError> {
70 let data: T = bcs::from_bytes(&tx_bytes.to_vec()?)?;
71 Ok(data)
72 }
73
74 #[expect(clippy::type_complexity)]
75 fn prepare_execute_transaction_block(
76 &self,
77 tx_bytes: Base64,
78 signatures: Vec<Base64>,
79 opts: Option<IotaTransactionBlockResponseOptions>,
80 ) -> Result<
81 (
82 ExecuteTransactionRequestV1,
83 IotaTransactionBlockResponseOptions,
84 IotaAddress,
85 Vec<InputObjectKind>,
86 Transaction,
87 Option<IotaTransactionBlock>,
88 Vec<u8>,
89 ),
90 IotaRpcInputError,
91 > {
92 let opts = opts.unwrap_or_default();
93 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
94 let sender = tx_data.sender();
95 let input_objs = tx_data.input_objects().unwrap_or_default();
96
97 let mut sigs = Vec::new();
98 for sig in signatures {
99 sigs.push(GenericSignature::from_bytes(&sig.to_vec()?)?);
100 }
101 let txn = Transaction::from_generic_sig_data(tx_data, sigs);
102 let raw_transaction = if opts.show_raw_input {
103 bcs::to_bytes(txn.data())?
104 } else {
105 vec![]
106 };
107 let transaction = if opts.show_input {
108 let epoch_store = self.state.load_epoch_store_one_call_per_task();
109
110 Some(IotaTransactionBlock::try_from(
111 txn.data().clone(),
112 epoch_store.module_cache(),
113 *txn.digest(),
114 )?)
115 } else {
116 None
117 };
118
119 let request = ExecuteTransactionRequestV1 {
120 transaction: txn.clone(),
121 include_events: opts.show_events,
122 include_input_objects: opts.show_balance_changes || opts.show_object_changes,
123 include_output_objects: opts.show_balance_changes
124 || opts.show_object_changes
125 || opts.show_events,
127 include_auxiliary_data: false,
128 };
129
130 Ok((
131 request,
132 opts,
133 sender,
134 input_objs,
135 txn,
136 transaction,
137 raw_transaction,
138 ))
139 }
140
141 async fn execute_transaction_block(
142 &self,
143 tx_bytes: Base64,
144 signatures: Vec<Base64>,
145 opts: Option<IotaTransactionBlockResponseOptions>,
146 request_type: Option<ExecuteTransactionRequestType>,
147 ) -> Result<IotaTransactionBlockResponse, Error> {
148 let request_type =
149 request_type.unwrap_or(ExecuteTransactionRequestType::WaitForEffectsCert);
150 let (request, opts, sender, input_objs, txn, transaction, raw_transaction) =
151 self.prepare_execute_transaction_block(tx_bytes, signatures, opts)?;
152 let digest = *txn.digest();
153
154 let transaction_orchestrator = self.transaction_orchestrator.clone();
155 let orch_timer = self.metrics.orchestrator_latency_ms.start_timer();
156 let (response, is_executed_locally) = spawn_monitored_task!(
157 transaction_orchestrator.execute_transaction_block(request, request_type, None)
158 )
159 .await?
160 .map_err(Error::from)?;
161 drop(orch_timer);
162
163 self.handle_post_orchestration(
164 response,
165 is_executed_locally,
166 opts,
167 digest,
168 input_objs,
169 transaction,
170 raw_transaction,
171 sender,
172 )
173 .await
174 }
175
176 async fn handle_post_orchestration(
177 &self,
178 response: ExecuteTransactionResponseV1,
179 is_executed_locally: bool,
180 opts: IotaTransactionBlockResponseOptions,
181 digest: TransactionDigest,
182 input_objs: Vec<InputObjectKind>,
183 transaction: Option<IotaTransactionBlock>,
184 raw_transaction: Vec<u8>,
185 sender: IotaAddress,
186 ) -> Result<IotaTransactionBlockResponse, Error> {
187 let _post_orch_timer = self.metrics.post_orchestrator_latency_ms.start_timer();
188
189 let events = if opts.show_events {
190 let epoch_store = self.state.load_epoch_store_one_call_per_task();
191 let backing_package_store = PostExecutionPackageResolver::new(
192 self.state.get_backing_package_store().clone(),
193 &response.output_objects,
194 );
195 let mut layout_resolver = epoch_store
196 .executor()
197 .type_layout_resolver(Box::new(backing_package_store));
198 Some(IotaTransactionBlockEvents::try_from(
199 response.events.unwrap_or_default(),
200 digest,
201 None,
202 layout_resolver.as_mut(),
203 )?)
204 } else {
205 None
206 };
207
208 let object_cache = response.output_objects.map(|output_objects| {
209 ObjectProviderCache::new_with_output_objects(self.state.clone(), output_objects)
210 });
211
212 let balance_changes = match &object_cache {
213 Some(object_cache) if opts.show_balance_changes => Some(
214 get_balance_changes_from_effect(
215 object_cache,
216 &response.effects.effects,
217 input_objs,
218 None,
219 )
220 .await?,
221 ),
222 _ => None,
223 };
224
225 let object_changes = match &object_cache {
226 Some(object_cache) if opts.show_object_changes => Some(
227 get_object_changes(
228 object_cache,
229 sender,
230 response.effects.effects.modified_at_versions(),
231 response.effects.effects.all_changed_objects(),
232 response.effects.effects.all_removed_objects(),
233 )
234 .await?,
235 ),
236 _ => None,
237 };
238
239 let raw_effects = if opts.show_raw_effects {
240 bcs::to_bytes(&response.effects.effects)?
241 } else {
242 vec![]
243 };
244
245 Ok(IotaTransactionBlockResponse {
246 digest,
247 transaction,
248 raw_transaction,
249 effects: opts
250 .show_effects
251 .then_some(response.effects.effects.try_into()?),
252 events,
253 object_changes,
254 balance_changes,
255 timestamp_ms: None,
256 confirmed_local_execution: Some(is_executed_locally),
257 checkpoint: None,
258 errors: vec![],
259 raw_effects,
260 })
261 }
262
263 pub fn prepare_dry_run_transaction_block(
264 &self,
265 tx_bytes: Base64,
266 ) -> Result<(TransactionData, TransactionDigest, Vec<InputObjectKind>), IotaRpcInputError> {
267 let tx_data: TransactionData = self.convert_bytes(tx_bytes)?;
268 let input_objs = tx_data.input_objects()?;
269 let intent_msg = IntentMessage::new(
270 Intent {
271 version: IntentVersion::V0,
272 scope: IntentScope::TransactionData,
273 app_id: AppId::Iota,
274 },
275 tx_data,
276 );
277 let txn_digest = TransactionDigest::new(default_hash(&intent_msg.value));
278 Ok((intent_msg.value, txn_digest, input_objs))
279 }
280
281 async fn dry_run_transaction_block(
282 &self,
283 tx_bytes: Base64,
284 ) -> Result<DryRunTransactionBlockResponse, Error> {
285 let (txn_data, txn_digest, input_objs) =
286 self.prepare_dry_run_transaction_block(tx_bytes)?;
287 let sender = txn_data.sender();
288 let (resp, written_objects, transaction_effects, mock_gas) = self
289 .state
290 .dry_exec_transaction(txn_data.clone(), txn_digest)
291 .await?;
292 let object_cache = ObjectProviderCache::new_with_cache(self.state.clone(), written_objects);
293 let balance_changes = get_balance_changes_from_effect(
294 &object_cache,
295 &transaction_effects,
296 input_objs,
297 mock_gas,
298 )
299 .await?;
300 let object_changes = get_object_changes(
301 &object_cache,
302 sender,
303 transaction_effects.modified_at_versions(),
304 transaction_effects.all_changed_objects(),
305 transaction_effects.all_removed_objects(),
306 )
307 .await?;
308
309 Ok(DryRunTransactionBlockResponse {
310 effects: resp.effects,
311 events: resp.events,
312 object_changes,
313 balance_changes,
314 input: resp.input,
315 })
316 }
317}
318
319#[async_trait]
320impl WriteApiServer for TransactionExecutionApi {
321 #[instrument(skip(self))]
322 async fn execute_transaction_block(
323 &self,
324 tx_bytes: Base64,
325 signatures: Vec<Base64>,
326 opts: Option<IotaTransactionBlockResponseOptions>,
327 request_type: Option<ExecuteTransactionRequestType>,
328 ) -> RpcResult<IotaTransactionBlockResponse> {
329 self.execute_transaction_block(tx_bytes, signatures, opts, request_type)
330 .trace_timeout(Duration::from_secs(10))
331 .await
332 }
333
334 #[instrument(skip(self))]
335 async fn dev_inspect_transaction_block(
336 &self,
337 sender_address: IotaAddress,
338 tx_bytes: Base64,
339 gas_price: Option<BigInt<u64>>,
340 _epoch: Option<BigInt<u64>>,
341 additional_args: Option<DevInspectArgs>,
342 ) -> RpcResult<DevInspectResults> {
343 async move {
344 let DevInspectArgs {
345 gas_sponsor,
346 gas_budget,
347 gas_objects,
348 show_raw_txn_data_and_effects,
349 skip_checks,
350 } = additional_args.unwrap_or_default();
351 let tx_kind: TransactionKind = self.convert_bytes(tx_bytes)?;
352 self.state
353 .dev_inspect_transaction_block(
354 sender_address,
355 tx_kind,
356 gas_price.map(|i| *i),
357 gas_budget.map(|i| *i),
358 gas_sponsor,
359 gas_objects,
360 show_raw_txn_data_and_effects,
361 skip_checks,
362 )
363 .await
364 .map_err(Error::from)
365 }
366 .trace()
367 .await
368 }
369
370 #[instrument(skip(self))]
371 async fn dry_run_transaction_block(
372 &self,
373 tx_bytes: Base64,
374 ) -> RpcResult<DryRunTransactionBlockResponse> {
375 self.dry_run_transaction_block(tx_bytes).trace().await
376 }
377}
378
379impl IotaRpcModule for TransactionExecutionApi {
380 fn rpc(self) -> RpcModule<Self> {
381 self.into_rpc()
382 }
383
384 fn rpc_doc_module() -> Module {
385 WriteApiOpenRpc::module_doc()
386 }
387}