1use std::{
6 collections::{HashMap, HashSet},
7 path::Path,
8 sync::Arc,
9 time::Duration,
10};
11
12use indexmap::IndexSet;
13use iota_json_rpc_types::{IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI};
14use iota_move_build::BuildConfig;
15use iota_protocol_config::{Chain, ProtocolConfig};
16use iota_types::{
17 IOTA_FRAMEWORK_ADDRESS, Identifier,
18 base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
19 execution_config_utils::to_binary_config,
20 object::{Object, Owner},
21 storage::WriteKind,
22 transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData},
23};
24use move_binary_format::{file_format::Visibility, normalized::Type};
25use move_core_types::language_storage::StructTag;
26use rand::rngs::StdRng;
27use test_cluster::TestCluster;
28use tokio::sync::RwLock;
29use tracing::{debug, error, info};
30
/// A Move function that a surfer can call, as recorded by
/// `SurferState::discover_entry_functions`.
#[derive(Debug, Clone)]
pub struct EntryFunction {
    /// ID of the package that defines the function.
    pub package: ObjectID,
    /// Name of the module that defines the function.
    pub module: String,
    /// Name of the function itself.
    pub function: String,
    /// Normalized parameter types of the function. Discovery removes a
    /// trailing `TxContext` reference parameter, so it is not listed here.
    pub parameters: Vec<Type>,
}
38
/// Counters describing the transactions executed by one surfer (or, after
/// `SurfStatistics::aggregate`, by several of them).
#[derive(Debug, Default)]
pub struct SurfStatistics {
    /// Number of recorded transactions whose execution status was OK.
    pub num_successful_transactions: u64,
    /// Number of recorded transactions whose execution status was not OK.
    pub num_failed_transactions: u64,
    /// Number of recorded transactions that used no shared-object argument.
    pub num_owned_obj_transactions: u64,
    /// Number of recorded transactions that used at least one shared-object
    /// argument.
    pub num_shared_obj_transactions: u64,
    /// Distinct `(package, module, function)` triples that have been called,
    /// regardless of whether the call succeeded.
    pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
}
47
48impl SurfStatistics {
49 pub fn record_transaction(
50 &mut self,
51 has_shared_object: bool,
52 tx_succeeded: bool,
53 package: ObjectID,
54 module: String,
55 function: String,
56 ) {
57 if tx_succeeded {
58 self.num_successful_transactions += 1;
59 } else {
60 self.num_failed_transactions += 1;
61 }
62 if has_shared_object {
63 self.num_shared_obj_transactions += 1;
64 } else {
65 self.num_owned_obj_transactions += 1;
66 }
67 self.unique_move_functions_called
68 .insert((package, module, function));
69 }
70
71 pub fn aggregate(stats: Vec<Self>) -> Self {
72 let mut result = Self::default();
73 for stat in stats {
74 result.num_successful_transactions += stat.num_successful_transactions;
75 result.num_failed_transactions += stat.num_failed_transactions;
76 result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
77 result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
78 result
79 .unique_move_functions_called
80 .extend(stat.unique_move_functions_called);
81 }
82 result
83 }
84
85 pub fn print_stats(&self) {
86 info!(
87 "Executed {} transactions, {} succeeded, {} failed",
88 self.num_successful_transactions + self.num_failed_transactions,
89 self.num_successful_transactions,
90 self.num_failed_transactions
91 );
92 info!(
93 "{} are owned object transactions, {} are shared object transactions",
94 self.num_owned_obj_transactions, self.num_shared_obj_transactions
95 );
96 info!(
97 "Unique move functions called: {}",
98 self.unique_move_functions_called.len()
99 );
100 }
101}
102
/// Address-owned object references tracked by a single surfer, grouped by
/// their Move struct type.
pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;

/// Immutable object references grouped by their Move struct type, behind an
/// async `RwLock` inside an `Arc` (presumably so that several surfers can
/// share one inventory — confirm against the caller).
pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;

/// Shared objects grouped by their Move struct type. Each entry is the
/// object's ID together with its initial shared version. Wrapped in
/// `Arc<RwLock<..>>` like [`ImmObjects`].
pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<(ObjectID, SequenceNumber)>>>>;
110
/// State of a single surfer: the account it acts as, the objects it knows
/// about, the functions it may call and the statistics it has gathered.
pub struct SurferState {
    /// Identifier of this surfer; attached to tracing spans as `surfer_id`.
    pub id: usize,
    /// Test cluster that transactions are executed against.
    pub cluster: Arc<TestCluster>,
    /// Random number generator owned by this surfer (not used by the methods
    /// in this file; presumably consumed by the surfing strategy).
    pub rng: StdRng,

