iota_core/
overload_monitor.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::{max, min},
7    hash::Hasher,
8    sync::{
9        Weak,
10        atomic::{AtomicBool, AtomicU32, Ordering},
11    },
12    time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
15use iota_config::node::AuthorityOverloadConfig;
16use iota_metrics::monitored_scope;
17use iota_types::{
18    digests::TransactionDigest,
19    error::{IotaError, IotaResult},
20    fp_bail,
21};
22use tokio::time::sleep;
23use tracing::{debug, info};
24use twox_hash::XxHash64;
25
26use crate::authority::AuthorityState;
27
/// Overload state of an authority, updated through a shared reference by
/// `check_authority_overload`.
#[derive(Default)]
pub struct AuthorityOverloadInfo {
    /// `true` while the authority is considered overloaded.
    pub is_overload: AtomicBool,

    /// Percentage of transactions to drop; `set_overload` keeps it within
    /// `0..=100`.
    pub load_shedding_percentage: AtomicU32,
}

impl AuthorityOverloadInfo {
    /// Enters overload mode, dropping `load_shedding_percentage` percent of
    /// transactions. Values above 100 are clamped to 100.
    pub fn set_overload(&self, load_shedding_percentage: u32) {
        let clamped_percentage = u32::min(load_shedding_percentage, 100);
        self.is_overload.store(true, Ordering::Relaxed);
        self.load_shedding_percentage
            .store(clamped_percentage, Ordering::Relaxed);
    }

    /// Leaves overload mode and resets the shedding percentage to zero.
    pub fn clear_overload(&self) {
        self.is_overload.store(false, Ordering::Relaxed);
        self.load_shedding_percentage.store(0, Ordering::Relaxed);
    }
}
49
/// Percentage points by which load shedding is relaxed per monitor check once
/// no additional shedding is needed.
const STEADY_OVERLOAD_REDUCTION_PERCENTAGE: u32 = 10;
/// Factor applied to the measured execution rate before comparing it with the
/// transaction ready rate, so an execution rate only slightly above the ready
/// rate is not treated as keeping up.
const EXECUTION_RATE_RATIO_FOR_COMPARISON: f64 = 0.95;
/// Extra fraction of traffic (2 percentage points) dropped on top of the
/// computed ratio, so that the execution queue shrinks instead of holding steady.
const ADDITIONAL_LOAD_SHEDDING: f64 = 0.02;

