iota_surfer/
surfer_state.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{HashMap, HashSet},
7    path::Path,
8    sync::Arc,
9    time::Duration,
10};
11
12use indexmap::IndexSet;
13use iota_json_rpc_types::{IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI};
14use iota_move_build::BuildConfig;
15use iota_protocol_config::{Chain, ProtocolConfig};
16use iota_types::{
17    IOTA_FRAMEWORK_ADDRESS, Identifier,
18    base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
19    execution_config_utils::to_binary_config,
20    object::{Object, Owner},
21    storage::WriteKind,
22    transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData},
23};
24use move_binary_format::{file_format::Visibility, normalized};
25use move_core_types::{identifier::IdentStr, language_storage::StructTag};
26use rand::rngs::StdRng;
27use test_cluster::TestCluster;
28use tokio::sync::RwLock;
29use tracing::{debug, error, info};
30
/// Normalized Move type instantiated with `ArcIdentifier` names; used in this
/// file for entry-function parameter signatures.
type Type = normalized::Type<normalized::ArcIdentifier>;
32
/// A callable Move function found in a published package, identified by
/// package ID, module name and function name, together with its parameter
/// types.
#[derive(Debug, Clone)]
pub struct EntryFunction {
    /// ID of the package that defines the function.
    pub package: ObjectID,
    /// Name of the module that defines the function.
    pub module: String,
    /// Name of the function.
    pub function: String,
    /// Normalized parameter types. `SurferState::discover_entry_functions`
    /// removes a trailing `TxContext` reference before storing them here.
    pub parameters: Vec<Type>,
}
40
41#[derive(Debug, Default)]
42pub struct SurfStatistics {
43    pub num_successful_transactions: u64,
44    pub num_failed_transactions: u64,
45    pub num_owned_obj_transactions: u64,
46    pub num_shared_obj_transactions: u64,
47    pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
48}
49
50impl SurfStatistics {
51    pub fn record_transaction(
52        &mut self,
53        has_shared_object: bool,
54        tx_succeeded: bool,
55        package: ObjectID,
56        module: String,
57        function: String,
58    ) {
59        if tx_succeeded {
60            self.num_successful_transactions += 1;
61        } else {
62            self.num_failed_transactions += 1;
63        }
64        if has_shared_object {
65            self.num_shared_obj_transactions += 1;
66        } else {
67            self.num_owned_obj_transactions += 1;
68        }
69        self.unique_move_functions_called
70            .insert((package, module, function));
71    }
72
73    pub fn aggregate(stats: Vec<Self>) -> Self {
74        let mut result = Self::default();
75        for stat in stats {
76            result.num_successful_transactions += stat.num_successful_transactions;
77            result.num_failed_transactions += stat.num_failed_transactions;
78            result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
79            result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
80            result
81                .unique_move_functions_called
82                .extend(stat.unique_move_functions_called);
83        }
84        result
85    }
86
87    pub fn print_stats(&self) {
88        info!(
89            "Executed {} transactions, {} succeeded, {} failed",
90            self.num_successful_transactions + self.num_failed_transactions,
91            self.num_successful_transactions,
92            self.num_failed_transactions
93        );
94        info!(
95            "{} are owned object transactions, {} are shared object transactions",
96            self.num_owned_obj_transactions, self.num_shared_obj_transactions
97        );
98        info!(
99            "Unique move functions called: {}",
100            self.unique_move_functions_called.len()
101        );
102    }
103}
104
/// Map from StructTag to the set of object references of that type held by a
/// single surfer. Not wrapped in a lock, unlike the inventories below.
pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;

/// Map from StructTag to a vector of immutable object references, behind an
/// `Arc<RwLock<..>>` so the same inventory can be handed to several holders.
pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;

