1use std::{
6 collections::{HashMap, HashSet},
7 path::Path,
8 sync::Arc,
9 time::Duration,
10};
11
12use indexmap::IndexSet;
13use iota_json_rpc_types::{IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI};
14use iota_move_build::BuildConfig;
15use iota_protocol_config::{Chain, ProtocolConfig};
16use iota_types::{
17 IOTA_FRAMEWORK_ADDRESS, Identifier,
18 base_types::{IotaAddress, ObjectID, ObjectRef, SequenceNumber},
19 execution_config_utils::to_binary_config,
20 object::{Object, Owner},
21 storage::WriteKind,
22 transaction::{CallArg, ObjectArg, TEST_ONLY_GAS_UNIT_FOR_PUBLISH, TransactionData},
23};
24use move_binary_format::{file_format::Visibility, normalized};
25use move_core_types::{identifier::IdentStr, language_storage::StructTag};
26use rand::rngs::StdRng;
27use test_cluster::TestCluster;
28use tokio::sync::RwLock;
29use tracing::{debug, error, info};
30
/// Shorthand for a normalized Move type whose identifiers are
/// `normalized::ArcIdentifier`s.
type Type = normalized::Type<normalized::ArcIdentifier>;
32
/// Description of one callable Move function, as collected by
/// `SurferState::discover_entry_functions`.
#[derive(Debug, Clone)]
pub struct EntryFunction {
    /// ID of the package object that defines the function.
    pub package: ObjectID,
    /// Name of the module that contains the function.
    pub module: String,
    /// Name of the function inside `module`.
    pub function: String,
    /// Parameter types of the function. When produced by
    /// `SurferState::discover_entry_functions`, a trailing `TxContext`
    /// reference parameter has already been removed.
    pub parameters: Vec<Type>,
}
40
/// Counters describing the transactions recorded through
/// `SurfStatistics::record_transaction`.
#[derive(Debug, Default)]
pub struct SurfStatistics {
    /// Number of recorded transactions that were flagged as succeeded.
    pub num_successful_transactions: u64,
    /// Number of recorded transactions that were flagged as failed.
    pub num_failed_transactions: u64,
    /// Number of recorded transactions without a shared-object argument.
    pub num_owned_obj_transactions: u64,
    /// Number of recorded transactions with at least one shared-object
    /// argument.
    pub num_shared_obj_transactions: u64,
    /// Distinct `(package, module, function)` triples that were recorded,
    /// regardless of whether the call succeeded.
    pub unique_move_functions_called: HashSet<(ObjectID, String, String)>,
}
49
50impl SurfStatistics {
51 pub fn record_transaction(
52 &mut self,
53 has_shared_object: bool,
54 tx_succeeded: bool,
55 package: ObjectID,
56 module: String,
57 function: String,
58 ) {
59 if tx_succeeded {
60 self.num_successful_transactions += 1;
61 } else {
62 self.num_failed_transactions += 1;
63 }
64 if has_shared_object {
65 self.num_shared_obj_transactions += 1;
66 } else {
67 self.num_owned_obj_transactions += 1;
68 }
69 self.unique_move_functions_called
70 .insert((package, module, function));
71 }
72
73 pub fn aggregate(stats: Vec<Self>) -> Self {
74 let mut result = Self::default();
75 for stat in stats {
76 result.num_successful_transactions += stat.num_successful_transactions;
77 result.num_failed_transactions += stat.num_failed_transactions;
78 result.num_owned_obj_transactions += stat.num_owned_obj_transactions;
79 result.num_shared_obj_transactions += stat.num_shared_obj_transactions;
80 result
81 .unique_move_functions_called
82 .extend(stat.unique_move_functions_called);
83 }
84 result
85 }
86
87 pub fn print_stats(&self) {
88 info!(
89 "Executed {} transactions, {} succeeded, {} failed",
90 self.num_successful_transactions + self.num_failed_transactions,
91 self.num_successful_transactions,
92 self.num_failed_transactions
93 );
94 info!(
95 "{} are owned object transactions, {} are shared object transactions",
96 self.num_owned_obj_transactions, self.num_shared_obj_transactions
97 );
98 info!(
99 "Unique move functions called: {}",
100 self.unique_move_functions_called.len()
101 );
102 }
103}
104
/// Object references grouped by struct tag, each group kept as an
/// insertion-ordered set so that entries can be removed by index.
pub type OwnedObjects = HashMap<StructTag, IndexSet<ObjectRef>>;

