1use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::encoding::Base64;
9use futures::{FutureExt, TryFutureExt};
10use iota_grpc_client::{
11 Client as GrpcClient, ReadMask,
12 read_mask_fields::{EpochField, SimulateExecutedTransactionField, SimulateField},
13};
14use iota_json::IotaJsonValue;
15use iota_json_rpc::{
16 IotaRpcModule, ObjectProvider, get_balance_changes_from_effect, get_object_changes,
17};
18use iota_json_rpc_api::WriteApiServer;
19use iota_json_rpc_types::{
20 DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse,
21 ExecuteTransactionRequestType, IotaMoveViewCallResults, IotaTransactionBlock,
22 IotaTransactionBlockEffects, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
23 IotaTypeTag, MoveFunctionName,
24};
25use iota_open_rpc::Module;
26use iota_package_resolver::{PackageStore, Resolver};
27use iota_protocol_config::Chain;
28use iota_sdk_types::ObjectId;
29use iota_transaction_builder::TransactionBuilder;
30use iota_types::{
31 base_types::{IotaAddress, SequenceNumber},
32 digests::TransactionDigest,
33 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEffectsExt},
34 error::ExecutionError,
35 iota_serde::BigInt,
36 object::{Object, PastObjectRead},
37 signature::GenericSignature,
38 transaction::{
39 GasData, SenderSignedData, TransactionData, TransactionDataAPI, TransactionDataV1,
40 TransactionExpiration, TransactionKind,
41 },
42};
43use jsonrpsee::{RpcModule, core::RpcResult};
44
45use crate::{
46 apis::error::Error as ApiError,
47 errors::{IndexerError, IndexerResult},
48 ingestion::primary::prepare::InMemObjectCache,
49 models::transactions::{StoredTransaction, tx_events_to_iota_tx_events},
50 optimistic_indexing::{IngestionPath, OptimisticTransactionExecutor},
51 read::IndexerReader,
52 store::package_resolver::IndexerStorePackageResolver,
53 types::{IndexedObjectChange, grpc_conversion},
54};
55
56const DRY_RUN_TRANSACTION_READ_MASK: &[&str] = &[
58 SimulateExecutedTransactionField::SIGNATURES_BCS,
59 SimulateExecutedTransactionField::EFFECTS_BCS,
60 SimulateExecutedTransactionField::EVENTS_EVENTS_BCS,
61 SimulateExecutedTransactionField::INPUT_OBJECTS_BCS,
62 SimulateExecutedTransactionField::OUTPUT_OBJECTS_BCS,
63 SimulateField::SUGGESTED_GAS_PRICE,
64 SimulateField::EXECUTION_RESULT_EXECUTION_ERROR_SOURCE,
65];
66const DEV_INSPECT_TRANSACTION_READ_MASK: &[&str] = &[
67 SimulateExecutedTransactionField::EFFECTS_BCS,
68 SimulateExecutedTransactionField::EVENTS_EVENTS_BCS,
69 SimulateField::EXECUTION_RESULT_EXECUTION_ERROR_BCS_KIND,
70 SimulateField::EXECUTION_RESULT_EXECUTION_ERROR_SOURCE,
71 SimulateField::EXECUTION_RESULT_EXECUTION_ERROR_COMMAND_INDEX,
72 SimulateField::EXECUTION_RESULT_COMMAND_RESULTS_MUTATED_BY_REF,
73 SimulateField::EXECUTION_RESULT_COMMAND_RESULTS_RETURN_VALUES,
74];
75
76#[derive(Clone)]
77pub struct WriteApi {
78 fullnode_grpc_client: GrpcClient,
79 transaction_builder: TransactionBuilder,
80 package_resolver: Arc<Resolver<IndexerStorePackageResolver>>,
81 reader: Arc<IndexerReader>,
82}
83
84#[derive(Clone)]
85pub struct OptimisticWriteApi {
86 write_api: WriteApi,
87 optimistic_tx_executor: OptimisticTransactionExecutor,
88}
89
90impl WriteApi {
91 pub fn new(fullnode_grpc_client: GrpcClient, reader: IndexerReader) -> Self {
92 let package_resolver = IndexerStorePackageResolver::new(reader.get_pool());
93 let data_reader = Arc::new(reader);
94 Self {
95 reader: data_reader.clone(),
96 fullnode_grpc_client,
97 transaction_builder: TransactionBuilder::new(data_reader),
98 package_resolver: Arc::new(Resolver::new(package_resolver)),
99 }
100 }
101
102 async fn dry_run_transaction_block_impl(
103 &self,
104 tx_bytes: Base64,
105 package_resolver: &Arc<Resolver<impl PackageStore>>,
106 ) -> IndexerResult<DryRunTransactionBlockResponse> {
107 let transaction_data = bcs::from_bytes::<TransactionData>(&tx_bytes.to_vec()?)?;
108 let tx_digest = transaction_data.digest();
109
110 let simulate_tx_response = self
111 .fullnode_grpc_client
112 .simulate_transaction(
113 transaction_data.clone(),
114 false,
115 Some(ReadMask::from(DRY_RUN_TRANSACTION_READ_MASK)),
116 )
117 .await?
118 .into_inner();
119
120 let executed_transaction = simulate_tx_response.executed_transaction()?;
121 let execution_error_source = simulate_tx_response
122 .execution_error()
123 .and_then(|e| e.source.clone());
124 let suggested_gas_price = simulate_tx_response.suggested_gas_price;
125
126 let input_objects = grpc_conversion::objects(executed_transaction.input_objects()?)?;
127 let output_objects = grpc_conversion::objects(executed_transaction.output_objects()?)?;
128
129 let objects = input_objects
130 .iter()
131 .chain(output_objects.iter())
132 .collect::<Vec<_>>();
133
134 let tx_effects: TransactionEffects = executed_transaction.effects()?.effects()?;
135
136 let tx_signatures = executed_transaction
137 .signatures()?
138 .signatures
139 .iter()
140 .map(|s| -> IndexerResult<_> { Ok(GenericSignature::try_from(s.signature()?)?) })
141 .collect::<IndexerResult<Vec<GenericSignature>>>()?;
142
143 let sender_signed_data = SenderSignedData::new(transaction_data.clone(), tx_signatures);
144
145 let tx_events = executed_transaction.events()?.events()?;
146
147 let in_mem_tx_changes = TxObjectResolver::new(&objects, self.reader.clone());
148
149 let fut1 = in_mem_tx_changes
151 .get_changes(&transaction_data, &tx_effects, &tx_digest)
152 .map_ok(|(balance_changes, object_changes)| {
153 (
154 balance_changes,
155 object_changes
156 .into_iter()
157 .map(iota_json_rpc_types::ObjectChange::from)
158 .collect::<Vec<_>>(),
159 )
160 });
161
162 let fut2 = IotaTransactionBlock::try_from_with_package_resolver(
163 sender_signed_data,
164 package_resolver,
165 tx_digest,
166 )
167 .map_err(Into::into);
168
169 let fut3 = tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, None);
172
173 let fut4 = IotaTransactionBlockEffects::from_native_with_clever_error(
174 tx_effects.clone(),
175 package_resolver,
176 )
177 .map(Ok);
178
179 let ((balance_changes, object_changes), transaction_block, events, effects) =
180 futures::future::try_join4(fut1, fut2, fut3, fut4).await?;
181
182 Ok(DryRunTransactionBlockResponse {
183 effects,
184 events,
185 object_changes,
186 balance_changes,
187 input: transaction_block.data,
188 suggested_gas_price,
189 execution_error_source,
190 })
191 }
192
193 async fn dev_inspect_transaction_block_impl(
194 &self,
195 sender_address: IotaAddress,
196 tx_bytes: Base64,
197 gas_price: Option<BigInt<u64>>,
198 additional_args: Option<DevInspectArgs>,
199 package_resolver: &Arc<Resolver<impl PackageStore>>,
200 ) -> IndexerResult<DevInspectResults> {
201 let DevInspectArgs {
202 gas_sponsor,
203 gas_budget,
204 gas_objects,
205 show_raw_txn_data_and_effects,
206 skip_checks,
207 } = additional_args.unwrap_or_default();
208
209 let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
210 let skip_checks = skip_checks.unwrap_or(true);
211
212 let (price, budget) = match (gas_price, gas_budget) {
213 (Some(price), Some(budget)) => (price.into_inner(), budget),
214 (price, budget) => {
215 let (ref_price, max_gas) = self.reference_gas_price_and_max_tx_gas().await?;
216 (
217 price.map(BigInt::into_inner).unwrap_or(ref_price),
218 budget.unwrap_or(max_gas),
219 )
220 }
221 };
222
223 let owner = gas_sponsor.unwrap_or(sender_address);
224 let payment = gas_objects.unwrap_or_default();
225
226 let kind = bcs::from_bytes::<TransactionKind>(&tx_bytes.to_vec()?)?;
227
228 let transaction_data = TransactionData::V1(TransactionDataV1 {
229 kind,
230 sender: sender_address,
231 gas_payment: GasData {
232 objects: payment,
233 owner,
234 price,
235 budget,
236 },
237 expiration: TransactionExpiration::None,
238 });
239
240 let raw_txn_data = show_raw_txn_data_and_effects
241 .then(|| bcs::to_bytes(&transaction_data))
242 .transpose()?
243 .unwrap_or_default();
244
245 let simulate_tx_response = self
246 .fullnode_grpc_client
247 .simulate_transaction(
248 transaction_data,
249 skip_checks,
250 Some(ReadMask::from(DEV_INSPECT_TRANSACTION_READ_MASK)),
251 )
252 .await?
253 .into_inner();
254
255 let executed_transaction = simulate_tx_response.executed_transaction()?;
256
257 let tx_effects: TransactionEffects = executed_transaction.effects()?.effects()?;
258
259 let raw_effects = show_raw_txn_data_and_effects
260 .then(|| bcs::to_bytes(&tx_effects))
261 .transpose()?
262 .unwrap_or_default();
263
264 let tx_events = executed_transaction.events()?.events()?;
265
266 let tx_digest = *tx_effects.transaction_digest();
267 let events =
270 tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, None).await?;
271
272 let execution_error = simulate_tx_response
273 .execution_error()
274 .map(|execution_error| -> IndexerResult<_> {
275 let exec_err = execution_error.error_kind()?;
276 let source = execution_error
277 .source
278 .clone()
279 .map(|s| -> Box<dyn std::error::Error + Send + Sync> { s.into() });
280
281 let mut error = ExecutionError::new(exec_err, source);
282 if let Some(command_index) = execution_error.command_index {
283 error = error.with_command_index(command_index);
284 }
285 Ok(error.to_string())
286 })
287 .transpose()?;
288
289 let results = simulate_tx_response
290 .command_results()
291 .map(|command_results| grpc_conversion::command_results(command_results.clone()))
292 .transpose()?;
293
294 Ok(DevInspectResults {
295 effects: tx_effects.try_into()?,
296 events,
297 results,
298 error: execution_error,
299 raw_txn_data,
300 raw_effects,
301 })
302 }
303
304 async fn reference_gas_price_and_max_tx_gas(&self) -> IndexerResult<(u64, u64)> {
306 let epoch = self
307 .fullnode_grpc_client
308 .get_epoch(
309 None, {
311 let max_tx_gas = EpochField::attribute("max_tx_gas");
312 Some(ReadMask::from(&[
313 EpochField::REFERENCE_GAS_PRICE,
314 &max_tx_gas,
315 ]))
316 },
317 )
318 .await?
319 .into_inner();
320
321 let max_tx_gas = epoch
322 .protocol_config()?
323 .attributes()?
324 .get("max_tx_gas")
325 .ok_or_else(|| {
326 IndexerError::Grpc("protocol_config's `max_tx_gas` should be available".into())
327 })?
328 .parse::<u64>()
329 .map_err(|e| IndexerError::Grpc(e.to_string()))?;
330
331 Ok((epoch.reference_gas_price(), max_tx_gas))
332 }
333}
334
335impl OptimisticWriteApi {
336 pub fn new(write_api: WriteApi, optimistic_tx_executor: OptimisticTransactionExecutor) -> Self {
337 Self {
338 write_api,
339 optimistic_tx_executor,
340 }
341 }
342
343 async fn build_response(
344 &self,
345 ingestion_path: IngestionPath,
346 options: IotaTransactionBlockResponseOptions,
347 ) -> Result<IotaTransactionBlockResponse, IndexerError> {
348 let package_resolver = self.write_api.package_resolver.clone();
349 let stored_transaction = StoredTransaction::from(ingestion_path);
350 stored_transaction
351 .try_into_iota_transaction_block_response(options, &package_resolver)
352 .await
353 }
354
355 pub fn executor(&self) -> &OptimisticTransactionExecutor {
356 &self.optimistic_tx_executor
357 }
358}
359
360#[async_trait]
361impl WriteApiServer for WriteApi {
362 async fn execute_transaction_block(
365 &self,
366 _tx_bytes: Base64,
367 _signatures: Vec<Base64>,
368 _options: Option<IotaTransactionBlockResponseOptions>,
369 _request_type: Option<ExecuteTransactionRequestType>,
370 ) -> RpcResult<IotaTransactionBlockResponse> {
371 Err(IndexerError::Generic(
372 "execute_transaction_block should be called from OptimisticWriteApi".into(),
373 )
374 .into())
375 }
376
377 async fn dev_inspect_transaction_block(
378 &self,
379 sender_address: IotaAddress,
380 tx_bytes: Base64,
381 gas_price: Option<BigInt<u64>>,
382 _epoch: Option<BigInt<u64>>,
383 additional_args: Option<DevInspectArgs>,
384 ) -> RpcResult<DevInspectResults> {
385 self.dev_inspect_transaction_block_impl(
386 sender_address,
387 tx_bytes,
388 gas_price,
389 additional_args,
390 &self.package_resolver,
391 )
392 .await
393 .map_err(Into::into)
394 }
395
396 async fn dry_run_transaction_block(
397 &self,
398 tx_bytes: Base64,
399 ) -> RpcResult<DryRunTransactionBlockResponse> {
400 self.dry_run_transaction_block_impl(tx_bytes, &self.package_resolver)
401 .await
402 .map_err(Into::into)
403 }
404
405 async fn view_function_call(
406 &self,
407 function_name: String,
408 type_args: Option<Vec<IotaTypeTag>>,
409 arguments: Vec<IotaJsonValue>,
410 ) -> RpcResult<IotaMoveViewCallResults> {
411 let MoveFunctionName {
412 package,
413 module,
414 function,
415 } = function_name.as_str().parse().map_err(IndexerError::from)?;
416 let sender = IotaAddress::ZERO;
417 let tx_kind = self
418 .transaction_builder
419 .move_view_call_tx_kind(
420 package,
421 &module,
422 &function,
423 type_args.unwrap_or_default(),
424 arguments,
425 )
426 .await
427 .map_err(IndexerError::from)?;
428 let tx_bytes = Base64::from_bytes(&bcs::to_bytes(&tx_kind).map_err(IndexerError::from)?);
429 let dev_inspect_results = self
430 .dev_inspect_transaction_block(sender, tx_bytes, None, None, None)
431 .await?;
432 Ok(IotaMoveViewCallResults::from_dev_inspect_results(
433 self.package_resolver.package_store().clone(),
434 dev_inspect_results,
435 )
436 .await
437 .map_err(IndexerError::from)?)
438 }
439}
440
441#[async_trait]
442impl WriteApiServer for OptimisticWriteApi {
443 async fn execute_transaction_block(
444 &self,
445 tx_bytes: Base64,
446 signatures: Vec<Base64>,
447 options: Option<IotaTransactionBlockResponseOptions>,
448 _request_type: Option<ExecuteTransactionRequestType>,
449 ) -> RpcResult<IotaTransactionBlockResponse> {
450 let ingestion_path = self
451 .optimistic_tx_executor
452 .execute_and_index_transaction(tx_bytes, signatures)
453 .await?;
454 Ok(self
455 .build_response(ingestion_path, options.unwrap_or_default())
456 .await?)
457 }
458
459 async fn dev_inspect_transaction_block(
460 &self,
461 sender_address: IotaAddress,
462 tx_bytes: Base64,
463 gas_price: Option<BigInt<u64>>,
464 epoch: Option<BigInt<u64>>,
465 additional_args: Option<DevInspectArgs>,
466 ) -> RpcResult<DevInspectResults> {
467 self.write_api
468 .dev_inspect_transaction_block(
469 sender_address,
470 tx_bytes,
471 gas_price,
472 epoch,
473 additional_args,
474 )
475 .await
476 }
477
478 async fn dry_run_transaction_block(
479 &self,
480 tx_bytes: Base64,
481 ) -> RpcResult<DryRunTransactionBlockResponse> {
482 self.write_api.dry_run_transaction_block(tx_bytes).await
483 }
484
485 async fn view_function_call(
486 &self,
487 function_name: String,
488 type_args: Option<Vec<IotaTypeTag>>,
489 arguments: Vec<IotaJsonValue>,
490 ) -> RpcResult<IotaMoveViewCallResults> {
491 let chain = self
492 .optimistic_tx_executor
493 .read
494 .get_chain_identifier_in_blocking_task()
495 .await?
496 .chain();
497 if !matches!(chain, Chain::Unknown) {
498 return Err(ApiError::UnsupportedFeature(format!(
499 "View calls are not yet supported on {}",
500 chain.as_str()
501 ))
502 .into());
503 }
504
505 self.write_api
506 .view_function_call(function_name, type_args, arguments)
507 .await
508 }
509}
510
511impl IotaRpcModule for WriteApi {
512 fn rpc(self) -> RpcModule<Self> {
513 self.into_rpc()
514 }
515
516 fn rpc_doc_module() -> Module {
517 iota_json_rpc_api::WriteApiOpenRpc::module_doc()
518 }
519}
520
521impl IotaRpcModule for OptimisticWriteApi {
522 fn rpc(self) -> RpcModule<Self> {
523 self.into_rpc()
524 }
525
526 fn rpc_doc_module() -> Module {
527 iota_json_rpc_api::WriteApiOpenRpc::module_doc()
528 }
529}
530
531pub struct TxObjectResolver {
537 object_cache: InMemObjectCache,
538 reader: Arc<IndexerReader>,
539}
540
541impl TxObjectResolver {
542 pub fn new(objects: &[&Object], reader: Arc<IndexerReader>) -> Self {
543 let mut object_cache = InMemObjectCache::new();
544 for obj in objects {
545 object_cache.insert_object(<&Object>::clone(obj).clone());
546 }
547 Self {
548 object_cache,
549 reader,
550 }
551 }
552
553 async fn get_past_object_read_with_retry(
554 &self,
555 id: ObjectId,
556 version: SequenceNumber,
557 ) -> IndexerResult<PastObjectRead> {
558 let backoff = backoff::ExponentialBackoff {
559 initial_interval: Duration::from_millis(100),
560 max_elapsed_time: Some(Duration::from_secs(3)),
561 multiplier: 2.0,
562 ..Default::default()
563 };
564
565 backoff::future::retry(backoff, || async {
566 self.reader
567 .get_past_object_read_with_fallback(id, version, false)
568 .await
569 .map_err(backoff::Error::transient)
570 })
571 .await
572 }
573
574 pub(crate) async fn get_changes(
575 &self,
576 tx: &TransactionData,
577 effects: &TransactionEffects,
578 tx_digest: &TransactionDigest,
579 ) -> IndexerResult<(
580 Vec<iota_json_rpc_types::BalanceChange>,
581 Vec<IndexedObjectChange>,
582 )> {
583 let object_changes: Vec<_> = get_object_changes(
584 self,
585 tx.sender(),
586 effects.modified_at_versions(),
587 effects.all_changed_objects(),
588 effects.all_removed_objects(),
589 )
590 .await?
591 .into_iter()
592 .map(IndexedObjectChange::from)
593 .collect();
594 let balance_changes = get_balance_changes_from_effect(
595 self,
596 effects,
597 tx.input_objects().unwrap_or_else(|e| {
598 panic!("checkpointed tx {tx_digest} has invalid input objects: {e}")
599 }),
600 None,
601 )
602 .await?;
603 Ok((balance_changes, object_changes))
604 }
605}
606
607#[async_trait]
608impl ObjectProvider for TxObjectResolver {
609 type Error = IndexerError;
610
611 async fn get_object(
612 &self,
613 id: &ObjectId,
614 version: &SequenceNumber,
615 ) -> Result<Object, Self::Error> {
616 if let Some(o) = self.object_cache.get(id, Some(version)) {
618 return Ok(o.clone());
619 }
620
621 let past_read = self.get_past_object_read_with_retry(*id, *version).await?;
622
623 past_read.into_object().map_err(|e| {
624 IndexerError::Generic(format!(
625 "object {id} at version {version} not found in cache or indexer DB: {e}"
626 ))
627 })
628 }
629
630 async fn find_object_lt_or_eq_version(
631 &self,
632 id: &ObjectId,
633 version: &SequenceNumber,
634 ) -> Result<Option<Object>, Self::Error> {
635 if let Some(o) = self.object_cache.get(id, Some(version)) {
637 return Ok(Some(o.clone()));
638 }
639
640 if let Some(o) = self.object_cache.get(id, None) {
642 if o.version() <= *version {
643 return Ok(Some(o.clone()));
644 }
645 }
646
647 self.get_past_object_read_with_retry(*id, *version)
648 .await
649 .map(|past_read| past_read.into_object().ok())
650 }
651}