iota_surfer/
surfer_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet},
7    path::Path,
8    sync::Arc,
9    time::Duration,
10};
11
12use indexmap::IndexSet;
13use iota_json_rpc_types::{IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI};
14use iota_move_build::BuildConfig;
15use iota_protocol_config::{Chain, ProtocolConfig};
16use iota_types::{
17    IOTA_FRAMEWORK_ADDRESS, Identifier,
18    base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
19    execution_config_utils::to_binary_config,
20    object::{Object, Owner},
21    storage::WriteKind,
22    transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData},
23};
24use move_binary_format::{file_format::Visibility, normalized::Type};
25use move_core_types::language_storage::StructTag;
26use rand::rngs::StdRng;
27use test_cluster::TestCluster;
28use tokio::sync::RwLock;
29use tracing::{debug, error, info};
30
/// A Move function the surfer may call, identified by its package, module and
/// function name, together with the parameter types the caller must supply.
#[derive(Debug, Clone)]
pub struct EntryFunction {
    /// ID of the package that contains the function.
    pub package: ObjectID,
    /// Name of the module that declares the function.
    pub module: String,
    /// Name of the function itself.
    pub function: String,
    /// Normalized parameter types. When the entry is produced by
    /// `SurferState::discover_entry_functions`, a trailing `TxContext`
    /// reference parameter has already been removed.
    pub parameters: Vec<Type>,
}
38
/// Counters describing the transactions executed by one surfer (or, after
/// `SurfStatistics::aggregate`, by several surfers combined).
#[derive(Debug, Default)]
pub struct SurfStatistics {
    /// Number of recorded transactions whose execution succeeded.
    pub num_successful_transactions: u64,
    /// Number of recorded transactions whose execution failed.
    pub num_failed_transactions: u64,
    /// Number of recorded transactions that used no shared object.
    pub num_owned_obj_transactions: u64,
    /// Number of recorded transactions that used at least one shared object.
    pub num_shared_obj_transactions: u64,
    /// Distinct `(package, module, function)` triples that were called.
    pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
}
47
48impl SurfStatistics {
49    pub fn record_transaction(
50        &mut self,
51        has_shared_object: bool,
52        tx_succeeded: bool,
53        package: ObjectID,
54        module: String,
55        function: String,
56    ) {
57        if tx_succeeded {
58            self.num_successful_transactions += 1;
59        } else {
60            self.num_failed_transactions += 1;
61        }
62        if has_shared_object {
63            self.num_shared_obj_transactions += 1;
64        } else {
65            self.num_owned_obj_transactions += 1;
66        }
67        self.unique_move_functions_called
68            .insert((package, module, function));
69    }
70
71    pub fn aggregate(stats: Vec<Self>) -> Self {
72        let mut result = Self::default();
73        for stat in stats {
74            result.num_successful_transactions += stat.num_successful_transactions;
75            result.num_failed_transactions += stat.num_failed_transactions;
76            result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
77            result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
78            result
79                .unique_move_functions_called
80                .extend(stat.unique_move_functions_called);
81        }
82        result
83    }
84
85    pub fn print_stats(&self) {
86        info!(
87            "Executed {} transactions, {} succeeded, {} failed",
88            self.num_successful_transactions + self.num_failed_transactions,
89            self.num_successful_transactions,
90            self.num_failed_transactions
91        );
92        info!(
93            "{} are owned object transactions, {} are shared object transactions",
94            self.num_owned_obj_transactions, self.num_shared_obj_transactions
95        );
96        info!(
97            "Unique move functions called: {}",
98            self.unique_move_functions_called.len()
99        );
100    }
101}
102
/// Map from StructTag to the set of object references of that type held in a
/// single surfer's owned-object inventory.
pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;

/// Map from StructTag to the immutable objects of that type, behind an
/// `Arc<RwLock<..>>` so the same map can be shared between holders.
pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;

/// Map from StructTag to a vector of shared objects, where each shared object
/// is a tuple of (object ID, initial shared version). Wrapped in
/// `Arc<RwLock<..>>` so the same map can be shared between holders.
pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<(ObjectID, SequenceNumber)>>>>;
110
/// State of a single surfer: the account it acts for, its view of the objects
/// and callable functions available on the cluster, and its statistics.
pub struct SurferState {
    /// Identifier of this surfer; recorded as `surfer_id` on tracing spans.
    pub id: usize,
    /// Test cluster against which transactions are signed and executed.
    pub cluster: Arc<TestCluster>,
    /// Random number generator owned by this surfer.
    // NOTE(review): not used in this file — presumably consumed by the surf
    // strategy; confirm against callers.
    pub rng: StdRng,