/// How often, in seconds, the seed used to pick rejected transactions changes.
const SEED_UPDATE_DURATION_SECS: u64 = 30;
57
58// Monitors the overload signals in `authority_state` periodically, and updates
59// its `overload_info` when the signals indicates overload.
60pub async fn overload_monitor(
61    authority_state: Weak<AuthorityState>,
62    config: AuthorityOverloadConfig,
63) {
64    info!("Starting system overload monitor.");
65
66    loop {
67        let authority_exist = check_authority_overload(&authority_state, &config);
68        if !authority_exist {
69            // `authority_state` doesn't exist anymore. Quit overload monitor.
70            break;
71        }
72        sleep(config.overload_monitor_interval).await;
73    }
74
75    info!("Shut down system overload monitor.");
76}
77
78// Checks authority overload signals, and updates authority's `overload_info`.
79// Returns whether the authority state exists.
80fn check_authority_overload(
81    authority_state: &Weak<AuthorityState>,
82    config: &AuthorityOverloadConfig,
83) -> bool {
84    let _scope = monitored_scope("OverloadMonitor::check_authority_overload");
85    let authority_arc = authority_state.upgrade();
86    if authority_arc.is_none() {
87        // `authority_state` doesn't exist anymore.
88        return false;
89    }
90
91    let authority = authority_arc.unwrap();
92    let queueing_latency = authority
93        .metrics
94        .execution_queueing_latency
95        .latency()
96        .unwrap_or_default();
97    let txn_ready_rate = authority.metrics.txn_ready_rate_tracker.lock().rate();
98    let execution_rate = authority.metrics.execution_rate_tracker.lock().rate();
99
100    debug!(
101        "Check authority overload signal, queueing latency {:?}, ready rate {:?}, execution rate {:?}.",
102        queueing_latency, txn_ready_rate, execution_rate
103    );
104
105    let (is_overload, load_shedding_percentage) = check_overload_signals(
106        config,
107        authority
108            .overload_info
109            .load_shedding_percentage
110            .load(Ordering::Relaxed),
111        queueing_latency,
112        txn_ready_rate,
113        execution_rate,
114    );
115
116    if is_overload {
117        authority
118            .overload_info
119            .set_overload(load_shedding_percentage);
120    } else {
121        authority.overload_info.clear_overload();
122    }
123
124    authority
125        .metrics
126        .authority_overload_status
127        .set(is_overload as i64);
128    authority
129        .metrics
130        .authority_load_shedding_percentage
131        .set(load_shedding_percentage as i64);
132    true
133}
134
135// Calculates the percentage of transactions to drop in order to reduce
136// execution queue. Returns the integer percentage between 0 and 100.
137fn calculate_load_shedding_percentage(txn_ready_rate: f64, execution_rate: f64) -> u32 {
138    // When transaction ready rate is practically 0, we aren't adding more load to
139    // the execution driver, so no shedding.
140    // TODO: consensus handler or transaction manager can also be overloaded.
141    if txn_ready_rate < 1e-10 {
142        return 0;
143    }
144
145    // Deflate the execution rate to account for the case that execution_rate is
146    // close to txn_ready_rate.
147    if execution_rate * EXECUTION_RATE_RATIO_FOR_COMPARISON > txn_ready_rate {
148        return 0;
149    }
150
151    // In order to maintain execution queue length, we need to drop at least (1 -
152    // executionRate / readyRate). To reduce the queue length, here we add 10%
153    // more transactions to drop.
154    (((1.0 - execution_rate * EXECUTION_RATE_RATIO_FOR_COMPARISON / txn_ready_rate)
155        + ADDITIONAL_LOAD_SHEDDING)
156        .min(1.0)
157        * 100.0)
158        .round() as u32
159}
160
161// Given overload signals (`queueing_latency`, `txn_ready_rate`,
162// `execution_rate`), return whether the authority server should enter load
163// shedding mode, and how much percentage of transactions to drop. Note that the
164// final load shedding percentage should also take the current load shedding
165// percentage into consideration. If we are already shedding 40% load, based on
166// the current txn_ready_rate and execution_rate, we need to shed 10% more, the
167// outcome is that we need to shed 40% + (1 - 40%) * 10% = 46%.
168// When txn_ready_rate is less than execution_rate, we gradually reduce load
169// shedding percentage until the queueing latency is back to normal.
170fn check_overload_signals(
171    config: &AuthorityOverloadConfig,
172    current_load_shedding_percentage: u32,
173    queueing_latency: Duration,
174    txn_ready_rate: f64,
175    execution_rate: f64,
176) -> (bool, u32) {
177    // First, we calculate based on the current `txn_ready_rate` and
178    // `execution_rate`, what's the percentage of traffic to shed from
179    // `txn_ready_rate`.
180    let additional_load_shedding_percentage;
181    if queueing_latency > config.execution_queue_latency_hard_limit {
182        let calculated_load_shedding_percentage =
183            calculate_load_shedding_percentage(txn_ready_rate, execution_rate);
184
185        additional_load_shedding_percentage = if calculated_load_shedding_percentage > 0
186            || txn_ready_rate >= config.safe_transaction_ready_rate as f64
187        {
188            max(
189                calculated_load_shedding_percentage,
190                config.min_load_shedding_percentage_above_hard_limit,
191            )
192        } else {
193            0
194        };
195    } else if queueing_latency > config.execution_queue_latency_soft_limit {
196        additional_load_shedding_percentage =
197            calculate_load_shedding_percentage(txn_ready_rate, execution_rate);
198    } else {
199        additional_load_shedding_percentage = 0;
200    }
201
202    // Next, we calculate the new load shedding percentage.
203    let load_shedding_percentage = if additional_load_shedding_percentage > 0 {
204        // When we need to shed more load, since the `txn_ready_rate` is already
205        // influenced by `current_load_shedding_percentage`, we need to
206        // calculate the new load shedding percentage from
207        // `current_load_shedding_percentage` and
208        // `additional_load_shedding_percentage`.
209        current_load_shedding_percentage
210            + (100 - current_load_shedding_percentage) * additional_load_shedding_percentage / 100
211    } else if txn_ready_rate > config.safe_transaction_ready_rate as f64
212        && current_load_shedding_percentage > 10
213    {
214        // We don't need to shed more load. However, the enqueue rate is still not
215        // minimal. We gradually reduce load shedding percentage (10% at a time)
216        // to gracefully accept more load.
217        current_load_shedding_percentage - STEADY_OVERLOAD_REDUCTION_PERCENTAGE
218    } else {
219        // The current transaction ready rate is considered very low. Turn off load
220        // shedding mode.
221        0
222    };
223
224    let load_shedding_percentage = min(
225        load_shedding_percentage,
226        config.max_load_shedding_percentage,
227    );
228    let overload_status = load_shedding_percentage > 0;
229    (overload_status, load_shedding_percentage)
230}
231
232// Return true if we should reject the txn with `tx_digest`.
233fn should_reject_tx(
234    load_shedding_percentage: u32,
235    tx_digest: TransactionDigest,
236    temporal_seed: u64,
237) -> bool {
238    // TODO: we also need to add a secret salt (e.g. first consensus commit in the
239    // current epoch), to prevent gaming the system.
240    let mut hasher = XxHash64::with_seed(temporal_seed);
241    hasher.write(tx_digest.inner());
242    let value = hasher.finish();
243    value % 100 < load_shedding_percentage as u64
244}
245
246// Checks if we can accept the transaction with `tx_digest`.
247pub fn overload_monitor_accept_tx(
248    load_shedding_percentage: u32,
249    tx_digest: TransactionDigest,
250) -> IotaResult {
251    // Derive a random seed from the epoch time for transaction selection. Changing
252    // the seed every `SEED_UPDATE_DURATION_SECS` interval allows rejected
253    // transaction's retry to have a chance to go through in the future.
254    // Also, using the epoch time instead of randomly generating a seed allows that
255    // all validators makes the same decision.
256    let temporal_seed = SystemTime::now()
257        .duration_since(UNIX_EPOCH)
258        .expect("IOTA did not exist prior to 1970")
259        .as_secs()
260        / SEED_UPDATE_DURATION_SECS;
261
262    if should_reject_tx(load_shedding_percentage, tx_digest, temporal_seed) {
263        // TODO: using `SEED_UPDATE_DURATION_SECS` is a safe suggestion that the time
264        // based seed is definitely different by then. However, a shorter
265        // suggestion may be available.
266        fp_bail!(IotaError::ValidatorOverloadedRetryAfter {
267            retry_after_secs: SEED_UPDATE_DURATION_SECS
268        });
269    }
270    Ok(())
271}
272
#[cfg(test)]
// Allow `unbounded_channel()` since the tests simulate the interaction between
// the transaction manager and the execution driver.
#[expect(clippy::disallowed_methods)]
mod tests {
    use std::sync::Arc;