/// Map from StructTag to a vector of shared objects, where each shared object
/// is a tuple of (object ID, initial shared version).
pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<(ObjectID, SequenceNumber)>>>>;
112
113pub struct SurferState {
114    pub pool: Arc<RwLock<normalized::ArcPool>>,
115    pub id: usize,
116    pub cluster: Arc<TestCluster>,
117    pub rng: StdRng,
118
119    pub address: IotaAddress,
120    pub gas_object: ObjectRef,
121    pub owned_objects: OwnedObjects,
122    pub immutable_objects: ImmObjects,
123    pub shared_objects: SharedObjects,
124    pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
125
126    pub stats: SurfStatistics,
127}
128
129impl SurferState {
130    pub fn new(
131        id: usize,
132        cluster: Arc<TestCluster>,
133        rng: StdRng,
134        address: IotaAddress,
135        gas_object: ObjectRef,
136        owned_objects: OwnedObjects,
137        immutable_objects: ImmObjects,
138        shared_objects: SharedObjects,
139        entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
140    ) -> Self {
141        Self {
142            pool: Arc::new(RwLock::new(normalized::ArcPool::new())),
143            id,
144            cluster,
145            rng,
146            address,
147            gas_object,
148            owned_objects,
149            immutable_objects,
150            shared_objects,
151            entry_functions,
152            stats: Default::default(),
153        }
154    }
155
156    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
157    pub async fn execute_move_transaction(
158        &mut self,
159        package: ObjectID,
160        module: String,
161        function: String,
162        args: Vec<CallArg>,
163    ) {
164        let rgp = self.cluster.get_reference_gas_price().await;
165        let use_shared_object = args
166            .iter()
167            .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
168        let tx_data = TransactionData::new_move_call(
169            self.address,
170            package,
171            Identifier::new(module.as_str()).unwrap(),
172            Identifier::new(function.as_str()).unwrap(),
173            vec![],
174            self.gas_object,
175            args,
176            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
177            rgp,
178        )
179        .unwrap();
180        let tx = self.cluster.wallet.sign_transaction(&tx_data);
181        let response = loop {
182            match self
183                .cluster
184                .wallet
185                .execute_transaction_may_fail(tx.clone())
186                .await
187            {
188                Ok(effects) => break effects,
189                Err(e) => {
190                    error!("Error executing transaction: {:?}", e);
191                    tokio::time::sleep(Duration::from_secs(1)).await;
192                }
193            }
194        };
195        debug!(
196            "Successfully executed transaction {:?} with response {:?}",
197            tx, response
198        );
199        let effects = response.effects.unwrap();
200        info!(
201            "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
202            self.address,
203            module,
204            function,
205            effects.status()
206        );
207        self.stats.record_transaction(
208            use_shared_object,
209            effects.status().is_ok(),
210            package,
211            module,
212            function,
213        );
214        self.process_tx_effects(&effects).await;
215    }
216
217    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
218    async fn process_tx_effects(&mut self, effects: &IotaTransactionBlockEffects) {
219        for (owned_ref, write_kind) in effects.all_changed_objects() {
220            if matches!(owned_ref.owner, Owner::ObjectOwner(_)) {
221                // For object owned objects, we don't need to do anything.
222                // We also cannot read them because in the case of shared objects, there can be
223                // races and the child object may no longer exist.
224                continue;
225            }
226            let obj_ref = owned_ref.reference.to_object_ref();
227            let object = self
228                .cluster
229                .get_object_from_fullnode_store(&obj_ref.0)
230                .await
231                .unwrap();
232            if object.is_package() {
233                self.discover_entry_functions(object).await;
234                continue;
235            }
236            let struct_tag = object.struct_tag().unwrap();
237            match owned_ref.owner {
238                Owner::Immutable => {
239                    self.immutable_objects
240                        .write()
241                        .await
242                        .entry(struct_tag)
243                        .or_default()
244                        .push(obj_ref);
245                }
246                Owner::AddressOwner(address) => {
247                    if address == self.address {
248                        self.owned_objects
249                            .entry(struct_tag)
250                            .or_default()
251                            .insert(obj_ref);
252                    }
253                }
254                Owner::ObjectOwner(_) => (),
255                Owner::Shared {
256                    initial_shared_version,
257                } => {
258                    if write_kind != WriteKind::Mutate {
259                        self.shared_objects
260                            .write()
261                            .await
262                            .entry(struct_tag)
263                            .or_default()
264                            .push((obj_ref.0, initial_shared_version));
265                    }
266                    // We do not need to insert it if it's a Mutate, because it
267                    // means we should already have it in
268                    // the inventory.
269                }
270            }
271            if obj_ref.0 == self.gas_object.0 {
272                self.gas_object = obj_ref;
273            }
274        }
275    }
276
277    async fn discover_entry_functions(&self, package: Object) {
278        let package_id = package.id();
279        let move_package = package.into_inner().data.try_into_package().unwrap();
280        let proto_version = self.cluster.highest_protocol_version();
281        let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
282        let binary_config = to_binary_config(&config);
283        let pool: &mut normalized::ArcPool = &mut *self.pool.write().await;
284        let entry_functions: Vec<_> = move_package
285            .normalize(pool, &binary_config, /* include code */ false)
286            .unwrap()
287            .into_iter()
288            .flat_map(|(module_name, module)| {
289                module
290                    .functions
291                    .into_iter()
292                    .filter_map(|(func_name, func)| {
293                        // Either public function or entry function is callable.
294                        if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
295                            return None;
296                        }
297                        // Surfer doesn't support chaining transactions in a programmable
298                        // transaction yet.
299                        if !func.return_.is_empty() {
300                            return None;
301                        }
302                        // Surfer doesn't support type parameter yet.
303                        if !func.type_parameters.is_empty() {
304                            return None;
305                        }
306                        let mut parameters = (*func.parameters).clone();
307                        if let Some(last_param) = parameters.last().as_ref() {
308                            if is_type_tx_context(last_param) {
309                                parameters.pop();
310                            }
311                        }
312                        Some(EntryFunction {
313                            package: package_id,
314                            module: module_name.clone(),
315                            function: func_name.to_string(),
316                            parameters: parameters
317                                .into_iter()
318                                .map(|rc_ty| (*rc_ty).clone())
319                                .collect(),
320                        })
321                    })
322                    .collect::<Vec<_>>()
323            })
324            .collect();
325        info!(
326            "Number of entry functions discovered: {:?}",
327            entry_functions.len()
328        );
329        debug!("Entry functions: {:?}", entry_functions);
330        self.entry_functions.write().await.extend(entry_functions);
331    }
332
333    #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
334    pub async fn publish_package(&mut self, path: &Path) {
335        let rgp = self.cluster.get_reference_gas_price().await;
336        let package = BuildConfig::new_for_testing().build(path).unwrap();
337        let modules = package.get_package_bytes(false);
338        let tx_data = TransactionData::new_module(
339            self.address,
340            self.gas_object,
341            modules,
342            package.dependency_ids.published.values().cloned().collect(),
343            TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
344            rgp,
345        );
346        let tx = self.cluster.wallet.sign_transaction(&tx_data);
347        let response = loop {
348            match self
349                .cluster
350                .wallet
351                .execute_transaction_may_fail(tx.clone())
352                .await
353            {
354                Ok(response) => {
355                    break response;
356                }
357                Err(err) => {
358                    error!("Failed to publish package: {:?}", err);
359                    tokio::time::sleep(Duration::from_secs(1)).await;
360                }
361            }
362        };
363        info!("Successfully published package in {:?}", path);
364        self.process_tx_effects(&response.effects.unwrap()).await;
365    }
366
367    pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
368        self.owned_objects
369            .get(type_tag)
370            .map(|objects| objects.len())
371            .unwrap_or(0)
372    }
373
374    pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
375        self.immutable_objects
376            .read()
377            .await
378            .get(type_tag)
379            .map(|objects| objects.len())
380            .unwrap_or(0)
381    }
382
383    pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
384        self.shared_objects
385            .read()
386            .await
387            .get(type_tag)
388            .map(|objects| objects.len())
389            .unwrap_or(0)
390    }
391
392    pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
393        self.owned_objects
394            .get_mut(type_tag)
395            .unwrap()
396            .swap_remove_index(n)
397            .unwrap()
398    }
399
400    pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
401        self.immutable_objects.read().await.get(type_tag).unwrap()[n]
402    }
403
404    pub async fn choose_nth_shared_object(
405        &self,
406        type_tag: &StructTag,
407        n: usize,
408    ) -> (ObjectID, SequenceNumber) {
409        self.shared_objects.read().await.get(type_tag).unwrap()[n]
410    }
411}
412
413fn is_type_tx_context(ty: &Type) -> bool {
414    match ty {
415        Type::Reference(_, inner) => match inner.as_ref() {
416            Type::Datatype(dt) => {
417                dt.module.address == IOTA_FRAMEWORK_ADDRESS
418                    && dt.module.name.as_ident_str() == IdentStr::new("tx_context").unwrap()
419                    && dt.name.as_ident_str() == IdentStr::new("TxContext").unwrap()
420                    && dt.type_arguments.is_empty()
421            }
422            _ => false,
423        },
424        _ => false,
425    }
426}