iota_rpc_loadgen/
load_test.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    error::Error,
7    time::{Duration, Instant},
8};
9
10use tokio::sync::{mpsc, mpsc::Sender};
11use tracing::error;
12
13use crate::payload::{Command, Payload, Processor, SignerInfo};
14
15struct WorkerThread<R: Processor + Send + Sync + Clone> {
16    processor: R,
17    payload: Payload,
18}
19
20impl<R: Processor + Send + Sync + Clone> WorkerThread<R> {
21    async fn run(&self) -> usize {
22        let mut successful_commands = 0;
23        match self.processor.apply(&self.payload).await {
24            Ok(()) => successful_commands += 1,
25            Err(e) => error!("Thread returns error: {e}"),
26        }
27        successful_commands
28    }
29}
30
31pub struct LoadTestConfig {
32    // TODO: support multiple commands
33    pub command: Command,
34    pub num_threads: usize,
35    /// should divide tasks across multiple threads
36    pub divide_tasks: bool,
37    pub signer_info: Option<SignerInfo>,
38    pub num_chunks_per_thread: usize,
39    pub max_repeat: usize,
40}
41
42pub(crate) struct LoadTest<R: Processor + Send + Sync + Clone> {
43    pub processor: R,
44    pub config: LoadTestConfig,
45}
46
47impl<R: Processor + Send + Sync + Clone + 'static> LoadTest<R> {
48    pub(crate) async fn run(&self) -> Result<(), Box<dyn Error>> {
49        let start_time = Instant::now();
50        let payloads = self.processor.prepare(&self.config).await?;
51        let (tx, mut rx) = mpsc::channel(payloads.len());
52
53        self.run_workers(tx, payloads).await;
54
55        // Collect the results from the worker threads
56        let mut num_successful_commands = 0;
57        while let Some(num_successful) = rx.recv().await {
58            num_successful_commands += num_successful;
59        }
60
61        let elapsed_time = start_time.elapsed();
62        // TODO(chris): clean up this logic
63        let total_commands = num_successful_commands
64            * (self.config.max_repeat + 1)
65            * self.config.num_chunks_per_thread;
66
67        println!(
68            "Total successful commands: {}, total time {:?}, commands per second {:.2}",
69            total_commands,
70            elapsed_time,
71            get_tps(total_commands, elapsed_time),
72        );
73
74        self.processor.dump_cache_to_file(&self.config);
75
76        Ok(())
77    }
78
79    async fn run_workers(&self, tx: Sender<usize>, payloads: Vec<Payload>) {
80        println!("Running with {} threads...", payloads.len());
81        for payload in payloads.iter() {
82            let tx = tx.clone();
83            let worker_thread = WorkerThread {
84                processor: self.processor.clone(),
85                payload: payload.clone(),
86            };
87            tokio::spawn(async move {
88                let num_successful_commands = worker_thread.run().await;
89                tx.send(num_successful_commands).await.unwrap();
90            });
91        }
92    }
93}
94
/// Computes throughput as commands per second.
///
/// Returns `0.0` when `duration` is zero, so the caller never reports `NaN`
/// or infinity.
fn get_tps(num: usize, duration: Duration) -> f64 {
    if duration.is_zero() {
        return 0.0;
    }
    // The cast is lossy only above 2^53, far beyond any realistic count.
    num as f64 / duration.as_secs_f64()
}