    use iota_macros::sim_test;
    use rand::{
        Rng, SeedableRng,
        rngs::{OsRng, StdRng},
    };
    use tokio::{
        sync::{
            mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
            oneshot,
        },
        task::JoinHandle,
        time::{Instant, MissedTickBehavior, interval},
    };

    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;

    // Tests the `set_overload` / `clear_overload` transitions of
    // `AuthorityOverloadInfo`, starting from its default (not overloaded) state.
    #[test]
    pub fn test_authority_overload_info() {
        let overload_info = AuthorityOverloadInfo::default();
        assert!(!overload_info.is_overload.load(Ordering::Relaxed));
        assert_eq!(
            overload_info
                .load_shedding_percentage
                .load(Ordering::Relaxed),
            0
        );

        {
            overload_info.set_overload(20);
            assert!(overload_info.is_overload.load(Ordering::Relaxed));
            assert_eq!(
                overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                20
            );
        }

        // Tests that load shedding percentage can't go beyond 100%.
        {
            overload_info.set_overload(110);
            assert!(overload_info.is_overload.load(Ordering::Relaxed));
            assert_eq!(
                overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                100
            );
        }

        {
            overload_info.clear_overload();
            assert!(!overload_info.is_overload.load(Ordering::Relaxed));
            assert_eq!(
                overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                0
            );
        }
    }

