iota_rpc_loadgen/
load_test.rs1use std::{
6 error::Error,
7 time::{Duration, Instant},
8};
9
10use tokio::sync::{mpsc, mpsc::Sender};
11use tracing::error;
12
13use crate::payload::{Command, Payload, Processor, SignerInfo};
14
15struct WorkerThread<R: Processor + Send + Sync + Clone> {
16 processor: R,
17 payload: Payload,
18}
19
20impl<R: Processor + Send + Sync + Clone> WorkerThread<R> {
21 async fn run(&self) -> usize {
22 let mut successful_commands = 0;
23 match self.processor.apply(&self.payload).await {
24 Ok(()) => successful_commands += 1,
25 Err(e) => error!("Thread returns error: {e}"),
26 }
27 successful_commands
28 }
29}
30
31pub struct LoadTestConfig {
32 pub command: Command,
34 pub num_threads: usize,
35 pub divide_tasks: bool,
37 pub signer_info: Option<SignerInfo>,
38 pub num_chunks_per_thread: usize,
39 pub max_repeat: usize,
40}
41
42pub(crate) struct LoadTest<R: Processor + Send + Sync + Clone> {
43 pub processor: R,
44 pub config: LoadTestConfig,
45}
46
47impl<R: Processor + Send + Sync + Clone + 'static> LoadTest<R> {
48 pub(crate) async fn run(&self) -> Result<(), Box<dyn Error>> {
49 let start_time = Instant::now();
50 let payloads = self.processor.prepare(&self.config).await?;
51 let (tx, mut rx) = mpsc::channel(payloads.len());
52
53 self.run_workers(tx, payloads).await;
54
55 let mut num_successful_commands = 0;
57 while let Some(num_successful) = rx.recv().await {
58 num_successful_commands += num_successful;
59 }
60
61 let elapsed_time = start_time.elapsed();
62 let total_commands = num_successful_commands
64 * (self.config.max_repeat + 1)
65 * self.config.num_chunks_per_thread;
66
67 println!(
68 "Total successful commands: {}, total time {:?}, commands per second {:.2}",
69 total_commands,
70 elapsed_time,
71 get_tps(total_commands, elapsed_time),
72 );
73
74 self.processor.dump_cache_to_file(&self.config);
75
76 Ok(())
77 }
78
79 async fn run_workers(&self, tx: Sender<usize>, payloads: Vec<Payload>) {
80 println!("Running with {} threads...", payloads.len());
81 for payload in payloads.iter() {
82 let tx = tx.clone();
83 let worker_thread = WorkerThread {
84 processor: self.processor.clone(),
85 payload: payload.clone(),
86 };
87 tokio::spawn(async move {
88 let num_successful_commands = worker_thread.run().await;
89 tx.send(num_successful_commands).await.unwrap();
90 });
91 }
92 }
93}
94
/// Computes throughput as `num` operations per second over `duration`.
///
/// Returns `0.0` for a zero-length `duration`, where a rate is undefined,
/// instead of the `NaN` or infinity that the plain division would produce.
fn get_tps(num: usize, duration: Duration) -> f64 {
    if duration.is_zero() {
        return 0.0;
    }
    num as f64 / duration.as_secs_f64()
}