iota_surfer/
surfer_task.rs1use std::{collections::HashMap, sync::Arc};
6
7use iota_core::authority::authority_store_tables::LiveObject;
8use iota_types::{
9 base_types::{IotaAddress, ObjectRef},
10 object::Owner,
11};
12use rand::{Rng, SeedableRng, rngs::StdRng};
13use test_cluster::TestCluster;
14use tokio::sync::{RwLock, watch};
15
16use crate::{
17 surf_strategy::SurfStrategy,
18 surfer_state::{ImmObjects, OwnedObjects, SharedObjects, SurfStatistics, SurferState},
19};
20
21pub struct SurferTask {
22 pub state: SurferState,
23 pub surf_strategy: SurfStrategy,
24 pub exit_rcv: watch::Receiver<()>,
25}
26
27impl SurferTask {
28 pub async fn create_surfer_tasks(
29 cluster: Arc<TestCluster>,
30 seed: u64,
31 exit_rcv: watch::Receiver<()>,
32 skip_accounts: usize,
33 surf_strategy: SurfStrategy,
34 ) -> Vec<SurferTask> {
35 let mut rng = StdRng::seed_from_u64(seed);
36 let immutable_objects: ImmObjects = Arc::new(RwLock::new(HashMap::new()));
37 let shared_objects: SharedObjects = Arc::new(RwLock::new(HashMap::new()));
38
39 let mut accounts: HashMap<IotaAddress, (Option<ObjectRef>, OwnedObjects)> = cluster
40 .get_addresses()
41 .iter()
42 .skip(skip_accounts)
43 .map(|address| (*address, (None, HashMap::new())))
44 .collect();
45 let node = cluster
46 .swarm
47 .all_nodes()
48 .flat_map(|node| node.get_node_handle())
49 .next()
50 .unwrap();
51 let all_live_objects: Vec<_> = node.with(|node| {
52 node.state()
53 .get_accumulator_store()
54 .iter_cached_live_object_set_for_testing()
55 .collect()
56 });
57 for obj in all_live_objects {
58 match obj {
59 LiveObject::Normal(obj) => {
60 if let Some(struct_tag) = obj.struct_tag() {
61 let obj_ref = obj.compute_object_reference();
62 match obj.owner {
63 Owner::Immutable => {
64 immutable_objects
65 .write()
66 .await
67 .entry(struct_tag)
68 .or_default()
69 .push(obj_ref);
70 }
71 Owner::Shared {
72 initial_shared_version,
73 } => {
74 shared_objects
75 .write()
76 .await
77 .entry(struct_tag)
78 .or_default()
79 .push((obj_ref.0, initial_shared_version));
80 }
81 Owner::AddressOwner(address) => {
82 if let Some((gas_object, owned_objects)) =
83 accounts.get_mut(&address)
84 {
85 if obj.is_gas_coin() && gas_object.is_none() {
86 gas_object.replace(obj_ref);
87 } else {
88 owned_objects
89 .entry(struct_tag)
90 .or_default()
91 .insert(obj_ref);
92 }
93 }
94 }
95 Owner::ObjectOwner(_) => (),
96 }
97 }
98 }
99 LiveObject::Wrapped(_) => unreachable!("Explicitly skipped wrapped objects"),
100 }
101 }
102 let entry_functions = Arc::new(RwLock::new(vec![]));
103 accounts
104 .into_iter()
105 .enumerate()
106 .map(|(id, (address, (gas_object, owned_objects)))| {
107 let seed = rng.gen::<u64>();
108 let state_rng = StdRng::seed_from_u64(seed);
109 let state = SurferState::new(
110 id,
111 cluster.clone(),
112 state_rng,
113 address,
114 gas_object.unwrap(),
115 owned_objects,
116 immutable_objects.clone(),
117 shared_objects.clone(),
118 entry_functions.clone(),
119 );
120 SurferTask {
121 state,
122 surf_strategy: surf_strategy.clone(),
123 exit_rcv: exit_rcv.clone(),
124 }
125 })
126 .collect()
127 }
128
129 pub async fn surf(mut self) -> SurfStatistics {
130 loop {
131 let entry_functions = self.state.entry_functions.read().await.clone();
132
133 tokio::select! {
134 _ = self.surf_strategy
135 .surf_for_a_while(&mut self.state, entry_functions) => {
136 continue;
137 }
138
139 _ = self.exit_rcv.changed() => {
140 return self.state.stats;
141 }
142 }
143 }
144 }
145}