iota_aws_orchestrator/
benchmark.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    fmt::{Debug, Display},
7    hash::Hash,
8    str::FromStr,
9    time::Duration,
10};
11
12use serde::{Deserialize, Serialize, de::DeserializeOwned};
13
14use crate::{faults::FaultsType, measurement::MeasurementsCollection};
15
16pub trait BenchmarkType:
17    Serialize
18    + DeserializeOwned
19    + Default
20    + Clone
21    + FromStr
22    + Display
23    + Debug
24    + PartialEq
25    + Eq
26    + Hash
27    + PartialOrd
28    + Ord
29    + FromStr
30{
31}
32
33/// The benchmark parameters for a run.
34#[derive(Serialize, Deserialize, Clone)]
35pub struct BenchmarkParameters<T> {
36    /// The type of benchmark to run.
37    pub benchmark_type: T,
38    /// The committee size.
39    pub nodes: usize,
40    /// The number of (crash-)faults.
41    pub faults: FaultsType,
42    /// The total load (tx/s) to submit to the system.
43    pub load: usize,
44    /// The duration of the benchmark.
45    pub duration: Duration,
46}
47
48impl<T: BenchmarkType> Default for BenchmarkParameters<T> {
49    fn default() -> Self {
50        Self {
51            benchmark_type: T::default(),
52            nodes: 4,
53            faults: FaultsType::default(),
54            load: 500,
55            duration: Duration::from_secs(60),
56        }
57    }
58}
59
60impl<T: BenchmarkType> Debug for BenchmarkParameters<T> {
61    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62        write!(
63            f,
64            "{:?}-{:?}-{}-{}",
65            self.benchmark_type, self.faults, self.nodes, self.load
66        )
67    }
68}
69
70impl<T> Display for BenchmarkParameters<T> {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        write!(
73            f,
74            "{} nodes ({}) - {} tx/s",
75            self.nodes, self.faults, self.load
76        )
77    }
78}
79
80impl<T> BenchmarkParameters<T> {
81    /// Make a new benchmark parameters.
82    pub fn new(
83        benchmark_type: T,
84        nodes: usize,
85        faults: FaultsType,
86        load: usize,
87        duration: Duration,
88    ) -> Self {
89        Self {
90            benchmark_type,
91            nodes,
92            faults,
93            load,
94            duration,
95        }
96    }
97}
98
/// The load type to submit to the nodes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoadType {
    /// Submit a fixed set of loads (one per benchmark run).
    Fixed(Vec<usize>),

    /// Search for the breaking point of the L-graph, i.e. the load beyond
    /// which the system runs out of capacity.
    // TODO: Doesn't work very well, use tps regression as additional signal.
    Search {
        /// The initial load to test (and use as a baseline).
        starting_load: usize,
        /// The maximum number of iterations before converging on a breaking
        /// point.
        max_iterations: usize,
    },
}
114
115/// Generate benchmark parameters (one set of parameters per run).
116// TODO: The rusty thing to do would be to implement Iter.
117pub struct BenchmarkParametersGenerator<T> {
118    /// The type of benchmark to run.
119    benchmark_type: T,
120    /// The committee size.
121    pub nodes: usize,
122    /// The load type.
123    load_type: LoadType,
124    /// The number of faulty nodes.
125    pub faults: FaultsType,
126    /// The duration of the benchmark.
127    duration: Duration,
128    /// The load of the next benchmark run.
129    next_load: Option<usize>,
130    /// Temporary hold a lower bound of the breaking point.
131    lower_bound_result: Option<MeasurementsCollection<T>>,
132    /// Temporary hold an upper bound of the breaking point.
133    upper_bound_result: Option<MeasurementsCollection<T>>,
134    /// The current number of iterations.
135    iterations: usize,
136}
137
138impl<T: BenchmarkType> Iterator for BenchmarkParametersGenerator<T> {
139    type Item = BenchmarkParameters<T>;
140
141    /// Return the next set of benchmark parameters to run.
142    fn next(&mut self) -> Option<Self::Item> {
143        self.next_load.map(|load| {
144            BenchmarkParameters::new(
145                self.benchmark_type.clone(),
146                self.nodes,
147                self.faults.clone(),
148                load,
149                self.duration,
150            )
151        })
152    }
153}
154
155impl<T: BenchmarkType> BenchmarkParametersGenerator<T> {
156    /// The default benchmark duration.
157    const DEFAULT_DURATION: Duration = Duration::from_secs(180);
158
159    /// make a new generator.
160    pub fn new(nodes: usize, mut load_type: LoadType) -> Self {
161        let next_load = match &mut load_type {
162            LoadType::Fixed(loads) => {
163                if loads.is_empty() {
164                    None
165                } else {
166                    Some(loads.remove(0))
167                }
168            }
169            LoadType::Search { starting_load, .. } => Some(*starting_load),
170        };
171        Self {
172            benchmark_type: T::default(),
173            nodes,
174            load_type,
175            faults: FaultsType::default(),
176            duration: Self::DEFAULT_DURATION,
177            next_load,
178            lower_bound_result: None,
179            upper_bound_result: None,
180            iterations: 0,
181        }
182    }
183
184    /// Set the benchmark type.
185    pub fn with_benchmark_type(mut self, benchmark_type: T) -> Self {
186        self.benchmark_type = benchmark_type;
187        self
188    }
189
190    /// Set crash-recovery pattern and the number of faulty nodes.
191    pub fn with_faults(mut self, faults: FaultsType) -> Self {
192        self.faults = faults;
193        self
194    }
195
196    /// Set a custom benchmark duration.
197    pub fn with_custom_duration(mut self, duration: Duration) -> Self {
198        self.duration = duration;
199        self
200    }
201
202    /// Detects whether the latest benchmark parameters run the system out of
203    /// capacity.
204    fn out_of_capacity(
205        last_result: &MeasurementsCollection<T>,
206        new_result: &MeasurementsCollection<T>,
207    ) -> bool {
208        // We consider the system is out of capacity if the latency increased by over 5x
209        // with respect to the latest run.
210        let threshold = last_result.aggregate_average_latency() * 5;
211        let high_latency = new_result.aggregate_average_latency() > threshold;
212
213        // Or if the throughput is less than 2/3 of the input rate.
214        let last_load = new_result.transaction_load() as u64;
215        let no_throughput_increase = new_result.aggregate_tps() < (2 * last_load / 3);
216
217        high_latency || no_throughput_increase
218    }
219
220    /// Register a new benchmark measurements collection. These results are used
221    /// to determine whether the system reached its breaking point.
222    pub fn register_result(&mut self, result: MeasurementsCollection<T>) {
223        self.next_load = match &mut self.load_type {
224            LoadType::Fixed(loads) => {
225                if loads.is_empty() {
226                    None
227                } else {
228                    Some(loads.remove(0))
229                }
230            }
231            LoadType::Search { max_iterations, .. } => {
232                // Terminate the search.
233                if self.iterations >= *max_iterations {
234                    None
235
236                // Search for the breaking point.
237                } else {
238                    self.iterations += 1;
239                    match (&mut self.lower_bound_result, &mut self.upper_bound_result) {
240                        (None, None) => {
241                            let next = result.transaction_load() * 2;
242                            self.lower_bound_result = Some(result);
243                            Some(next)
244                        }
245                        (Some(lower), None) => {
246                            if Self::out_of_capacity(lower, &result) {
247                                let next =
248                                    (lower.transaction_load() + result.transaction_load()) / 2;
249                                self.upper_bound_result = Some(result);
250                                Some(next)
251                            } else {
252                                let next = result.transaction_load() * 2;
253                                *lower = result;
254                                Some(next)
255                            }
256                        }
257                        (Some(lower), Some(upper)) => {
258                            if Self::out_of_capacity(lower, &result) {
259                                *upper = result;
260                            } else {
261                                *lower = result;
262                            }
263                            Some((lower.transaction_load() + upper.transaction_load()) / 2)
264                        }
265                        _ => panic!("Benchmark parameters generator is in an incoherent state"),
266                    }
267                }
268            }
269        };
270    }
271}
272
#[cfg(test)]
pub mod test {
    use std::{fmt::Display, str::FromStr};

