1use std::{
6 cmp::{max, min},
7 hash::Hasher,
8 sync::{
9 Weak,
10 atomic::{AtomicBool, AtomicU32, Ordering},
11 },
12 time::{Duration, SystemTime, UNIX_EPOCH},
13};
14
15use iota_config::node::AuthorityOverloadConfig;
16use iota_metrics::monitored_scope;
17use iota_types::{
18 digests::TransactionDigest,
19 error::{IotaError, IotaResult},
20 fp_bail,
21};
22use tokio::time::sleep;
23use tracing::{debug, info};
24use twox_hash::XxHash64;
25
26use crate::authority::AuthorityState;
27
/// Overload state of an authority, stored in atomics so it can be updated by
/// the overload monitor and read concurrently without locking.
#[derive(Default)]
pub struct AuthorityOverloadInfo {
    /// `true` while the authority is considered overloaded.
    pub is_overload: AtomicBool,

    /// Percentage of transactions to shed; always within `0..=100` because
    /// writers go through `set_overload` (which clamps) or `clear_overload`.
    pub load_shedding_percentage: AtomicU32,
}

impl AuthorityOverloadInfo {
    /// Marks the authority as overloaded and records the shedding percentage,
    /// clamped to at most 100.
    ///
    /// The flag and the percentage are two independent relaxed stores, so a
    /// concurrent reader may briefly observe them out of sync.
    pub fn set_overload(&self, load_shedding_percentage: u32) {
        let capped_percentage = load_shedding_percentage.min(100);
        self.is_overload.store(true, Ordering::Relaxed);
        self.load_shedding_percentage
            .store(capped_percentage, Ordering::Relaxed);
    }

    /// Marks the authority as not overloaded and resets shedding to 0%.
    pub fn clear_overload(&self) {
        self.is_overload.store(false, Ordering::Relaxed);
        self.load_shedding_percentage.store(0, Ordering::Relaxed);
    }
}
49
/// Percentage points removed from the current load-shedding percentage on a
/// monitor tick that requires no additional shedding while the transaction
/// ready rate is still above `safe_transaction_ready_rate`.
const STEADY_OVERLOAD_REDUCTION_PERCENTAGE: u32 = 10;
/// Factor applied to the execution rate before it is compared with the
/// transaction ready rate; presumably leaves headroom so shedding starts
/// slightly before execution is fully saturated.
const EXECUTION_RATE_RATIO_FOR_COMPARISON: f64 = 0.95;
/// Fraction (2%) added on top of the computed shedding ratio whenever
/// shedding is required.
const ADDITIONAL_LOAD_SHEDDING: f64 = 0.02;