    // Pins `calculate_load_shedding_percentage` for representative
    // (txn_ready_rate, execution_rate) pairs, including a zero ready rate and
    // a zero execution rate.
    #[test]
    pub fn test_calculate_load_shedding_ratio() {
        assert_eq!(calculate_load_shedding_percentage(95.0, 100.1), 0);
        assert_eq!(calculate_load_shedding_percentage(95.0, 100.0), 2);
        assert_eq!(calculate_load_shedding_percentage(100.0, 100.0), 7);
        assert_eq!(calculate_load_shedding_percentage(110.0, 100.0), 16);
        assert_eq!(calculate_load_shedding_percentage(180.0, 100.0), 49);
        assert_eq!(calculate_load_shedding_percentage(100.0, 0.0), 100);
        assert_eq!(calculate_load_shedding_percentage(0.0, 1.0), 0);
    }

    // Exercises `check_overload_signals` in several regimes: below the soft
    // limit, between the soft and hard limits, above the hard limit, and with a
    // non-zero current shedding percentage. The fields not set here
    // (`min_load_shedding_percentage_above_hard_limit`,
    // `safe_transaction_ready_rate`) come from
    // `AuthorityOverloadConfig::default()`, and the expected values depend on
    // those defaults.
    #[test]
    pub fn test_check_overload_signals() {
        let config = AuthorityOverloadConfig {
            execution_queue_latency_hard_limit: Duration::from_secs(10),
            execution_queue_latency_soft_limit: Duration::from_secs(1),
            max_load_shedding_percentage: 90,
            ..Default::default()
        };

        // When execution queueing latency is within soft limit, don't start overload
        // protection.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_millis(500), 1000.0, 10.0),
            (false, 0)
        );