    use serde::{Deserialize, Serialize};

    use super::{BenchmarkParametersGenerator, BenchmarkType, LoadType};
    use crate::{
        measurement::{Measurement, MeasurementsCollection},
        settings::Settings,
    };

    /// Mock benchmark type for unit tests.
    #[derive(
        Serialize, Deserialize, Debug, Clone, PartialEq, PartialOrd, Eq, Ord, Hash, Default,
    )]
    pub struct TestBenchmarkType;

    impl Display for TestBenchmarkType {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("TestBenchmarkType")
        }
    }

    impl FromStr for TestBenchmarkType {
        type Err = ();

        /// Any input parses to the (single) mock benchmark type.
        fn from_str(_s: &str) -> Result<Self, Self::Err> {
            Ok(Self)
        }
    }

    impl BenchmarkType for TestBenchmarkType {}

    /// Build a load-search generator for a 4-node committee starting at 100 tx/s.
    fn search_generator(max_iterations: usize) -> BenchmarkParametersGenerator<TestBenchmarkType> {
        let load = LoadType::Search {
            starting_load: 100,
            max_iterations,
        };
        BenchmarkParametersGenerator::new(4, load)
    }

    #[test]
    fn set_lower_bound() {
        let settings = Settings::new_for_test();
        let mut generator = search_generator(10);

        let parameters = generator.next().expect("the search yields a first run");
        generator.register_result(MeasurementsCollection::new(&settings, parameters));

        let next_parameters = generator
            .next()
            .expect("the search continues after the first run");
        assert_eq!(next_parameters.load, 200);

        let lower = generator
            .lower_bound_result
            .as_ref()
            .expect("the first result becomes the lower bound");
        assert_eq!(lower.transaction_load(), 100);
        assert!(generator.upper_bound_result.is_none());
    }

    #[test]
    fn set_upper_bound() {
        let settings = Settings::new_for_test();
        let mut generator = search_generator(10);

        // First result (zero latency): it becomes the lower bound.
        let first_parameters = generator.next().expect("the search yields a first run");
        generator.register_result(MeasurementsCollection::new(&settings, first_parameters));

        // Second result (with positive latency): it becomes the upper bound.
        let second_parameters = generator.next().expect("the search yields a second run");
        let mut collection = MeasurementsCollection::new(&settings, second_parameters);
        collection
            .scrapers
            .insert(1, vec![Measurement::new_for_test()]);
        generator.register_result(collection);

        // The next load lies halfway between the lower and the upper bound.
        let third_parameters = generator.next().expect("the search yields a third run");
        assert_eq!(third_parameters.load, 150);

        let lower = generator
            .lower_bound_result
            .as_ref()
            .expect("a lower bound is held");
        assert_eq!(lower.transaction_load(), 100);
        let upper = generator
            .upper_bound_result
            .as_ref()
            .expect("an upper bound is held");
        assert_eq!(upper.transaction_load(), 200);
    }

    #[test]
    fn max_iterations() {
        let settings = Settings::new_for_test();
        let mut generator = search_generator(0);

        let parameters = generator.next().expect("the search yields a first run");
        generator.register_result(MeasurementsCollection::new(&settings, parameters));

        assert!(generator.next().is_none());
    }
}