1use std::{
6 collections::VecDeque,
7 default::Default,
8 sync::atomic::{AtomicU64, Ordering},
9};
10
11use parking_lot::Mutex;
12use tokio::time::{Duration, Instant};
13
14pub struct LatencyObserver {
15 data: Mutex<LatencyObserverInner>,
16 latency_ms: AtomicU64,
17}
18
19#[derive(Default)]
20struct LatencyObserverInner {
21 points: VecDeque<Duration>,
22 sum: Duration,
23}
24
25impl LatencyObserver {
26 pub fn new() -> Self {
27 Self {
28 data: Mutex::new(LatencyObserverInner::default()),
29 latency_ms: AtomicU64::new(u64::MAX),
30 }
31 }
32
33 pub fn report(&self, latency: Duration) {
34 const EXPECTED_SAMPLES: usize = 128;
35 let mut data = self.data.lock();
36 data.points.push_back(latency);
37 data.sum += latency;
38 if data.points.len() < EXPECTED_SAMPLES {
39 return;
41 }
42 while data.points.len() > EXPECTED_SAMPLES {
43 let pop = data.points.pop_front().expect("data vector is not empty");
44 data.sum -= pop; }
47 let latency = data.sum.as_millis() as u64 / data.points.len() as u64;
48 self.latency_ms.store(latency, Ordering::Relaxed);
49 }
50
51 pub fn latency(&self) -> Option<Duration> {
52 let latency = self.latency_ms.load(Ordering::Relaxed);
53 if latency == u64::MAX {
54 None
56 } else {
57 Some(Duration::from_millis(latency))
58 }
59 }
60}
61
62impl Default for LatencyObserver {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68pub struct RateTracker {
73 event_buffer: Vec<u64>,
76 window_duration: Duration,
77 total_bins: usize,
78
79 start_time: Instant,
84
85 global_bin_index: u64,
87}
88
89const BIN_DURATION: Duration = Duration::from_millis(100);
90
91impl RateTracker {
92 pub fn new(window_duration: Duration) -> Self {
95 assert!(window_duration > BIN_DURATION);
96 let total_bins = (window_duration.as_millis() / BIN_DURATION.as_millis()) as usize;
97 RateTracker {
98 event_buffer: vec![0; total_bins],
99 window_duration,
100 total_bins,
101 start_time: Instant::now(),
102 global_bin_index: 0,
103 }
104 }
105
106 pub fn record_at_time(&mut self, now: Instant) {
108 self.update_window(now);
109 let current_bin_index = self.get_bin_index(now) as usize;
110 if current_bin_index + self.total_bins <= self.global_bin_index as usize {
111 return;
113 }
114
115 self.event_buffer[current_bin_index % self.total_bins] += 1;
116 }
117
118 pub fn record(&mut self) {
120 self.record_at_time(Instant::now());
121 }
122
123 pub fn rate(&mut self) -> f64 {
125 let now = Instant::now();
126 self.update_window(now);
127 self.event_buffer.iter().sum::<u64>() as f64 / self.window_duration.as_secs_f64()
128 }
129
130 fn get_bin_index(&self, now: Instant) -> u64 {
132 (now.duration_since(self.start_time).as_millis() / BIN_DURATION.as_millis()) as u64
133 }
134
135 fn update_window(&mut self, now: Instant) {
139 let current_bin_index = self.get_bin_index(now);
140 if self.global_bin_index >= current_bin_index {
141 return;
143 }
144
145 for bin_index in (self.global_bin_index + 1)..=current_bin_index {
146 let index_in_buffer = bin_index as usize % self.total_bins;
149 self.event_buffer[index_in_buffer] = 0;
150 }
151 self.global_bin_index = current_bin_index;
152 }
153}
154
155#[cfg(test)]
156mod tests {
157 use rand::{Rng, SeedableRng, rngs::StdRng};
158 use tokio::time::advance;
159
160 use super::*;
161
162 #[tokio::test(flavor = "current_thread", start_paused = true)]
163 pub async fn test_rate_tracker_basic() {
164 let mut tracker = RateTracker::new(Duration::from_secs(1));
166 assert_eq!(tracker.rate(), 0.0);
167 tracker.record();
168 tracker.record();
169 tracker.record();
170 assert_eq!(tracker.rate(), 3.0);
171
172 advance(Duration::from_millis(200)).await;
173 tracker.record();
174 tracker.record();
175 tracker.record();
176 assert_eq!(tracker.rate(), 6.0);
177
178 advance(Duration::from_millis(800)).await;
179 assert_eq!(tracker.rate(), 3.0);
180
181 advance(Duration::from_millis(200)).await;
182 assert_eq!(tracker.rate(), 0.0);
183 }
184
185 #[tokio::test(flavor = "current_thread", start_paused = true)]
187 pub async fn test_rate_tracker_window() {
188 let seed = [0; 32];
189 let mut rng = StdRng::from_seed(seed);
190 let random_windows: Vec<u64> = (0..10).map(|_| rng.gen_range(1..=60)).collect();
191 for window in random_windows {
192 let mut tracker = RateTracker::new(Duration::from_secs(window));
193 for _ in 0..23 {
194 tracker.record();
195 }
196 assert_eq!(tracker.rate(), 23.0 / window as f64);
197 advance(Duration::from_secs(window)).await;
198 assert_eq!(tracker.rate(), 0.0);
199 }
200 }
201
202 #[tokio::test(flavor = "current_thread", start_paused = true)]
204 pub async fn test_rate_tracker_rolling_window() {
205 let mut tracker = RateTracker::new(Duration::from_secs(1));
206 for i in 0..10 {
208 tracker.record();
209 assert_eq!(tracker.rate(), (i + 1) as f64);
210 advance(Duration::from_millis(100)).await;
211 }
212
213 for i in 0..10 {
215 tracker.record();
216 advance(Duration::from_millis(50)).await;
217 tracker.record();
218 assert_eq!(tracker.rate(), 11.0 + i as f64);
219 advance(Duration::from_millis(50)).await;
220 }
221
222 for i in 0..10 {
224 assert_eq!(tracker.rate(), 20.0 - (i as f64 + 1.0) * 2.0);
225 advance(Duration::from_millis(100)).await;
226 }
227 assert_eq!(tracker.rate(), 0.0);
228 }
229
230 #[tokio::test(flavor = "current_thread", start_paused = true)]
233 pub async fn test_rate_tracker_outside_of_window() {
234 let mut tracker = RateTracker::new(Duration::from_secs(1));
235 advance(Duration::from_secs(60)).await;
236 tracker.record();
237 tracker.record();
238 tracker.record();
239 assert_eq!(tracker.rate(), 3.0);
240 tracker.record_at_time(Instant::now() - Duration::from_millis(1100));
241 tracker.record_at_time(Instant::now() - Duration::from_millis(1100));
242 tracker.record_at_time(Instant::now() - Duration::from_millis(1100));
243 assert_eq!(tracker.rate(), 3.0);
244 }
245}