        // When execution queueing latency hits soft limit and execution rate is higher,
        // don't start overload protection.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(2), 100.0, 120.0),
            (false, 0)
        );

        // When execution queueing latency hits soft limit, but not hard limit, start
        // overload protection.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(2), 100.0, 100.0),
            (true, 7)
        );

        // When execution queueing latency hits hard limit, start more aggressive
        // overload protection.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 100.0, 100.0),
            (true, 50)
        );

        // When execution queueing latency hits hard limit and calculated shedding
        // percentage is higher than
        // min_load_shedding_percentage_above_hard_limit.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 240.0, 100.0),
            (true, 62)
        );

        // When execution queueing latency hits hard limit, but transaction ready rate
        // is within safe_transaction_ready_rate, don't start overload protection.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 20.0, 100.0),
            (false, 0)
        );

        // The maximum percentage of transactions shed is capped by the
        // `max_load_shedding_percentage` config.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 100.0, 0.0),
            (true, 90)
        );

        // When the system is already shedding 50% of load, and the current txn ready
        // rate and execution rate require another 20%, the final shedding rate
        // is 60% (50% + 50% * 20%).
        assert_eq!(
            check_overload_signals(&config, 50, Duration::from_secs(2), 116.0, 100.0),
            (true, 60)
        );

        // Load shedding percentage is gradually reduced when txn ready rate is lower
        // than execution rate.
        assert_eq!(
            check_overload_signals(&config, 90, Duration::from_secs(2), 200.0, 300.0),
            (true, 80)
        );

        // When queueing delay is above hard limit, we shed an additional 50% of
        // the remaining traffic every time (50% + 50% * 50% = 75%).
        assert_eq!(
            check_overload_signals(&config, 50, Duration::from_secs(11), 100.0, 100.0),
            (true, 75)
        );
    }

    // Tests that `check_authority_overload` copies the computed overload
    // decision into the authority's `overload_info`, and that it returns false
    // once the authority state has been dropped.
    #[tokio::test(flavor = "current_thread")]
    pub async fn test_check_authority_overload() {
        telemetry_subscribers::init_for_testing();

        // With a safe ready rate of 0, any queueing latency above the hard
        // limit triggers at least `min_load_shedding_percentage_above_hard_limit`
        // shedding, even though no transactions are being reported as ready.
        let config = AuthorityOverloadConfig {
            safe_transaction_ready_rate: 0,
            ..Default::default()
        };
        let state = TestAuthorityBuilder::new()
            .with_authority_overload_config(config.clone())
            .build()
            .await;

        // Initialize latency reporter with 20s samples; the assertion below
        // expects this to exceed the default hard limit.
        for _ in 0..1000 {
            state
                .metrics
                .execution_queueing_latency
                .report(Duration::from_secs(20));
        }

        // Creates a simple case to see if authority state overload_info can be updated
        // correctly by check_authority_overload.
        let authority = Arc::downgrade(&state);
        assert!(check_authority_overload(&authority, &config));
        assert!(state.overload_info.is_overload.load(Ordering::Relaxed));
        assert_eq!(
            state
                .overload_info
                .load_shedding_percentage
                .load(Ordering::Relaxed),
            config.min_load_shedding_percentage_above_hard_limit
        );

        // Checks that check_authority_overload should return false when the input
        // authority state doesn't exist.
        let authority = Arc::downgrade(&state);
        drop(state);
        assert!(!check_authority_overload(&authority, &config));
    }

    // Creates an AuthorityState with the default overload config and spawns an
    // overload monitor that watches its metrics. The monitor only holds a weak
    // reference, so callers keep the returned `Arc` alive for as long as the
    // monitor should run.
    async fn start_overload_monitor() -> (Arc<AuthorityState>, JoinHandle<()>) {
        let overload_config = AuthorityOverloadConfig::default();
        let state = TestAuthorityBuilder::new()
            .with_authority_overload_config(overload_config.clone())
            .build()
            .await;
        let authority_state = Arc::downgrade(&state);
        let monitor_handle = tokio::spawn(async move {
            overload_monitor(authority_state, overload_config).await;
        });
        (state, monitor_handle)
    }

    // Starts a load generator that sends requests (their enqueue `Instant`) on
    // `tx` at `steady_rate` per second. It also sends a burst of N requests
    // whenever N is received on `burst_rx`.
    //
    // When `enable_load_shedding` is set, each request is dropped with
    // probability equal to `authority`'s current load shedding percentage.
    // Every request actually sent is recorded in `authority`'s
    // `txn_ready_rate_tracker`, which the overload monitor reads.
    //
    // The generator stops once sending on `tx` fails (the receiver has been
    // dropped). It then publishes its totals to `total_requests_arc` and
    // `dropped_requests_arc`.
    fn start_load_generator(
        steady_rate: f64,
        tx: UnboundedSender<Instant>,
        mut burst_rx: UnboundedReceiver<u32>,
        authority: Arc<AuthorityState>,
        enable_load_shedding: bool,
        total_requests_arc: Arc<AtomicU32>,
        dropped_requests_arc: Arc<AtomicU32>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs_f64(1.0 / steady_rate));
            let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
            let mut total_requests: u32 = 0;
            let mut total_dropped_requests: u32 = 0;

            // Helper function to check whether we should send a request.
            let mut do_send =
                |enable_load_shedding: bool, authority: Arc<AuthorityState>| -> bool {
                    if enable_load_shedding {
                        let shedding_percentage = authority
                            .overload_info
                            .load_shedding_percentage
                            .load(Ordering::Relaxed);
                        !(shedding_percentage > 0 && rng.gen_range(0..100) < shedding_percentage)
                    } else {
                        true
                    }
                };

            loop {
                tokio::select! {
                    now = interval.tick() => {
                        total_requests += 1;
                        if do_send(enable_load_shedding, authority.clone()) {
                            if tx.send(now).is_err() {
                                info!("Load generator stopping. Total requests {:?}, total dropped requests {:?}.", total_requests, total_dropped_requests);
                                total_requests_arc.store(total_requests, Ordering::SeqCst);
                                dropped_requests_arc.store(total_dropped_requests, Ordering::SeqCst);
                                return;
                            }
                            authority.metrics.txn_ready_rate_tracker.lock().record();
                        } else {
                            total_dropped_requests += 1;
                        }
                    }
                    // All requests of one burst share the same enqueue time.
                    Some(burst) = burst_rx.recv() => {
                        let now = Instant::now();
                        total_requests += burst;
                        for _ in 0..burst {
                            if do_send(enable_load_shedding, authority.clone()) {
                                if tx.send(now).is_err() {
                                    info!("Load generator stopping. Total requests {:?}, total dropped requests {:?}.", total_requests, total_dropped_requests);
                                    total_requests_arc.store(total_requests, Ordering::SeqCst);
                                    dropped_requests_arc.store(total_dropped_requests, Ordering::SeqCst);
                                    return;
                                }
                                authority.metrics.txn_ready_rate_tracker.lock().record();
                            } else {
                                total_dropped_requests += 1;
                            }
                        }
                    }
                }
            }
        })
    }

    // Starts a request executor that consumes requests from `rx` at no more
    // than `execution_rate` per second. It waits for an interval tick after
    // each request.
    //
    // For each consumed request, it records an execution in `authority`'s
    // `execution_rate_tracker` and reports the request's queueing latency
    // (time since enqueue) to `execution_queueing_latency`.
    //
    // `MissedTickBehavior::Skip` keeps an idle period from being followed by
    // a rapid series of catch-up ticks. The executor stops when `stop_rx`
    // fires. Returning drops `rx`, which makes the load generator's next send
    // fail.
    fn start_executor(
        execution_rate: f64,
        mut rx: UnboundedReceiver<Instant>,
        mut stop_rx: oneshot::Receiver<()>,
        authority: Arc<AuthorityState>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs_f64(1.0 / execution_rate));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            loop {
                tokio::select! {
                    Some(start_time) = rx.recv() => {
                        authority.metrics.execution_rate_tracker.lock().record();
                        authority.metrics.execution_queueing_latency.report(start_time.elapsed());
                        interval.tick().await;
                    }
                    _ = &mut stop_rx => {
                        info!("Executor stopping");
                        return;
                    }
                }
            }
        })
    }

    // Helper function that sleeps for `seconds` seconds, logging the current
    // overload info and rates once per second.
    async fn sleep_and_print_stats(state: Arc<AuthorityState>, seconds: u32) {
        for _ in 0..seconds {
            info!(
                "Overload: {:?}. Shedding percentage: {:?}. Queue: {:?}, Ready rate: {:?}. Exec rate: {:?}.",
                state.overload_info.is_overload.load(Ordering::Relaxed),
                state
                    .overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                state.metrics.execution_queueing_latency.latency(),
                state.metrics.txn_ready_rate_tracker.lock().rate(),
                state.metrics.execution_rate_tracker.lock().rate(),
            );
            sleep(Duration::from_secs(1)).await;
        }
    }

    // Runs a workload with a steady `generator_rate` against an executor with
    // a steady `executor_rate` for 300 seconds. It checks that the ratio of
    // dropped requests lies within [min_dropping_rate, max_dropping_rate].
    async fn run_consistent_workload_test(
        generator_rate: f64,
        executor_rate: f64,
        min_dropping_rate: f64,
        max_dropping_rate: f64,
    ) {
        let (state, monitor_handle) = start_overload_monitor().await;

        let (tx, rx) = unbounded_channel();
        let (_burst_tx, burst_rx) = unbounded_channel();
        let total_requests = Arc::new(AtomicU32::new(0));
        let dropped_requests = Arc::new(AtomicU32::new(0));
        let load_generator = start_load_generator(
            generator_rate,
            tx.clone(),
            burst_rx,
            state.clone(),
            true,
            total_requests.clone(),
            dropped_requests.clone(),
        );

        let (stop_tx, stop_rx) = oneshot::channel();
        let executor = start_executor(executor_rate, rx, stop_rx, state.clone());

        sleep_and_print_stats(state.clone(), 300).await;

        // Stopping the executor drops its receiver, which in turn stops the
        // load generator and makes it publish its request counters.
        stop_tx.send(()).unwrap();
        let _ = tokio::join!(load_generator, executor);

        let dropped_ratio = dropped_requests.load(Ordering::SeqCst) as f64
            / total_requests.load(Ordering::SeqCst) as f64;
        assert!(min_dropping_rate <= dropped_ratio);
        assert!(dropped_ratio <= max_dropping_rate);

        monitor_handle.abort();
        let _ = monitor_handle.await;
    }

    // Tests that when request generation rate is slower than execution rate, no
    // requests should be dropped. Like the other workload tests, this runs
    // with `start_paused = true`: tokio's clock is paused and auto-advances
    // whenever the runtime is idle.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_no_overload() {
        telemetry_subscribers::init_for_testing();
        run_consistent_workload_test(900.0, 1000.0, 0.0, 0.0).await;
    }

    // Tests that when request generation rate is slightly above execution rate, a
    // small portion of requests should be dropped.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_slightly_overload() {
        telemetry_subscribers::init_for_testing();
        // Dropping rate should be around 15%.
        run_consistent_workload_test(1100.0, 1000.0, 0.05, 0.25).await;
    }

    // Tests that when request generation rate is much higher than execution rate, a
    // large portion of requests should be dropped.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_overload() {
        telemetry_subscribers::init_for_testing();
        // Dropping rate should be around 70%.
        run_consistent_workload_test(3000.0, 1000.0, 0.6, 0.8).await;
    }

    // Tests that when there is a very short single spike, no request should be
    // dropped. The steady load is 10 req/s against an executor capacity of
    // 1000 req/s, plus one burst of 5000 requests (about 5 s of capacity).
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_single_spike() {
        telemetry_subscribers::init_for_testing();
        let (state, monitor_handle) = start_overload_monitor().await;

        let (tx, rx) = unbounded_channel();
        let (burst_tx, burst_rx) = unbounded_channel();
        let total_requests = Arc::new(AtomicU32::new(0));
        let dropped_requests = Arc::new(AtomicU32::new(0));
        let load_generator = start_load_generator(
            10.0,
            tx.clone(),
            burst_rx,
            state.clone(),
            true,
            total_requests.clone(),
            dropped_requests.clone(),
        );

        let (stop_tx, stop_rx) = oneshot::channel();
        let executor = start_executor(1000.0, rx, stop_rx, state.clone());

        sleep_and_print_stats(state.clone(), 10).await;
        // Send out a burst of 5000 requests.
        burst_tx.send(5000).unwrap();
        sleep_and_print_stats(state.clone(), 20).await;

        stop_tx.send(()).unwrap();
        let _ = tokio::join!(load_generator, executor);

        // No requests should be dropped.
        assert_eq!(dropped_requests.load(Ordering::SeqCst), 0);

        monitor_handle.abort();
        let _ = monitor_handle.await;
    }

    // Tests that when there are regular spikes that keep queueing latency
    // consistently high, overload monitor should kick in and shed load.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_short_spike() {
        telemetry_subscribers::init_for_testing();
        let (state, monitor_handle) = start_overload_monitor().await;

        let (tx, rx) = unbounded_channel();
        let (burst_tx, burst_rx) = unbounded_channel();
        let total_requests = Arc::new(AtomicU32::new(0));
        let dropped_requests = Arc::new(AtomicU32::new(0));
        let load_generator = start_load_generator(
            10.0,
            tx.clone(),
            burst_rx,
            state.clone(),
            true,
            total_requests.clone(),
            dropped_requests.clone(),
        );

        let (stop_tx, stop_rx) = oneshot::channel();
        let executor = start_executor(1000.0, rx, stop_rx, state.clone());

        sleep_and_print_stats(state.clone(), 15).await;
        for _ in 0..16 {
            // Regularly send out a burst of requests: 10000 every 5 seconds,
            // i.e. about 2000 req/s against the executor's 1000 req/s.
            burst_tx.send(10000).unwrap();
            sleep_and_print_stats(state.clone(), 5).await;
        }

        stop_tx.send(()).unwrap();
        let _ = tokio::join!(load_generator, executor);
        let dropped_ratio = dropped_requests.load(Ordering::SeqCst) as f64
            / total_requests.load(Ordering::SeqCst) as f64;

        // We should drop about 50% of requests because the burst throughput is
        // about 2x the execution rate.
        assert!(0.4 < dropped_ratio);
        assert!(dropped_ratio < 0.6);

        monitor_handle.abort();
        let _ = monitor_handle.await;
    }

    // Tests that the ratio of rejected transactions created randomly matches load
    // shedding percentage in the overload monitor.
    #[test]
    fn test_txn_rejection_rate() {
        for rejection_percentage in 0..=100 {
            let mut reject_count = 0;
            for _ in 0..10000 {
                let digest = TransactionDigest::random();
                if should_reject_tx(rejection_percentage, digest, 28455473) {
                    reject_count += 1;
                }
            }

            debug!(
                "Rejection percentage: {:?}, reject count: {:?}.",
                rejection_percentage, reject_count
            );
            // Allow a deviation of up to 3 percentage points from the target
            // rejection ratio.
            assert!(rejection_percentage as f32 / 100.0 - 0.03 < reject_count as f32 / 10000.0);
            assert!(reject_count as f32 / 10000.0 < rejection_percentage as f32 / 100.0 + 0.03);
        }
    }

    // Tests that a rejected transaction will have a chance to be accepted in
    // the future, i.e. under a later temporal seed.
    #[sim_test]
    async fn test_txn_rejection_over_time() {
        let start_time = Instant::now();
        let mut digest = TransactionDigest::random();
        // A fixed Unix timestamp (in seconds), bucketed the same way as in
        // `overload_monitor_accept_tx`.
        let mut temporal_seed = 1708108277 / SEED_UPDATE_DURATION_SECS;
        let load_shedding_percentage = 50;

        // Find a rejected transaction with 50% rejection rate.
        while !should_reject_tx(load_shedding_percentage, digest, temporal_seed)
            && start_time.elapsed() < Duration::from_secs(30)
        {
            digest = TransactionDigest::random();
        }

        // It should always be rejected using the current temporal_seed.
        for _ in 0..100 {
            assert!(should_reject_tx(
                load_shedding_percentage,
                digest,
                temporal_seed
            ));
        }

        // It will be accepted in the future.
        temporal_seed += 1;
        while should_reject_tx(load_shedding_percentage, digest, temporal_seed)
            && start_time.elapsed() < Duration::from_secs(30)
        {
            temporal_seed += 1;
        }

        // Both search loops above give up after 30 seconds. Finishing within
        // that budget means the rejected digest was later accepted under a
        // newer seed.
        assert!(start_time.elapsed() < Duration::from_secs(30));
    }
}