Skip to main content

iota_indexer/apis/
write_api.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use fastcrypto::encoding::Base64;
9use futures::{FutureExt, TryFutureExt};
10use iota_grpc_client::{
11    Client as GrpcClient, ReadMask,
12    read_mask_fields::{EpochField, SimulateExecutedTransactionField, SimulateField},
13};
14use iota_json::IotaJsonValue;
15use iota_json_rpc::{
16    IotaRpcModule, ObjectProvider, get_balance_changes_from_effect, get_object_changes,
17};
18use iota_json_rpc_api::WriteApiServer;
19use iota_json_rpc_types::{
20    DevInspectArgs, DevInspectResults, DryRunTransactionBlockResponse,
21    ExecuteTransactionRequestType, IotaMoveViewCallResults, IotaTransactionBlock,
22    IotaTransactionBlockEffects, IotaTransactionBlockResponse, IotaTransactionBlockResponseOptions,
23    IotaTypeTag, MoveFunctionName,
24};
25use iota_open_rpc::Module;
26use iota_package_resolver::{PackageStore, Resolver};
27use iota_protocol_config::Chain;
28use iota_sdk_types::ObjectId;
29use iota_transaction_builder::TransactionBuilder;
30use iota_types::{
31    base_types::{IotaAddress, SequenceNumber},
32    digests::TransactionDigest,
33    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEffectsExt},
34    error::ExecutionError,
35    iota_serde::BigInt,
36    object::{Object, PastObjectRead},
37    signature::GenericSignature,
38    transaction::{
39        GasData, SenderSignedData, TransactionData, TransactionDataAPI, TransactionDataV1,
40        TransactionExpiration, TransactionKind,
41    },
42};
43use jsonrpsee::{RpcModule, core::RpcResult};
44
45use crate::{
46    apis::error::Error as ApiError,
47    errors::{IndexerError, IndexerResult},
48    ingestion::primary::prepare::InMemObjectCache,
49    models::transactions::{StoredTransaction, tx_events_to_iota_tx_events},
50    optimistic_indexing::{IngestionPath, OptimisticTransactionExecutor},
51    read::IndexerReader,
52    store::package_resolver::IndexerStorePackageResolver,
53    types::{IndexedObjectChange, grpc_conversion},
54};
55
56// As an optimization, we're trying to request only the fields we actually need.
57const DRY_RUN_TRANSACTION_READ_MASK: &[&str] = &[
58    SimulateExecutedTransactionField::SIGNATURES_BCS,
59    SimulateExecutedTransactionField::EFFECTS_BCS,
60    SimulateExecutedTransactionField::EVENTS_EVENTS_BCS,
61    SimulateExecutedTransactionField::INPUT_OBJECTS_BCS,
62    SimulateExecutedTransactionField::OUTPUT_OBJECTS_BCS,
63    SimulateField::SUGGESTED_GAS_PRICE,
64    SimulateField::EXECUTION_RESULT_EXECUTION_ERROR_SOURCE,
65];
66const DEV_INSPECT_TRANSACTION_READ_MASK: &[&str] = &[
67    SimulateExecutedTransactionField::EFFECTS_BCS,
68    SimulateExecutedTransactionField::EVENTS_EVENTS_BCS,
69    SimulateField::EXECUTION_RESULT_EXECUTION_ERROR_BCS_KIND,
70    SimulateField::EXECUTION_RESULT_EXECUTION_ERROR_SOURCE,
71    SimulateField::EXECUTION_RESULT_EXECUTION_ERROR_COMMAND_INDEX,
72    SimulateField::EXECUTION_RESULT_COMMAND_RESULTS_MUTATED_BY_REF,
73    SimulateField::EXECUTION_RESULT_COMMAND_RESULTS_RETURN_VALUES,
74];
75
76#[derive(Clone)]
77pub struct WriteApi {
78    fullnode_grpc_client: GrpcClient,
79    transaction_builder: TransactionBuilder,
80    package_resolver: Arc<Resolver<IndexerStorePackageResolver>>,
81    reader: Arc<IndexerReader>,
82}
83
84#[derive(Clone)]
85pub struct OptimisticWriteApi {
86    write_api: WriteApi,
87    optimistic_tx_executor: OptimisticTransactionExecutor,
88}
89
90impl WriteApi {
91    pub fn new(fullnode_grpc_client: GrpcClient, reader: IndexerReader) -> Self {
92        let package_resolver = IndexerStorePackageResolver::new(reader.get_pool());
93        let data_reader = Arc::new(reader);
94        Self {
95            reader: data_reader.clone(),
96            fullnode_grpc_client,
97            transaction_builder: TransactionBuilder::new(data_reader),
98            package_resolver: Arc::new(Resolver::new(package_resolver)),
99        }
100    }
101
102    async fn dry_run_transaction_block_impl(
103        &self,
104        tx_bytes: Base64,
105        package_resolver: &Arc<Resolver<impl PackageStore>>,
106    ) -> IndexerResult<DryRunTransactionBlockResponse> {
107        let transaction_data = bcs::from_bytes::<TransactionData>(&tx_bytes.to_vec()?)?;
108        let tx_digest = transaction_data.digest();
109
110        let simulate_tx_response = self
111            .fullnode_grpc_client
112            .simulate_transaction(
113                transaction_data.clone(),
114                false,
115                Some(ReadMask::from(DRY_RUN_TRANSACTION_READ_MASK)),
116            )
117            .await?
118            .into_inner();
119
120        let executed_transaction = simulate_tx_response.executed_transaction()?;
121        let execution_error_source = simulate_tx_response
122            .execution_error()
123            .and_then(|e| e.source.clone());
124        let suggested_gas_price = simulate_tx_response.suggested_gas_price;
125
126        let input_objects = grpc_conversion::objects(executed_transaction.input_objects()?)?;
127        let output_objects = grpc_conversion::objects(executed_transaction.output_objects()?)?;
128
129        let objects = input_objects
130            .iter()
131            .chain(output_objects.iter())
132            .collect::<Vec<_>>();
133
134        let tx_effects: TransactionEffects = executed_transaction.effects()?.effects()?;
135
136        let tx_signatures = executed_transaction
137            .signatures()?
138            .signatures
139            .iter()
140            .map(|s| -> IndexerResult<_> { Ok(GenericSignature::try_from(s.signature()?)?) })
141            .collect::<IndexerResult<Vec<GenericSignature>>>()?;
142
143        let sender_signed_data = SenderSignedData::new(transaction_data.clone(), tx_signatures);
144
145        let tx_events = executed_transaction.events()?.events()?;
146
147        let in_mem_tx_changes = TxObjectResolver::new(&objects, self.reader.clone());
148
149        // as a minor optimization we will run concurrently the following four futures
150        let fut1 = in_mem_tx_changes
151            .get_changes(&transaction_data, &tx_effects, &tx_digest)
152            .map_ok(|(balance_changes, object_changes)| {
153                (
154                    balance_changes,
155                    object_changes
156                        .into_iter()
157                        .map(iota_json_rpc_types::ObjectChange::from)
158                        .collect::<Vec<_>>(),
159                )
160            });
161
162        let fut2 = IotaTransactionBlock::try_from_with_package_resolver(
163            sender_signed_data,
164            package_resolver,
165            tx_digest,
166        )
167        .map_err(Into::into);
168
169        // timestamp is None because it represent a checkpoint one, on a dry run
170        // operation we don't have this information.
171        let fut3 = tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, None);
172
173        let fut4 = IotaTransactionBlockEffects::from_native_with_clever_error(
174            tx_effects.clone(),
175            package_resolver,
176        )
177        .map(Ok);
178
179        let ((balance_changes, object_changes), transaction_block, events, effects) =
180            futures::future::try_join4(fut1, fut2, fut3, fut4).await?;
181
182        Ok(DryRunTransactionBlockResponse {
183            effects,
184            events,
185            object_changes,
186            balance_changes,
187            input: transaction_block.data,
188            suggested_gas_price,
189            execution_error_source,
190        })
191    }
192
193    async fn dev_inspect_transaction_block_impl(
194        &self,
195        sender_address: IotaAddress,
196        tx_bytes: Base64,
197        gas_price: Option<BigInt<u64>>,
198        additional_args: Option<DevInspectArgs>,
199        package_resolver: &Arc<Resolver<impl PackageStore>>,
200    ) -> IndexerResult<DevInspectResults> {
201        let DevInspectArgs {
202            gas_sponsor,
203            gas_budget,
204            gas_objects,
205            show_raw_txn_data_and_effects,
206            skip_checks,
207        } = additional_args.unwrap_or_default();
208
209        let show_raw_txn_data_and_effects = show_raw_txn_data_and_effects.unwrap_or(false);
210        let skip_checks = skip_checks.unwrap_or(true);
211
212        let (price, budget) = match (gas_price, gas_budget) {
213            (Some(price), Some(budget)) => (price.into_inner(), budget),
214            (price, budget) => {
215                let (ref_price, max_gas) = self.reference_gas_price_and_max_tx_gas().await?;
216                (
217                    price.map(BigInt::into_inner).unwrap_or(ref_price),
218                    budget.unwrap_or(max_gas),
219                )
220            }
221        };
222
223        let owner = gas_sponsor.unwrap_or(sender_address);
224        let payment = gas_objects.unwrap_or_default();
225
226        let kind = bcs::from_bytes::<TransactionKind>(&tx_bytes.to_vec()?)?;
227
228        let transaction_data = TransactionData::V1(TransactionDataV1 {
229            kind,
230            sender: sender_address,
231            gas_payment: GasData {
232                objects: payment,
233                owner,
234                price,
235                budget,
236            },
237            expiration: TransactionExpiration::None,
238        });
239
240        let raw_txn_data = show_raw_txn_data_and_effects
241            .then(|| bcs::to_bytes(&transaction_data))
242            .transpose()?
243            .unwrap_or_default();
244
245        let simulate_tx_response = self
246            .fullnode_grpc_client
247            .simulate_transaction(
248                transaction_data,
249                skip_checks,
250                Some(ReadMask::from(DEV_INSPECT_TRANSACTION_READ_MASK)),
251            )
252            .await?
253            .into_inner();
254
255        let executed_transaction = simulate_tx_response.executed_transaction()?;
256
257        let tx_effects: TransactionEffects = executed_transaction.effects()?.effects()?;
258
259        let raw_effects = show_raw_txn_data_and_effects
260            .then(|| bcs::to_bytes(&tx_effects))
261            .transpose()?
262            .unwrap_or_default();
263
264        let tx_events = executed_transaction.events()?.events()?;
265
266        let tx_digest = *tx_effects.transaction_digest();
267        // timestamp is None because it represent a checkpoint one, on a dev inspect
268        // operation we don't have this information.
269        let events =
270            tx_events_to_iota_tx_events(tx_events, package_resolver, tx_digest, None).await?;
271
272        let execution_error = simulate_tx_response
273            .execution_error()
274            .map(|execution_error| -> IndexerResult<_> {
275                let exec_err = execution_error.error_kind()?;
276                let source = execution_error
277                    .source
278                    .clone()
279                    .map(|s| -> Box<dyn std::error::Error + Send + Sync> { s.into() });
280
281                let mut error = ExecutionError::new(exec_err, source);
282                if let Some(command_index) = execution_error.command_index {
283                    error = error.with_command_index(command_index);
284                }
285                Ok(error.to_string())
286            })
287            .transpose()?;
288
289        let results = simulate_tx_response
290            .command_results()
291            .map(|command_results| grpc_conversion::command_results(command_results.clone()))
292            .transpose()?;
293
294        Ok(DevInspectResults {
295            effects: tx_effects.try_into()?,
296            events,
297            results,
298            error: execution_error,
299            raw_txn_data,
300            raw_effects,
301        })
302    }
303
304    /// Gets the reference gas price and max transaction gas from the gRPC API.
305    async fn reference_gas_price_and_max_tx_gas(&self) -> IndexerResult<(u64, u64)> {
306        let epoch = self
307            .fullnode_grpc_client
308            .get_epoch(
309                None, // we're requesting the information for the current epoch.
310                {
311                    let max_tx_gas = EpochField::attribute("max_tx_gas");
312                    Some(ReadMask::from(&[
313                        EpochField::REFERENCE_GAS_PRICE,
314                        &max_tx_gas,
315                    ]))
316                },
317            )
318            .await?
319            .into_inner();
320
321        let max_tx_gas = epoch
322            .protocol_config()?
323            .attributes()?
324            .get("max_tx_gas")
325            .ok_or_else(|| {
326                IndexerError::Grpc("protocol_config's `max_tx_gas` should be available".into())
327            })?
328            .parse::<u64>()
329            .map_err(|e| IndexerError::Grpc(e.to_string()))?;
330
331        Ok((epoch.reference_gas_price(), max_tx_gas))
332    }
333}
334
335impl OptimisticWriteApi {
336    pub fn new(write_api: WriteApi, optimistic_tx_executor: OptimisticTransactionExecutor) -> Self {
337        Self {
338            write_api,
339            optimistic_tx_executor,
340        }
341    }
342
343    async fn build_response(
344        &self,
345        ingestion_path: IngestionPath,
346        options: IotaTransactionBlockResponseOptions,
347    ) -> Result<IotaTransactionBlockResponse, IndexerError> {
348        let package_resolver = self.write_api.package_resolver.clone();
349        let stored_transaction = StoredTransaction::from(ingestion_path);
350        stored_transaction
351            .try_into_iota_transaction_block_response(options, &package_resolver)
352            .await
353    }
354
355    pub fn executor(&self) -> &OptimisticTransactionExecutor {
356        &self.optimistic_tx_executor
357    }
358}
359
360#[async_trait]
361impl WriteApiServer for WriteApi {
362    /// This method will always return an error. The user shall use the
363    /// [`OptimisticWriteApi`] to execute transactions.
364    async fn execute_transaction_block(
365        &self,
366        _tx_bytes: Base64,
367        _signatures: Vec<Base64>,
368        _options: Option<IotaTransactionBlockResponseOptions>,
369        _request_type: Option<ExecuteTransactionRequestType>,
370    ) -> RpcResult<IotaTransactionBlockResponse> {
371        Err(IndexerError::Generic(
372            "execute_transaction_block should be called from OptimisticWriteApi".into(),
373        )
374        .into())
375    }
376
377    async fn dev_inspect_transaction_block(
378        &self,
379        sender_address: IotaAddress,
380        tx_bytes: Base64,
381        gas_price: Option<BigInt<u64>>,
382        _epoch: Option<BigInt<u64>>,
383        additional_args: Option<DevInspectArgs>,
384    ) -> RpcResult<DevInspectResults> {
385        self.dev_inspect_transaction_block_impl(
386            sender_address,
387            tx_bytes,
388            gas_price,
389            additional_args,
390            &self.package_resolver,
391        )
392        .await
393        .map_err(Into::into)
394    }
395
396    async fn dry_run_transaction_block(
397        &self,
398        tx_bytes: Base64,
399    ) -> RpcResult<DryRunTransactionBlockResponse> {
400        self.dry_run_transaction_block_impl(tx_bytes, &self.package_resolver)
401            .await
402            .map_err(Into::into)
403    }
404
405    async fn view_function_call(
406        &self,
407        function_name: String,
408        type_args: Option<Vec<IotaTypeTag>>,
409        arguments: Vec<IotaJsonValue>,
410    ) -> RpcResult<IotaMoveViewCallResults> {
411        let MoveFunctionName {
412            package,
413            module,
414            function,
415        } = function_name.as_str().parse().map_err(IndexerError::from)?;
416        let sender = IotaAddress::ZERO;
417        let tx_kind = self
418            .transaction_builder
419            .move_view_call_tx_kind(
420                package,
421                &module,
422                &function,
423                type_args.unwrap_or_default(),
424                arguments,
425            )
426            .await
427            .map_err(IndexerError::from)?;
428        let tx_bytes = Base64::from_bytes(&bcs::to_bytes(&tx_kind).map_err(IndexerError::from)?);
429        let dev_inspect_results = self
430            .dev_inspect_transaction_block(sender, tx_bytes, None, None, None)
431            .await?;
432        Ok(IotaMoveViewCallResults::from_dev_inspect_results(
433            self.package_resolver.package_store().clone(),
434            dev_inspect_results,
435        )
436        .await
437        .map_err(IndexerError::from)?)
438    }
439}
440
441#[async_trait]
442impl WriteApiServer for OptimisticWriteApi {
443    async fn execute_transaction_block(
444        &self,
445        tx_bytes: Base64,
446        signatures: Vec<Base64>,
447        options: Option<IotaTransactionBlockResponseOptions>,
448        _request_type: Option<ExecuteTransactionRequestType>,
449    ) -> RpcResult<IotaTransactionBlockResponse> {
450        let ingestion_path = self
451            .optimistic_tx_executor
452            .execute_and_index_transaction(tx_bytes, signatures)
453            .await?;
454        Ok(self
455            .build_response(ingestion_path, options.unwrap_or_default())
456            .await?)
457    }
458
459    async fn dev_inspect_transaction_block(
460        &self,
461        sender_address: IotaAddress,
462        tx_bytes: Base64,
463        gas_price: Option<BigInt<u64>>,
464        epoch: Option<BigInt<u64>>,
465        additional_args: Option<DevInspectArgs>,
466    ) -> RpcResult<DevInspectResults> {
467        self.write_api
468            .dev_inspect_transaction_block(
469                sender_address,
470                tx_bytes,
471                gas_price,
472                epoch,
473                additional_args,
474            )
475            .await
476    }
477
478    async fn dry_run_transaction_block(
479        &self,
480        tx_bytes: Base64,
481    ) -> RpcResult<DryRunTransactionBlockResponse> {
482        self.write_api.dry_run_transaction_block(tx_bytes).await
483    }
484
485    async fn view_function_call(
486        &self,
487        function_name: String,
488        type_args: Option<Vec<IotaTypeTag>>,
489        arguments: Vec<IotaJsonValue>,
490    ) -> RpcResult<IotaMoveViewCallResults> {
491        let chain = self
492            .optimistic_tx_executor
493            .read
494            .get_chain_identifier_in_blocking_task()
495            .await?
496            .chain();
497        if !matches!(chain, Chain::Unknown) {
498            return Err(ApiError::UnsupportedFeature(format!(
499                "View calls are not yet supported on {}",
500                chain.as_str()
501            ))
502            .into());
503        }
504
505        self.write_api
506            .view_function_call(function_name, type_args, arguments)
507            .await
508    }
509}
510
511impl IotaRpcModule for WriteApi {
512    fn rpc(self) -> RpcModule<Self> {
513        self.into_rpc()
514    }
515
516    fn rpc_doc_module() -> Module {
517        iota_json_rpc_api::WriteApiOpenRpc::module_doc()
518    }
519}
520
521impl IotaRpcModule for OptimisticWriteApi {
522    fn rpc(self) -> RpcModule<Self> {
523        self.into_rpc()
524    }
525
526    fn rpc_doc_module() -> Module {
527        iota_json_rpc_api::WriteApiOpenRpc::module_doc()
528    }
529}
530
531/// Resolves balance and object changes in dry_run.
532///
533/// Checks the in-memory cache (from the simulate
534/// response) first, then falls back to the indexer's `objects` table for
535/// dynamically loaded objects not included in the response.
536pub struct TxObjectResolver {
537    object_cache: InMemObjectCache,
538    reader: Arc<IndexerReader>,
539}
540
541impl TxObjectResolver {
542    pub fn new(objects: &[&Object], reader: Arc<IndexerReader>) -> Self {
543        let mut object_cache = InMemObjectCache::new();
544        for obj in objects {
545            object_cache.insert_object(<&Object>::clone(obj).clone());
546        }
547        Self {
548            object_cache,
549            reader,
550        }
551    }
552
553    async fn get_past_object_read_with_retry(
554        &self,
555        id: ObjectId,
556        version: SequenceNumber,
557    ) -> IndexerResult<PastObjectRead> {
558        let backoff = backoff::ExponentialBackoff {
559            initial_interval: Duration::from_millis(100),
560            max_elapsed_time: Some(Duration::from_secs(3)),
561            multiplier: 2.0,
562            ..Default::default()
563        };
564
565        backoff::future::retry(backoff, || async {
566            self.reader
567                .get_past_object_read_with_fallback(id, version, false)
568                .await
569                .map_err(backoff::Error::transient)
570        })
571        .await
572    }
573
574    pub(crate) async fn get_changes(
575        &self,
576        tx: &TransactionData,
577        effects: &TransactionEffects,
578        tx_digest: &TransactionDigest,
579    ) -> IndexerResult<(
580        Vec<iota_json_rpc_types::BalanceChange>,
581        Vec<IndexedObjectChange>,
582    )> {
583        let object_changes: Vec<_> = get_object_changes(
584            self,
585            tx.sender(),
586            effects.modified_at_versions(),
587            effects.all_changed_objects(),
588            effects.all_removed_objects(),
589        )
590        .await?
591        .into_iter()
592        .map(IndexedObjectChange::from)
593        .collect();
594        let balance_changes = get_balance_changes_from_effect(
595            self,
596            effects,
597            tx.input_objects().unwrap_or_else(|e| {
598                panic!("checkpointed tx {tx_digest} has invalid input objects: {e}")
599            }),
600            None,
601        )
602        .await?;
603        Ok((balance_changes, object_changes))
604    }
605}
606
607#[async_trait]
608impl ObjectProvider for TxObjectResolver {
609    type Error = IndexerError;
610
611    async fn get_object(
612        &self,
613        id: &ObjectId,
614        version: &SequenceNumber,
615    ) -> Result<Object, Self::Error> {
616        // try in-memory cache first
617        if let Some(o) = self.object_cache.get(id, Some(version)) {
618            return Ok(o.clone());
619        }
620
621        let past_read = self.get_past_object_read_with_retry(*id, *version).await?;
622
623        past_read.into_object().map_err(|e| {
624            IndexerError::Generic(format!(
625                "object {id} at version {version} not found in cache or indexer DB: {e}"
626            ))
627        })
628    }
629
630    async fn find_object_lt_or_eq_version(
631        &self,
632        id: &ObjectId,
633        version: &SequenceNumber,
634    ) -> Result<Option<Object>, Self::Error> {
635        // try exact version in cache
636        if let Some(o) = self.object_cache.get(id, Some(version)) {
637            return Ok(Some(o.clone()));
638        }
639
640        // try latest in cache
641        if let Some(o) = self.object_cache.get(id, None) {
642            if o.version() <= *version {
643                return Ok(Some(o.clone()));
644            }
645        }
646
647        self.get_past_object_read_with_retry(*id, *version)
648            .await
649            .map(|past_read| past_read.into_object().ok())
650    }
651}