/// Length in seconds of the window during which the rejection seed stays
/// constant; also reported to clients as `retry_after_secs`.
const SEED_UPDATE_DURATION_SECS: u64 = 30;
57
58pub async fn overload_monitor(
61 authority_state: Weak<AuthorityState>,
62 config: AuthorityOverloadConfig,
63) {
64 info!("Starting system overload monitor.");
65
66 loop {
67 let authority_exist = check_authority_overload(&authority_state, &config);
68 if !authority_exist {
69 break;
71 }
72 sleep(config.overload_monitor_interval).await;
73 }
74
75 info!("Shut down system overload monitor.");
76}
77
78fn check_authority_overload(
81 authority_state: &Weak<AuthorityState>,
82 config: &AuthorityOverloadConfig,
83) -> bool {
84 let _scope = monitored_scope("OverloadMonitor::check_authority_overload");
85 let authority_arc = authority_state.upgrade();
86 if authority_arc.is_none() {
87 return false;
89 }
90
91 let authority = authority_arc.unwrap();
92 let queueing_latency = authority
93 .metrics
94 .execution_queueing_latency
95 .latency()
96 .unwrap_or_default();
97 let txn_ready_rate = authority.metrics.txn_ready_rate_tracker.lock().rate();
98 let execution_rate = authority.metrics.execution_rate_tracker.lock().rate();
99
100 debug!(
101 "Check authority overload signal, queueing latency {:?}, ready rate {:?}, execution rate {:?}.",
102 queueing_latency, txn_ready_rate, execution_rate
103 );
104
105 let (is_overload, load_shedding_percentage) = check_overload_signals(
106 config,
107 authority
108 .overload_info
109 .load_shedding_percentage
110 .load(Ordering::Relaxed),
111 queueing_latency,
112 txn_ready_rate,
113 execution_rate,
114 );
115
116 if is_overload {
117 authority
118 .overload_info
119 .set_overload(load_shedding_percentage);
120 } else {
121 authority.overload_info.clear_overload();
122 }
123
124 authority
125 .metrics
126 .authority_overload_status
127 .set(is_overload as i64);
128 authority
129 .metrics
130 .authority_load_shedding_percentage
131 .set(load_shedding_percentage as i64);
132 true
133}
134
135fn calculate_load_shedding_percentage(txn_ready_rate: f64, execution_rate: f64) -> u32 {
138 if txn_ready_rate < 1e-10 {
142 return 0;
143 }
144
145 if execution_rate * EXECUTION_RATE_RATIO_FOR_COMPARISON > txn_ready_rate {
148 return 0;
149 }
150
151 (((1.0 - execution_rate * EXECUTION_RATE_RATIO_FOR_COMPARISON / txn_ready_rate)
155 + ADDITIONAL_LOAD_SHEDDING)
156 .min(1.0)
157 * 100.0)
158 .round() as u32
159}
160
161fn check_overload_signals(
171 config: &AuthorityOverloadConfig,
172 current_load_shedding_percentage: u32,
173 queueing_latency: Duration,
174 txn_ready_rate: f64,
175 execution_rate: f64,
176) -> (bool, u32) {
177 let additional_load_shedding_percentage;
181 if queueing_latency > config.execution_queue_latency_hard_limit {
182 let calculated_load_shedding_percentage =
183 calculate_load_shedding_percentage(txn_ready_rate, execution_rate);
184
185 additional_load_shedding_percentage = if calculated_load_shedding_percentage > 0
186 || txn_ready_rate >= config.safe_transaction_ready_rate as f64
187 {
188 max(
189 calculated_load_shedding_percentage,
190 config.min_load_shedding_percentage_above_hard_limit,
191 )
192 } else {
193 0
194 };
195 } else if queueing_latency > config.execution_queue_latency_soft_limit {
196 additional_load_shedding_percentage =
197 calculate_load_shedding_percentage(txn_ready_rate, execution_rate);
198 } else {
199 additional_load_shedding_percentage = 0;
200 }
201
202 let load_shedding_percentage = if additional_load_shedding_percentage > 0 {
204 current_load_shedding_percentage
210 + (100 - current_load_shedding_percentage) * additional_load_shedding_percentage / 100
211 } else if txn_ready_rate > config.safe_transaction_ready_rate as f64
212 && current_load_shedding_percentage > 10
213 {
214 current_load_shedding_percentage - STEADY_OVERLOAD_REDUCTION_PERCENTAGE
218 } else {
219 0
222 };
223
224 let load_shedding_percentage = min(
225 load_shedding_percentage,
226 config.max_load_shedding_percentage,
227 );
228 let overload_status = load_shedding_percentage > 0;
229 (overload_status, load_shedding_percentage)
230}
231
232fn should_reject_tx(
234 load_shedding_percentage: u32,
235 tx_digest: TransactionDigest,
236 temporal_seed: u64,
237) -> bool {
238 let mut hasher = XxHash64::with_seed(temporal_seed);
241 hasher.write(tx_digest.inner());
242 let value = hasher.finish();
243 value % 100 < load_shedding_percentage as u64
244}
245
246pub fn overload_monitor_accept_tx(
248 load_shedding_percentage: u32,
249 tx_digest: TransactionDigest,
250) -> IotaResult {
251 let temporal_seed = SystemTime::now()
257 .duration_since(UNIX_EPOCH)
258 .expect("IOTA did not exist prior to 1970")
259 .as_secs()
260 / SEED_UPDATE_DURATION_SECS;
261
262 if should_reject_tx(load_shedding_percentage, tx_digest, temporal_seed) {
263 fp_bail!(IotaError::ValidatorOverloadedRetryAfter {
267 retry_after_secs: SEED_UPDATE_DURATION_SECS
268 });
269 }
270 Ok(())
271}
272
#[cfg(test)]
#[expect(clippy::disallowed_methods)]
mod tests {
    use std::sync::Arc;

    use iota_macros::sim_test;
    use rand::{
        Rng, SeedableRng,
        rngs::{OsRng, StdRng},
    };
    use tokio::{
        sync::{
            mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
            oneshot,
        },
        task::JoinHandle,
        time::{Instant, MissedTickBehavior, interval},
    };

    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;