/// Reference-counted, async-lock-protected map from struct tag to the
/// references of immutable objects of that type.
pub type ImmObjects = Arc<RwLock<HashMap<StructTag, Vec<ObjectRef>>>>;

/// Reference-counted, async-lock-protected map from struct tag to shared
/// objects of that type, each stored as `(object ID, sequence number)`.
/// `SurferState::process_tx_effects` stores the object's initial shared
/// version as the sequence number.
pub type SharedObjects = Arc<RwLock<HashMap<StructTag, Vec<(ObjectID, SequenceNumber)>>>>;
112
/// State of a single surfer: the account it transacts from, the objects it
/// knows about, and the statistics it has gathered.
pub struct SurferState {
    /// Pool handed to `normalize` when a published package is inspected for
    /// callable functions.
    pub pool: Arc<RwLock<normalized::ArcPool>>,
    /// Identifier of this surfer; attached to tracing spans as `surfer_id`.
    pub id: usize,
    /// Test cluster used to sign and execute transactions and to read
    /// objects back.
    pub cluster: Arc<TestCluster>,
    /// Random number generator owned by this surfer.
    // NOTE(review): not used inside this type's methods shown here;
    // presumably consumed by the callers that pick functions/objects.
    pub rng: StdRng,

    /// Address used as the sender of every transaction built here.
    pub address: IotaAddress,
    /// Reference of the object used to pay for gas; refreshed whenever the
    /// effects of a transaction report a new reference for the same ID.
    pub gas_object: ObjectRef,
    /// Objects owned by `address`, grouped by struct tag.
    pub owned_objects: OwnedObjects,
    /// Known immutable objects, grouped by struct tag.
    pub immutable_objects: ImmObjects,
    /// Known shared objects, grouped by struct tag.
    pub shared_objects: SharedObjects,
    /// Functions discovered in published packages.
    pub entry_functions: Arc<RwLock<Vec<EntryFunction>>>,