    /// Address used as the sender of every transaction.
    pub address: IotaAddress,
    /// Gas coin used to pay for every transaction; refreshed from the
    /// transaction effects after each execution.
    pub gas_object: ObjectRef,
    /// Objects owned by `address`, grouped by struct type.
    pub owned_objects: OwnedObjects,
    /// Known immutable objects, grouped by struct type.
    pub immutable_objects: ImmObjects,
    /// Known shared objects, grouped by struct type.
    pub shared_objects: SharedObjects,
    /// Callable functions discovered in published packages.
    pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,

    /// Statistics about the transactions executed by this surfer.
    pub stats: SurfStatistics,
}
125
126impl SurferState {
127 pub fn new(
128 id: usize,
129 cluster: Arc<TestCluster>,
130 rng: StdRng,
131 address: IotaAddress,
132 gas_object: ObjectRef,
133 owned_objects: OwnedObjects,
134 immutable_objects: ImmObjects,
135 shared_objects: SharedObjects,
136 entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
137 ) -> Self {
138 Self {
139 id,
140 cluster,
141 rng,
142 address,
143 gas_object,
144 owned_objects,
145 immutable_objects,
146 shared_objects,
147 entry_functions,
148 stats: Default::default(),
149 }
150 }
151
152 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
153 pub async fn execute_move_transaction(
154 &mut self,
155 package: ObjectID,
156 module: String,
157 function: String,
158 args: Vec<CallArg>,
159 ) {
160 let rgp = self.cluster.get_reference_gas_price().await;
161 let use_shared_object = args
162 .iter()
163 .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
164 let tx_data = TransactionData::new_move_call(
165 self.address,
166 package,
167 Identifier::new(module.as_str()).unwrap(),
168 Identifier::new(function.as_str()).unwrap(),
169 vec![],
170 self.gas_object,
171 args,
172 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
173 rgp,
174 )
175 .unwrap();
176 let tx = self.cluster.wallet.sign_transaction(&tx_data);
177 let response = loop {
178 match self
179 .cluster
180 .wallet
181 .execute_transaction_may_fail(tx.clone())
182 .await
183 {
184 Ok(effects) => break effects,
185 Err(e) => {
186 error!("Error executing transaction: {:?}", e);
187 tokio::time::sleep(Duration::from_secs(1)).await;
188 }
189 }
190 };
191 debug!(
192 "Successfully executed transaction {:?} with response {:?}",
193 tx, response
194 );
195 let effects = response.effects.unwrap();
196 info!(
197 "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
198 self.address,
199 module,
200 function,
201 effects.status()
202 );
203 self.stats.record_transaction(
204 use_shared_object,
205 effects.status().is_ok(),
206 package,
207 module,
208 function,
209 );
210 self.process_tx_effects(&effects).await;
211 }
212
213 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
214 async fn process_tx_effects(&mut self, effects: &IotaTransactionBlockEffects) {
215 for (owned_ref, write_kind) in effects.all_changed_objects() {
216 if matches!(owned_ref.owner, Owner::ObjectOwner(_)) {
217 continue;
221 }
222 let obj_ref = owned_ref.reference.to_object_ref();
223 let object = self
224 .cluster
225 .get_object_from_fullnode_store(&obj_ref.0)
226 .await
227 .unwrap();
228 if object.is_package() {
229 self.discover_entry_functions(object).await;
230 continue;
231 }
232 let struct_tag = object.struct_tag().unwrap();
233 match owned_ref.owner {
234 Owner::Immutable => {
235 self.immutable_objects
236 .write()
237 .await
238 .entry(struct_tag)
239 .or_default()
240 .push(obj_ref);
241 }
242 Owner::AddressOwner(address) => {
243 if address == self.address {
244 self.owned_objects
245 .entry(struct_tag)
246 .or_default()
247 .insert(obj_ref);
248 }
249 }
250 Owner::ObjectOwner(_) => (),
251 Owner::Shared {
252 initial_shared_version,
253 } => {
254 if write_kind != WriteKind::Mutate {
255 self.shared_objects
256 .write()
257 .await
258 .entry(struct_tag)
259 .or_default()
260 .push((obj_ref.0, initial_shared_version));
261 }
262 }
266 }
267 if obj_ref.0 == self.gas_object.0 {
268 self.gas_object = obj_ref;
269 }
270 }
271 }
272
273 async fn discover_entry_functions(&self, package: Object) {
274 let package_id = package.id();
275 let move_package = package.into_inner().data.try_into_package().unwrap();
276 let proto_version = self.cluster.highest_protocol_version();
277 let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
278 let binary_config = to_binary_config(&config);
279 let entry_functions: Vec<_> = move_package
280 .normalize(&binary_config)
281 .unwrap()
282 .into_iter()
283 .flat_map(|(module_name, module)| {
284 module
285 .functions
286 .into_iter()
287 .filter_map(|(func_name, func)| {
288 if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
290 return None;
291 }
292 if !func.return_.is_empty() {
295 return None;
296 }
297 if !func.type_parameters.is_empty() {
299 return None;
300 }
301 let mut parameters = func.parameters;
302 if let Some(last_param) = parameters.last().as_ref() {
303 if is_type_tx_context(last_param) {
304 parameters.pop();
305 }
306 }
307 Some(EntryFunction {
308 package: package_id,
309 module: module_name.clone(),
310 function: func_name.to_string(),
311 parameters,
312 })
313 })
314 .collect::<Vec<_>>()
315 })
316 .collect();
317 info!(
318 "Number of entry functions discovered: {:?}",
319 entry_functions.len()
320 );
321 debug!("Entry functions: {:?}", entry_functions);
322 self.entry_functions.write().await.extend(entry_functions);
323 }
324
325 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
326 pub async fn publish_package(&mut self, path: &Path) {
327 let rgp = self.cluster.get_reference_gas_price().await;
328 let package = BuildConfig::new_for_testing().build(path).unwrap();
329 let modules = package.get_package_bytes(false);
330 let tx_data = TransactionData::new_module(
331 self.address,
332 self.gas_object,
333 modules,
334 package.dependency_ids.published.values().cloned().collect(),
335 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
336 rgp,
337 );
338 let tx = self.cluster.wallet.sign_transaction(&tx_data);
339 let response = loop {
340 match self
341 .cluster
342 .wallet
343 .execute_transaction_may_fail(tx.clone())
344 .await
345 {
346 Ok(response) => {
347 break response;
348 }
349 Err(err) => {
350 error!("Failed to publish package: {:?}", err);
351 tokio::time::sleep(Duration::from_secs(1)).await;
352 }
353 }
354 };
355 info!("Successfully published package in {:?}", path);
356 self.process_tx_effects(&response.effects.unwrap()).await;
357 }
358
359 pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
360 self.owned_objects
361 .get(type_tag)
362 .map(|objects| objects.len())
363 .unwrap_or(0)
364 }
365
366 pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
367 self.immutable_objects
368 .read()
369 .await
370 .get(type_tag)
371 .map(|objects| objects.len())
372 .unwrap_or(0)
373 }
374
375 pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
376 self.shared_objects
377 .read()
378 .await
379 .get(type_tag)
380 .map(|objects| objects.len())
381 .unwrap_or(0)
382 }
383
384 pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
385 self.owned_objects
386 .get_mut(type_tag)
387 .unwrap()
388 .swap_remove_index(n)
389 .unwrap()
390 }
391
392 pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
393 self.immutable_objects.read().await.get(type_tag).unwrap()[n]
394 }
395
396 pub async fn choose_nth_shared_object(
397 &self,
398 type_tag: &StructTag,
399 n: usize,
400 ) -> (ObjectID, SequenceNumber) {
401 self.shared_objects.read().await.get(type_tag).unwrap()[n]
402 }
403}
404
405fn is_type_tx_context(ty: &Type) -> bool {
406 match ty {
407 Type::Reference(inner) | Type::MutableReference(inner) => match inner.as_ref() {
408 Type::Struct {
409 address,
410 module,
411 name,
412 type_arguments,
413 } => {
414 address == &IOTA_FRAMEWORK_ADDRESS
415 && module == &Identifier::new("tx_context").unwrap()
416 && name == &Identifier::new("TxContext").unwrap()
417 && type_arguments.is_empty()
418 }
419 _ => false,
420 },
421 _ => false,
422 }
423}