    // Exercises `set_overload` / `clear_overload` starting from the default state.
    #[test]
    pub fn test_authority_overload_info() {
        let overload_info = AuthorityOverloadInfo::default();
        assert!(!overload_info.is_overload.load(Ordering::Relaxed));
        assert_eq!(
            overload_info
                .load_shedding_percentage
                .load(Ordering::Relaxed),
            0
        );

        {
            overload_info.set_overload(20);
            assert!(overload_info.is_overload.load(Ordering::Relaxed));
            assert_eq!(
                overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                20
            );
        }

        {
            // Percentages above 100 are clamped to 100.
            overload_info.set_overload(110);
            assert!(overload_info.is_overload.load(Ordering::Relaxed));
            assert_eq!(
                overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                100
            );
        }

        {
            overload_info.clear_overload();
            assert!(!overload_info.is_overload.load(Ordering::Relaxed));
            assert_eq!(
                overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                0
            );
        }
    }

    // Reference values for the shedding formula, including the zero-ready-rate
    // and zero-execution-rate edge cases.
    #[test]
    pub fn test_calculate_load_shedding_ratio() {
        assert_eq!(calculate_load_shedding_percentage(95.0, 100.1), 0);
        assert_eq!(calculate_load_shedding_percentage(95.0, 100.0), 2);
        assert_eq!(calculate_load_shedding_percentage(100.0, 100.0), 7);
        assert_eq!(calculate_load_shedding_percentage(110.0, 100.0), 16);
        assert_eq!(calculate_load_shedding_percentage(180.0, 100.0), 49);
        assert_eq!(calculate_load_shedding_percentage(100.0, 0.0), 100);
        assert_eq!(calculate_load_shedding_percentage(0.0, 1.0), 0);
    }

    #[test]
    pub fn test_check_overload_signals() {
        let config = AuthorityOverloadConfig {
            execution_queue_latency_hard_limit: Duration::from_secs(10),
            execution_queue_latency_soft_limit: Duration::from_secs(1),
            max_load_shedding_percentage: 90,
            ..Default::default()
        };

        // Latency below the soft limit: no overload regardless of rates.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_millis(500), 1000.0, 10.0),
            (false, 0)
        );