    /// Address on whose behalf transactions are built; only objects owned by
    /// this address are added to `owned_objects`.
    pub address: IotaAddress,
    /// Reference to the object passed as gas when building transactions. It is
    /// refreshed whenever processed effects contain a new reference for it.
    pub gas_object: ObjectRef,
    /// Objects owned by `address`, grouped by type.
    pub owned_objects: OwnedObjects,
    /// Known immutable objects, grouped by type.
    pub immutable_objects: ImmObjects,
    /// Known shared objects, grouped by type.
    pub shared_objects: SharedObjects,
    /// Callable functions discovered from published packages.
    pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,

    /// Statistics about the transactions executed through this state.
    pub stats: SurfStatistics,
}
125
126impl SurferState {
127    pub fn new(
128        id: usize,
129        cluster: Arc<TestCluster>,
130        rng: StdRng,
131        address: IotaAddress,
132        gas_object: ObjectRef,
133        owned_objects: OwnedObjects,
134        immutable_objects: ImmObjects,
135        shared_objects: SharedObjects,
136        entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
137    ) -> Self {
138        Self {
139            id,
140            cluster,
141            rng,
142            address,
143            gas_object,
144            owned_objects,
145            immutable_objects,
146            shared_objects,
147            entry_functions,
148            stats: Default::default(),
149        }
150    }
151
152    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
153    pub async fn execute_move_transaction(
154        &mut self,
155        package: ObjectID,
156        module: String,
157        function: String,
158        args: Vec<CallArg>,
159    ) {
160        let rgp = self.cluster.get_reference_gas_price().await;
161        let use_shared_object = args
162            .iter()
163            .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
164        let tx_data = TransactionData::new_move_call(
165            self.address,
166            package,
167            Identifier::new(module.as_str()).unwrap(),
168            Identifier::new(function.as_str()).unwrap(),
169            vec![],
170            self.gas_object,
171            args,
172            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
173            rgp,
174        )
175        .unwrap();
176        let tx = self.cluster.wallet.sign_transaction(&tx_data);
177        let response = loop {
178            match self
179                .cluster
180                .wallet
181                .execute_transaction_may_fail(tx.clone())
182                .await
183            {
184                Ok(effects) => break effects,
185                Err(e) => {
186                    error!("Error executing transaction: {:?}", e);
187                    tokio::time::sleep(Duration::from_secs(1)).await;
188                }
189            }
190        };
191        debug!(
192            "Successfully executed transaction {:?} with response {:?}",
193            tx, response
194        );
195        let effects = response.effects.unwrap();
196        info!(
197            "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
198            self.address,
199            module,
200            function,
201            effects.status()
202        );
203        self.stats.record_transaction(
204            use_shared_object,
205            effects.status().is_ok(),
206            package,
207            module,
208            function,
209        );
210        self.process_tx_effects(&effects).await;
211    }
212
213    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
214    async fn process_tx_effects(&mut self, effects: &IotaTransactionBlockEffects) {
215        for (owned_ref, write_kind) in effects.all_changed_objects() {
216            if matches!(owned_ref.owner, Owner::ObjectOwner(_)) {
217                // For object owned objects, we don't need to do anything.
218                // We also cannot read them because in the case of shared objects, there can be
219                // races and the child object may no longer exist.
220                continue;
221            }
222            let obj_ref = owned_ref.reference.to_object_ref();
223            let object = self
224                .cluster
225                .get_object_from_fullnode_store(&obj_ref.0)
226                .await
227                .unwrap();
228            if object.is_package() {
229                self.discover_entry_functions(object).await;
230                continue;
231            }
232            let struct_tag = object.struct_tag().unwrap();
233            match owned_ref.owner {
234                Owner::Immutable => {
235                    self.immutable_objects
236                        .write()
237                        .await
238                        .entry(struct_tag)
239                        .or_default()
240                        .push(obj_ref);
241                }
242                Owner::AddressOwner(address) => {
243                    if address == self.address {
244                        self.owned_objects
245                            .entry(struct_tag)
246                            .or_default()
247                            .insert(obj_ref);
248                    }
249                }
250                Owner::ObjectOwner(_) => (),
251                Owner::Shared {
252                    initial_shared_version,
253                } => {
254                    if write_kind != WriteKind::Mutate {
255                        self.shared_objects
256                            .write()
257                            .await
258                            .entry(struct_tag)
259                            .or_default()
260                            .push((obj_ref.0, initial_shared_version));
261                    }
262                    // We do not need to insert it if it's a Mutate, because it
263                    // means we should already have it in
264                    // the inventory.
265                }
266            }
267            if obj_ref.0 == self.gas_object.0 {
268                self.gas_object = obj_ref;
269            }
270        }
271    }
272
273    async fn discover_entry_functions(&self, package: Object) {
274        let package_id = package.id();
275        let move_package = package.into_inner().data.try_into_package().unwrap();
276        let proto_version = self.cluster.highest_protocol_version();
277        let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
278        let binary_config = to_binary_config(&config);
279        let entry_functions: Vec<_> = move_package
280            .normalize(&binary_config)
281            .unwrap()
282            .into_iter()
283            .flat_map(|(module_name, module)| {
284                module
285                    .functions
286                    .into_iter()
287                    .filter_map(|(func_name, func)| {
288                        // Either public function or entry function is callable.
289                        if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
290                            return None;
291                        }
292                        // Surfer doesn't support chaining transactions in a programmable
293                        // transaction yet.
294                        if !func.return_.is_empty() {
295                            return None;
296                        }
297                        // Surfer doesn't support type parameter yet.
298                        if !func.type_parameters.is_empty() {
299                            return None;
300                        }
301                        let mut parameters = func.parameters;
302                        if let Some(last_param) = parameters.last().as_ref() {
303                            if is_type_tx_context(last_param) {
304                                parameters.pop();
305                            }
306                        }
307                        Some(EntryFunction {
308                            package: package_id,
309                            module: module_name.clone(),
310                            function: func_name.to_string(),
311                            parameters,
312                        })
313                    })
314                    .collect::<Vec<_>>()
315            })
316            .collect();
317        info!(
318            "Number of entry functions discovered: {:?}",
319            entry_functions.len()
320        );
321        debug!("Entry functions: {:?}", entry_functions);
322        self.entry_functions.write().await.extend(entry_functions);
323    }
324
325    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
326    pub async fn publish_package(&mut self, path: &Path) {
327        let rgp = self.cluster.get_reference_gas_price().await;
328        let package = BuildConfig::new_for_testing().build(path).unwrap();
329        let modules = package.get_package_bytes(false);
330        let tx_data = TransactionData::new_module(
331            self.address,
332            self.gas_object,
333            modules,
334            package.dependency_ids.published.values().cloned().collect(),
335            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
336            rgp,
337        );
338        let tx = self.cluster.wallet.sign_transaction(&tx_data);
339        let response = loop {
340            match self
341                .cluster
342                .wallet
343                .execute_transaction_may_fail(tx.clone())
344                .await
345            {
346                Ok(response) => {
347                    break response;
348                }
349                Err(err) => {
350                    error!("Failed to publish package: {:?}", err);
351                    tokio::time::sleep(Duration::from_secs(1)).await;
352                }
353            }
354        };
355        info!("Successfully published package in {:?}", path);
356        self.process_tx_effects(&response.effects.unwrap()).await;
357    }
358
359    pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
360        self.owned_objects
361            .get(type_tag)
362            .map(|objects| objects.len())
363            .unwrap_or(0)
364    }
365
366    pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
367        self.immutable_objects
368            .read()
369            .await
370            .get(type_tag)
371            .map(|objects| objects.len())
372            .unwrap_or(0)
373    }
374
375    pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
376        self.shared_objects
377            .read()
378            .await
379            .get(type_tag)
380            .map(|objects| objects.len())
381            .unwrap_or(0)
382    }
383
384    pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
385        self.owned_objects
386            .get_mut(type_tag)
387            .unwrap()
388            .swap_remove_index(n)
389            .unwrap()
390    }
391
392    pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
393        self.immutable_objects.read().await.get(type_tag).unwrap()[n]
394    }
395
396    pub async fn choose_nth_shared_object(
397        &self,
398        type_tag: &StructTag,
399        n: usize,
400    ) -> (ObjectID, SequenceNumber) {
401        self.shared_objects.read().await.get(type_tag).unwrap()[n]
402    }
403}
404
405fn is_type_tx_context(ty: &Type) -> bool {
406    match ty {
407        Type::Reference(inner) | Type::MutableReference(inner) => match inner.as_ref() {
408            Type::Struct {
409                address,
410                module,
411                name,
412                type_arguments,
413            } => {
414                address == &IOTA_FRAMEWORK_ADDRESS
415                    && module == &Identifier::new("tx_context").unwrap()
416                    && name == &Identifier::new("TxContext").unwrap()
417                    && type_arguments.is_empty()
418            }
419            _ => false,
420        },
421        _ => false,
422    }
423}