iota_surfer/
surfer_task.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, sync::Arc};
6
7use iota_core::authority::authority_store_tables::LiveObject;
8use iota_types::{
9    base_types::{IotaAddress, ObjectRef},
10    object::Owner,
11};
12use rand::{Rng, SeedableRng, rngs::StdRng};
13use test_cluster::TestCluster;
14use tokio::sync::{RwLock, watch};
15
16use crate::{
17    surf_strategy::SurfStrategy,
18    surfer_state::{ImmObjects, OwnedObjects, SharedObjects, SurfStatistics, SurferState},
19};
20
21pub struct SurferTask {
22    pub state: SurferState,
23    pub surf_strategy: SurfStrategy,
24    pub exit_rcv: watch::Receiver<()>,
25}
26
27impl SurferTask {
28    pub async fn create_surfer_tasks(
29        cluster: Arc<TestCluster>,
30        seed: u64,
31        exit_rcv: watch::Receiver<()>,
32        skip_accounts: usize,
33        surf_strategy: SurfStrategy,
34    ) -> Vec<SurferTask> {
35        let mut rng = StdRng::seed_from_u64(seed);
36        let immutable_objects: ImmObjects = Arc::new(RwLock::new(HashMap::new()));
37        let shared_objects: SharedObjects = Arc::new(RwLock::new(HashMap::new()));
38
39        let mut accounts: HashMap<IotaAddress, (Option<ObjectRef>, OwnedObjects)> = cluster
40            .get_addresses()
41            .iter()
42            .skip(skip_accounts)
43            .map(|address| (*address, (None, HashMap::new())))
44            .collect();
45        let node = cluster
46            .swarm
47            .all_nodes()
48            .flat_map(|node| node.get_node_handle())
49            .next()
50            .unwrap();
51        let all_live_objects: Vec<_> = node.with(|node| {
52            node.state()
53                .get_accumulator_store()
54                .iter_cached_live_object_set_for_testing()
55                .collect()
56        });
57        for obj in all_live_objects {
58            match obj {
59                LiveObject::Normal(obj) => {
60                    if let Some(struct_tag) = obj.struct_tag() {
61                        let obj_ref = obj.compute_object_reference();
62                        match obj.owner {
63                            Owner::Immutable => {
64                                immutable_objects
65                                    .write()
66                                    .await
67                                    .entry(struct_tag)
68                                    .or_default()
69                                    .push(obj_ref);
70                            }
71                            Owner::Shared {
72                                initial_shared_version,
73                            } => {
74                                shared_objects
75                                    .write()
76                                    .await
77                                    .entry(struct_tag)
78                                    .or_default()
79                                    .push((obj_ref.0, initial_shared_version));
80                            }
81                            Owner::AddressOwner(address) => {
82                                if let Some((gas_object, owned_objects)) =
83                                    accounts.get_mut(&address)
84                                {
85                                    if obj.is_gas_coin() && gas_object.is_none() {
86                                        gas_object.replace(obj_ref);
87                                    } else {
88                                        owned_objects
89                                            .entry(struct_tag)
90                                            .or_default()
91                                            .insert(obj_ref);
92                                    }
93                                }
94                            }
95                            Owner::ObjectOwner(_) => (),
96                        }
97                    }
98                }
99                LiveObject::Wrapped(_) => unreachable!("Explicitly skipped wrapped objects"),
100            }
101        }
102        let entry_functions = Arc::new(RwLock::new(vec![]));
103        accounts
104            .into_iter()
105            .enumerate()
106            .map(|(id, (address, (gas_object, owned_objects)))| {
107                let seed = rng.gen::<u64>();
108                let state_rng = StdRng::seed_from_u64(seed);
109                let state = SurferState::new(
110                    id,
111                    cluster.clone(),
112                    state_rng,
113                    address,
114                    gas_object.unwrap(),
115                    owned_objects,
116                    immutable_objects.clone(),
117                    shared_objects.clone(),
118                    entry_functions.clone(),
119                );
120                SurferTask {
121                    state,
122                    surf_strategy: surf_strategy.clone(),
123                    exit_rcv: exit_rcv.clone(),
124                }
125            })
126            .collect()
127    }
128
129    pub async fn surf(mut self) -> SurfStatistics {
130        loop {
131            let entry_functions = self.state.entry_functions.read().await.clone();
132
133            tokio::select! {
134                _ = self.surf_strategy
135                .surf_for_a_while(&mut self.state, entry_functions) => {
136                    continue;
137                }
138
139                _ = self.exit_rcv.changed() => {
140                    return self.state.stats;
141                }
142            }
143        }
144    }
145}