        // Above the soft limit, but execution keeps up with the ready rate.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(2), 100.0, 120.0),
            (false, 0)
        );

        // Above the soft limit with equal rates: shed the calculated 7%.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(2), 100.0, 100.0),
            (true, 7)
        );

        // Above the hard limit: the calculated 7% is raised to the
        // `min_load_shedding_percentage_above_hard_limit` floor (50 here, per
        // this expectation).
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 100.0, 100.0),
            (true, 50)
        );

        // Above the hard limit with a calculated value (62) above the floor.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 240.0, 100.0),
            (true, 62)
        );

        // Above the hard limit, but execution keeps up and the ready rate is
        // below the default safe rate: no shedding.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 20.0, 100.0),
            (false, 0)
        );

        // Nothing executes: 100% is requested but capped at the configured 90%.
        assert_eq!(
            check_overload_signals(&config, 0, Duration::from_secs(11), 100.0, 0.0),
            (true, 90)
        );

        // An additional 20% on top of an existing 50%: 50 + 50 * 20 / 100 = 60.
        assert_eq!(
            check_overload_signals(&config, 50, Duration::from_secs(2), 116.0, 100.0),
            (true, 60)
        );

        // No additional shedding needed, but the ready rate is still above the
        // safe rate: step down 90 -> 80.
        assert_eq!(
            check_overload_signals(&config, 90, Duration::from_secs(2), 200.0, 300.0),
            (true, 80)
        );

        // An additional 50% (hard-limit floor) on top of 50%: 50 + 25 = 75.
        assert_eq!(
            check_overload_signals(&config, 50, Duration::from_secs(11), 100.0, 100.0),
            (true, 75)
        );
    }

    #[tokio::test(flavor = "current_thread")]
    pub async fn test_check_authority_overload() {
        telemetry_subscribers::init_for_testing();

        // With a safe rate of 0 any ready rate satisfies `>= safe rate`, so
        // exceeding the hard limit always applies at least the floor.
        let config = AuthorityOverloadConfig {
            safe_transaction_ready_rate: 0,
            ..Default::default()
        };
        let state = TestAuthorityBuilder::new()
            .with_authority_overload_config(config.clone())
            .build()
            .await;

        // Report a queueing latency of 20s, well above the hard limit.
        for _ in 0..1000 {
            state
                .metrics
                .execution_queueing_latency
                .report(Duration::from_secs(20));
        }

        // While the authority is alive the check succeeds and marks it
        // overloaded at the hard-limit floor.
        let authority = Arc::downgrade(&state);
        assert!(check_authority_overload(&authority, &config));
        assert!(state.overload_info.is_overload.load(Ordering::Relaxed));
        assert_eq!(
            state
                .overload_info
                .load_shedding_percentage
                .load(Ordering::Relaxed),
            config.min_load_shedding_percentage_above_hard_limit
        );

        // Once the authority is dropped the check reports `false`.
        let authority = Arc::downgrade(&state);
        drop(state);
        assert!(!check_authority_overload(&authority, &config));
    }

    // Builds a test authority with the default overload config and spawns the
    // overload monitor for it. The monitor holds only a `Weak` reference.
    async fn start_overload_monitor() -> (Arc<AuthorityState>, JoinHandle<()>) {
        let overload_config = AuthorityOverloadConfig::default();
        let state = TestAuthorityBuilder::new()
            .with_authority_overload_config(overload_config.clone())
            .build()
            .await;
        let authority_state = Arc::downgrade(&state);
        let monitor_handle = tokio::spawn(async move {
            overload_monitor(authority_state, overload_config).await;
        });
        (state, monitor_handle)
    }

    // Spawns a load generator. It sends the current `Instant` on `tx` once per
    // tick at `steady_rate` per second, plus `burst` extra sends whenever a
    // value arrives on `burst_rx`. With `enable_load_shedding`, each request is
    // dropped with probability equal to the authority's current shedding
    // percentage. Every sent request is recorded in `txn_ready_rate_tracker`.
    // When `tx` can no longer send (receiver gone), the final totals are
    // published to the two atomics and the task exits.
    fn start_load_generator(
        steady_rate: f64,
        tx: UnboundedSender<Instant>,
        mut burst_rx: UnboundedReceiver<u32>,
        authority: Arc<AuthorityState>,
        enable_load_shedding: bool,
        total_requests_arc: Arc<AtomicU32>,
        dropped_requests_arc: Arc<AtomicU32>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs_f64(1.0 / steady_rate));
            let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
            let mut total_requests: u32 = 0;
            let mut total_dropped_requests: u32 = 0;

            // Returns whether a request should be sent (`false` = shed it).
            let mut do_send =
                |enable_load_shedding: bool, authority: Arc<AuthorityState>| -> bool {
                    if enable_load_shedding {
                        let shedding_percentage = authority
                            .overload_info
                            .load_shedding_percentage
                            .load(Ordering::Relaxed);
                        !(shedding_percentage > 0 && rng.gen_range(0..100) < shedding_percentage)
                    } else {
                        true
                    }
                };

            loop {
                tokio::select! {
                    now = interval.tick() => {
                        total_requests += 1;
                        if do_send(enable_load_shedding, authority.clone()) {
                            if tx.send(now).is_err() {
                                info!("Load generator stopping. Total requests {:?}, total dropped requests {:?}.", total_requests, total_dropped_requests);
                                total_requests_arc.store(total_requests, Ordering::SeqCst);
                                dropped_requests_arc.store(total_dropped_requests, Ordering::SeqCst);
                                return;
                            }
                            authority.metrics.txn_ready_rate_tracker.lock().record();
                        } else {
                            total_dropped_requests += 1;
                        }
                    }
                    Some(burst) = burst_rx.recv() => {
                        let now = Instant::now();
                        total_requests += burst;
                        for _ in 0..burst {
                            if do_send(enable_load_shedding, authority.clone()) {
                                if tx.send(now).is_err() {
                                    info!("Load generator stopping. Total requests {:?}, total dropped requests {:?}.", total_requests, total_dropped_requests);
                                    total_requests_arc.store(total_requests, Ordering::SeqCst);
                                    dropped_requests_arc.store(total_dropped_requests, Ordering::SeqCst);
                                    return;
                                }
                                authority.metrics.txn_ready_rate_tracker.lock().record();
                            } else {
                                total_dropped_requests += 1;
                            }
                        }
                    }
                }
            }
        })
    }

    // Spawns a simulated executor that processes at most `execution_rate`
    // items per second. For each received item it records an execution,
    // reports the item's time in queue as queueing latency, then waits for
    // the next interval tick. Missed ticks are skipped rather than replayed.
    // The executor stops (dropping `rx`) when `stop_rx` fires.
    fn start_executor(
        execution_rate: f64,
        mut rx: UnboundedReceiver<Instant>,
        mut stop_rx: oneshot::Receiver<()>,
        authority: Arc<AuthorityState>,
    ) -> JoinHandle<()> {
        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs_f64(1.0 / execution_rate));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            loop {
                tokio::select! {
                    Some(start_time) = rx.recv() => {
                        authority.metrics.execution_rate_tracker.lock().record();
                        authority.metrics.execution_queueing_latency.report(start_time.elapsed());
                        interval.tick().await;
                    }
                    _ = &mut stop_rx => {
                        info!("Executor stopping");
                        return;
                    }
                }
            }
        })
    }

    // Logs overload state and metrics once per second for `seconds` seconds.
    async fn sleep_and_print_stats(state: Arc<AuthorityState>, seconds: u32) {
        for _ in 0..seconds {
            info!(
                "Overload: {:?}. Shedding percentage: {:?}. Queue: {:?}, Ready rate: {:?}. Exec rate: {:?}.",
                state.overload_info.is_overload.load(Ordering::Relaxed),
                state
                    .overload_info
                    .load_shedding_percentage
                    .load(Ordering::Relaxed),
                state.metrics.execution_queueing_latency.latency(),
                state.metrics.txn_ready_rate_tracker.lock().rate(),
                state.metrics.execution_rate_tracker.lock().rate(),
            );
            sleep(Duration::from_secs(1)).await;
        }
    }

    // Drives a steady workload at `generator_rate` against an executor capped
    // at `executor_rate` for 300 seconds. Then asserts that the fraction of
    // requests shed by the generator lies in
    // `[min_dropping_rate, max_dropping_rate]`.
    async fn run_consistent_workload_test(
        generator_rate: f64,
        executor_rate: f64,
        min_dropping_rate: f64,
        max_dropping_rate: f64,
    ) {
        let (state, monitor_handle) = start_overload_monitor().await;

        let (tx, rx) = unbounded_channel();
        let (_burst_tx, burst_rx) = unbounded_channel();
        let total_requests = Arc::new(AtomicU32::new(0));
        let dropped_requests = Arc::new(AtomicU32::new(0));
        let load_generator = start_load_generator(
            generator_rate,
            tx.clone(),
            burst_rx,
            state.clone(),
            true,
            total_requests.clone(),
            dropped_requests.clone(),
        );

        let (stop_tx, stop_rx) = oneshot::channel();
        let executor = start_executor(executor_rate, rx, stop_rx, state.clone());

        sleep_and_print_stats(state.clone(), 300).await;

        // Stopping the executor drops the receiver, which makes the generator's
        // next send fail and publish its totals.
        stop_tx.send(()).unwrap();
        let _ = tokio::join!(load_generator, executor);

        let dropped_ratio = dropped_requests.load(Ordering::SeqCst) as f64
            / total_requests.load(Ordering::SeqCst) as f64;
        assert!(min_dropping_rate <= dropped_ratio);
        assert!(dropped_ratio <= max_dropping_rate);

        monitor_handle.abort();
        let _ = monitor_handle.await;
    }

    // Load below capacity: nothing should be shed.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_no_overload() {
        telemetry_subscribers::init_for_testing();
        run_consistent_workload_test(900.0, 1000.0, 0.0, 0.0).await;
    }

    // Load 10% above capacity: expect a modest drop ratio.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_slightly_overload() {
        telemetry_subscribers::init_for_testing();
        run_consistent_workload_test(1100.0, 1000.0, 0.05, 0.25).await;
    }

    // Load 3x capacity: expect roughly two thirds of requests to be shed.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_overload() {
        telemetry_subscribers::init_for_testing();
        run_consistent_workload_test(3000.0, 1000.0, 0.6, 0.8).await;
    }

    // A single burst of 5000 requests on top of light load must not trigger
    // any shedding.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_single_spike() {
        telemetry_subscribers::init_for_testing();
        let (state, monitor_handle) = start_overload_monitor().await;

        let (tx, rx) = unbounded_channel();
        let (burst_tx, burst_rx) = unbounded_channel();
        let total_requests = Arc::new(AtomicU32::new(0));
        let dropped_requests = Arc::new(AtomicU32::new(0));
        let load_generator = start_load_generator(
            10.0,
            tx.clone(),
            burst_rx,
            state.clone(),
            true,
            total_requests.clone(),
            dropped_requests.clone(),
        );

        let (stop_tx, stop_rx) = oneshot::channel();
        let executor = start_executor(1000.0, rx, stop_rx, state.clone());

        sleep_and_print_stats(state.clone(), 10).await;
        burst_tx.send(5000).unwrap();
        sleep_and_print_stats(state.clone(), 20).await;

        stop_tx.send(()).unwrap();
        let _ = tokio::join!(load_generator, executor);

        assert_eq!(dropped_requests.load(Ordering::SeqCst), 0);

        monitor_handle.abort();
        let _ = monitor_handle.await;
    }

    // Repeated bursts of 10000 requests every 5 seconds (about 2x executor
    // capacity) should shed roughly half of the traffic.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_workload_consistent_short_spike() {
        telemetry_subscribers::init_for_testing();
        let (state, monitor_handle) = start_overload_monitor().await;

        let (tx, rx) = unbounded_channel();
        let (burst_tx, burst_rx) = unbounded_channel();
        let total_requests = Arc::new(AtomicU32::new(0));
        let dropped_requests = Arc::new(AtomicU32::new(0));
        let load_generator = start_load_generator(
            10.0,
            tx.clone(),
            burst_rx,
            state.clone(),
            true,
            total_requests.clone(),
            dropped_requests.clone(),
        );

        let (stop_tx, stop_rx) = oneshot::channel();
        let executor = start_executor(1000.0, rx, stop_rx, state.clone());

        sleep_and_print_stats(state.clone(), 15).await;
        for _ in 0..16 {
            burst_tx.send(10000).unwrap();
            sleep_and_print_stats(state.clone(), 5).await;
        }

        stop_tx.send(()).unwrap();
        let _ = tokio::join!(load_generator, executor);
        let dropped_ratio = dropped_requests.load(Ordering::SeqCst) as f64
            / total_requests.load(Ordering::SeqCst) as f64;

        assert!(0.4 < dropped_ratio);
        assert!(dropped_ratio < 0.6);

        monitor_handle.abort();
        let _ = monitor_handle.await;
    }

    // For every percentage 0..=100 and a fixed seed, the observed rejection
    // ratio over 10000 random digests must be within ±3% of the target.
    #[test]
    fn test_txn_rejection_rate() {
        for rejection_percentage in 0..=100 {
            let mut reject_count = 0;
            for _ in 0..10000 {
                let digest = TransactionDigest::random();
                if should_reject_tx(rejection_percentage, digest, 28455473) {
                    reject_count += 1;
                }
            }

            debug!(
                "Rejection percentage: {:?}, reject count: {:?}.",
                rejection_percentage, reject_count
            );
            assert!(rejection_percentage as f32 / 100.0 - 0.03 < reject_count as f32 / 10000.0);
            assert!(reject_count as f32 / 10000.0 < rejection_percentage as f32 / 100.0 + 0.03);
        }
    }

    // A rejected digest stays rejected for a fixed seed, and is eventually
    // accepted once the seed advances.
    #[sim_test]
    async fn test_txn_rejection_over_time() {
        let start_time = Instant::now();
        let mut digest = TransactionDigest::random();
        let mut temporal_seed = 1708108277 / SEED_UPDATE_DURATION_SECS;
        let load_shedding_percentage = 50;

        // Find a digest that is rejected under the starting seed.
        while !should_reject_tx(load_shedding_percentage, digest, temporal_seed)
            && start_time.elapsed() < Duration::from_secs(30)
        {
            digest = TransactionDigest::random();
        }

        // The decision is deterministic for a fixed seed.
        for _ in 0..100 {
            assert!(should_reject_tx(
                load_shedding_percentage,
                digest,
                temporal_seed
            ));
        }

        // Advance the seed until the same digest is accepted.
        temporal_seed += 1;
        while should_reject_tx(load_shedding_percentage, digest, temporal_seed)
            && start_time.elapsed() < Duration::from_secs(30)
        {
            temporal_seed += 1;
        }

        // Both searches must have terminated by success, not by timeout.
        assert!(start_time.elapsed() < Duration::from_secs(30));
    }
}