    /// Statistics of the transactions executed by this surfer.
    pub stats: SurfStatistics,
}
128
129impl SurferState {
130 pub fn new(
131 id: usize,
132 cluster: Arc<TestCluster>,
133 rng: StdRng,
134 address: IotaAddress,
135 gas_object: ObjectRef,
136 owned_objects: OwnedObjects,
137 immutable_objects: ImmObjects,
138 shared_objects: SharedObjects,
139 entry_functions: Arc<RwLock<Vec<EntryFunction>>>,
140 ) -> Self {
141 Self {
142 pool: Arc::new(RwLock::new(normalized::ArcPool::new())),
143 id,
144 cluster,
145 rng,
146 address,
147 gas_object,
148 owned_objects,
149 immutable_objects,
150 shared_objects,
151 entry_functions,
152 stats: Default::default(),
153 }
154 }
155
156 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
157 pub async fn execute_move_transaction(
158 &mut self,
159 package: ObjectID,
160 module: String,
161 function: String,
162 args: Vec<CallArg>,
163 ) {
164 let rgp = self.cluster.get_reference_gas_price().await;
165 let use_shared_object = args
166 .iter()
167 .any(|arg| matches!(arg, CallArg::Object(ObjectArg::SharedObject { .. })));
168 let tx_data = TransactionData::new_move_call(
169 self.address,
170 package,
171 Identifier::new(module.as_str()).unwrap(),
172 Identifier::new(function.as_str()).unwrap(),
173 vec![],
174 self.gas_object,
175 args,
176 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
177 rgp,
178 )
179 .unwrap();
180 let tx = self.cluster.wallet.sign_transaction(&tx_data);
181 let response = loop {
182 match self
183 .cluster
184 .wallet
185 .execute_transaction_may_fail(tx.clone())
186 .await
187 {
188 Ok(effects) => break effects,
189 Err(e) => {
190 error!("Error executing transaction: {:?}", e);
191 tokio::time::sleep(Duration::from_secs(1)).await;
192 }
193 }
194 };
195 debug!(
196 "Successfully executed transaction {:?} with response {:?}",
197 tx, response
198 );
199 let effects = response.effects.unwrap();
200 info!(
201 "[{:?}] Calling Move function {:?}::{:?} returned {:?}",
202 self.address,
203 module,
204 function,
205 effects.status()
206 );
207 self.stats.record_transaction(
208 use_shared_object,
209 effects.status().is_ok(),
210 package,
211 module,
212 function,
213 );
214 self.process_tx_effects(&effects).await;
215 }
216
217 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
218 async fn process_tx_effects(&mut self, effects: &IotaTransactionBlockEffects) {
219 for (owned_ref, write_kind) in effects.all_changed_objects() {
220 if matches!(owned_ref.owner, Owner::ObjectOwner(_)) {
221 continue;
225 }
226 let obj_ref = owned_ref.reference.to_object_ref();
227 let object = self
228 .cluster
229 .get_object_from_fullnode_store(&obj_ref.0)
230 .await
231 .unwrap();
232 if object.is_package() {
233 self.discover_entry_functions(object).await;
234 continue;
235 }
236 let struct_tag = object.struct_tag().unwrap();
237 match owned_ref.owner {
238 Owner::Immutable => {
239 self.immutable_objects
240 .write()
241 .await
242 .entry(struct_tag)
243 .or_default()
244 .push(obj_ref);
245 }
246 Owner::AddressOwner(address) => {
247 if address == self.address {
248 self.owned_objects
249 .entry(struct_tag)
250 .or_default()
251 .insert(obj_ref);
252 }
253 }
254 Owner::ObjectOwner(_) => (),
255 Owner::Shared {
256 initial_shared_version,
257 } => {
258 if write_kind != WriteKind::Mutate {
259 self.shared_objects
260 .write()
261 .await
262 .entry(struct_tag)
263 .or_default()
264 .push((obj_ref.0, initial_shared_version));
265 }
266 }
270 }
271 if obj_ref.0 == self.gas_object.0 {
272 self.gas_object = obj_ref;
273 }
274 }
275 }
276
277 async fn discover_entry_functions(&self, package: Object) {
278 let package_id = package.id();
279 let move_package = package.into_inner().data.try_into_package().unwrap();
280 let proto_version = self.cluster.highest_protocol_version();
281 let config = ProtocolConfig::get_for_version(proto_version, Chain::Unknown);
282 let binary_config = to_binary_config(&config);
283 let pool: &mut normalized::ArcPool = &mut *self.pool.write().await;
284 let entry_functions: Vec<_> = move_package
285 .normalize(pool, &binary_config, false)
286 .unwrap()
287 .into_iter()
288 .flat_map(|(module_name, module)| {
289 module
290 .functions
291 .into_iter()
292 .filter_map(|(func_name, func)| {
293 if !matches!(func.visibility, Visibility::Public) && !func.is_entry {
295 return None;
296 }
297 if !func.return_.is_empty() {
300 return None;
301 }
302 if !func.type_parameters.is_empty() {
304 return None;
305 }
306 let mut parameters = (*func.parameters).clone();
307 if let Some(last_param) = parameters.last().as_ref() {
308 if is_type_tx_context(last_param) {
309 parameters.pop();
310 }
311 }
312 Some(EntryFunction {
313 package: package_id,
314 module: module_name.clone(),
315 function: func_name.to_string(),
316 parameters: parameters
317 .into_iter()
318 .map(|rc_ty| (*rc_ty).clone())
319 .collect(),
320 })
321 })
322 .collect::<Vec<_>>()
323 })
324 .collect();
325 info!(
326 "Number of entry functions discovered: {:?}",
327 entry_functions.len()
328 );
329 debug!("Entry functions: {:?}", entry_functions);
330 self.entry_functions.write().await.extend(entry_functions);
331 }
332
333 #[tracing::instrument(skip_all, fields(surfer_id = self.id))]
334 pub async fn publish_package(&mut self, path: &Path) {
335 let rgp = self.cluster.get_reference_gas_price().await;
336 let package = BuildConfig::new_for_testing().build(path).unwrap();
337 let modules = package.get_package_bytes(false);
338 let tx_data = TransactionData::new_module(
339 self.address,
340 self.gas_object,
341 modules,
342 package.dependency_ids.published.values().cloned().collect(),
343 TEST_ONLY_GAS_UNIT_FOR_PUBLISH * rgp,
344 rgp,
345 );
346 let tx = self.cluster.wallet.sign_transaction(&tx_data);
347 let response = loop {
348 match self
349 .cluster
350 .wallet
351 .execute_transaction_may_fail(tx.clone())
352 .await
353 {
354 Ok(response) => {
355 break response;
356 }
357 Err(err) => {
358 error!("Failed to publish package: {:?}", err);
359 tokio::time::sleep(Duration::from_secs(1)).await;
360 }
361 }
362 };
363 info!("Successfully published package in {:?}", path);
364 self.process_tx_effects(&response.effects.unwrap()).await;
365 }
366
367 pub fn matching_owned_objects_count(&self, type_tag: &StructTag) -> usize {
368 self.owned_objects
369 .get(type_tag)
370 .map(|objects| objects.len())
371 .unwrap_or(0)
372 }
373
374 pub async fn matching_immutable_objects_count(&self, type_tag: &StructTag) -> usize {
375 self.immutable_objects
376 .read()
377 .await
378 .get(type_tag)
379 .map(|objects| objects.len())
380 .unwrap_or(0)
381 }
382
383 pub async fn matching_shared_objects_count(&self, type_tag: &StructTag) -> usize {
384 self.shared_objects
385 .read()
386 .await
387 .get(type_tag)
388 .map(|objects| objects.len())
389 .unwrap_or(0)
390 }
391
392 pub fn choose_nth_owned_object(&mut self, type_tag: &StructTag, n: usize) -> ObjectRef {
393 self.owned_objects
394 .get_mut(type_tag)
395 .unwrap()
396 .swap_remove_index(n)
397 .unwrap()
398 }
399
400 pub async fn choose_nth_immutable_object(&self, type_tag: &StructTag, n: usize) -> ObjectRef {
401 self.immutable_objects.read().await.get(type_tag).unwrap()[n]
402 }
403
404 pub async fn choose_nth_shared_object(
405 &self,
406 type_tag: &StructTag,
407 n: usize,
408 ) -> (ObjectID, SequenceNumber) {
409 self.shared_objects.read().await.get(type_tag).unwrap()[n]
410 }
411}
412
413fn is_type_tx_context(ty: &Type) -> bool {
414 match ty {
415 Type::Reference(_, inner) => match inner.as_ref() {
416 Type::Datatype(dt) => {
417 dt.module.address == IOTA_FRAMEWORK_ADDRESS
418 && dt.module.name.as_ident_str() == IdentStr::new("tx_context").unwrap()
419 && dt.name.as_ident_str() == IdentStr::new("TxContext").unwrap()
420 && dt.type_arguments.is_empty()
421 }
422 _ => false,
423 },
424 _ => false,
425 }
426}