iota_core/
metrics.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::VecDeque,
7    default::Default,
8    sync::atomic::{AtomicU64, Ordering},
9};
10
11use parking_lot::Mutex;
12use tokio::time::{Duration, Instant};
13
14pub struct LatencyObserver {
15    data: Mutex<LatencyObserverInner>,
16    latency_ms: AtomicU64,
17}
18
/// Lock-protected state of a `LatencyObserver`: the retained samples and
/// their running total.
#[derive(Default)]
struct LatencyObserverInner {
    /// Running total of every duration currently stored in `points`.
    sum: Duration,
    /// Retained latency samples, oldest at the front.
    points: VecDeque<Duration>,
}
24
25impl LatencyObserver {
26    pub fn new() -> Self {
27        Self {
28            data: Mutex::new(LatencyObserverInner::default()),
29            latency_ms: AtomicU64::new(u64::MAX),
30        }
31    }
32
33    pub fn report(&self, latency: Duration) {
34        const EXPECTED_SAMPLES: usize = 128;
35        let mut data = self.data.lock();
36        data.points.push_back(latency);
37        data.sum += latency;
38        if data.points.len() < EXPECTED_SAMPLES {
39            // Do not initialize average latency until there are enough samples.
40            return;
41        }
42        while data.points.len() > EXPECTED_SAMPLES {
43            let pop = data.points.pop_front().expect("data vector is not empty");
44            data.sum -= pop; // This does not underflow because of how running
45            // sum is calculated
46        }
47        let latency = data.sum.as_millis() as u64 / data.points.len() as u64;
48        self.latency_ms.store(latency, Ordering::Relaxed);
49    }
50
51    pub fn latency(&self) -> Option<Duration> {
52        let latency = self.latency_ms.load(Ordering::Relaxed);
53        if latency == u64::MAX {
54            // Not initialized yet (not enough data points)
55            None
56        } else {
57            Some(Duration::from_millis(latency))
58        }
59    }
60}
61
62impl Default for LatencyObserver {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
/// Tracks events over a rolling time window and reports their rate.
///
/// The window is split into consecutive slices of `BIN_DURATION`; the number
/// of events falling into each slice is kept in a fixed-size buffer that is
/// reused as the window moves forward.
pub struct RateTracker {
    /// Length of the rolling window over which the rate is computed.
    window_duration: Duration,
    /// Number of bins in the window: `window_duration / BIN_DURATION`.
    total_bins: usize,
    /// Per-bin event counts; the buffer holds `total_bins` entries, each
    /// covering `BIN_DURATION` of the window.
    event_buffer: Vec<u64>,

    /// Reference point used to map an event time to a bin:
    /// `event_global_bin_index = (event_time - start_time) / BIN_DURATION`
    /// and `event_index_in_buffer = event_global_bin_index % buffer_size`.
    start_time: Instant,

    /// Global index of the most recently updated bin, i.e. the end of the
    /// rolling window.
    global_bin_index: u64,
}
88
/// Width of a single counting bin inside a `RateTracker` window.
const BIN_DURATION: Duration = Duration::from_millis(100);
90
91impl RateTracker {
92    /// Create a new RateTracker to track event rate (events/seconds) in
93    /// `window_duration`.
94    pub fn new(window_duration: Duration) -> Self {
95        assert!(window_duration > BIN_DURATION);
96        let total_bins = (window_duration.as_millis() / BIN_DURATION.as_millis()) as usize;
97        RateTracker {
98            event_buffer: vec![0; total_bins],
99            window_duration,
100            total_bins,
101            start_time: Instant::now(),
102            global_bin_index: 0,
103        }
104    }
105
106    /// Records an event at time `now`.
107    pub fn record_at_time(&mut self, now: Instant) {
108        self.update_window(now);
109        let current_bin_index = self.get_bin_index(now) as usize;
110        if current_bin_index + self.total_bins <= self.global_bin_index as usize {
111            // The bin associated with `now` has passed the rolling window.
112            return;
113        }
114
115        self.event_buffer[current_bin_index % self.total_bins] += 1;
116    }
117
118    /// Records an event at current time.
119    pub fn record(&mut self) {
120        self.record_at_time(Instant::now());
121    }
122
123    /// Returns the rate of events.
124    pub fn rate(&mut self) -> f64 {
125        let now = Instant::now();
126        self.update_window(now);
127        self.event_buffer.iter().sum::<u64>() as f64 / self.window_duration.as_secs_f64()
128    }
129
130    // Given a time `now`, returns the bin index since `start_time`.
131    fn get_bin_index(&self, now: Instant) -> u64 {
132        (now.duration_since(self.start_time).as_millis() / BIN_DURATION.as_millis()) as u64
133    }
134
135    // Updates the rolling window to accommodate the time of interests, `now`. That
136    // is, remove any event counts happened prior to (`now` -
137    // `window_duration`).
138    fn update_window(&mut self, now: Instant) {
139        let current_bin_index = self.get_bin_index(now);
140        if self.global_bin_index >= current_bin_index {
141            // The rolling doesn't move.
142            return;
143        }
144
145        for bin_index in (self.global_bin_index + 1)..=current_bin_index {
146            // Time has elapsed from global_bin_index to current_bin_index. Clear all the
147            // buffer counter associated with them.
148            let index_in_buffer = bin_index as usize % self.total_bins;
149            self.event_buffer[index_in_buffer] = 0;
150        }
151        self.global_bin_index = current_bin_index;
152    }
153}
154
#[cfg(test)]
mod tests {
    use rand::{Rng, SeedableRng, rngs::StdRng};
    use tokio::time::advance;

    use super::*;

    /// Records `count` events on `tracker` at the current time.
    fn record_many(tracker: &mut RateTracker, count: usize) {
        for _ in 0..count {
            tracker.record();
        }
    }

    // Basic behaviour with a 1 second rolling window: events are counted
    // while inside the window and dropped once they age out of it.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_basic() {
        let mut tracker = RateTracker::new(Duration::from_secs(1));
        assert_eq!(tracker.rate(), 0.0);

        record_many(&mut tracker, 3);
        assert_eq!(tracker.rate(), 3.0);

        advance(Duration::from_millis(200)).await;
        record_many(&mut tracker, 3);
        assert_eq!(tracker.rate(), 6.0);

        // The first batch falls out of the window.
        advance(Duration::from_millis(800)).await;
        assert_eq!(tracker.rate(), 3.0);

        // The second batch falls out of the window.
        advance(Duration::from_millis(200)).await;
        assert_eq!(tracker.rate(), 0.0);
    }

    // Rate calculation with a variety of window durations.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_window() {
        let mut rng = StdRng::from_seed([0; 32]);
        for _ in 0..10 {
            let window_secs: u64 = rng.gen_range(1..=60);
            let window = Duration::from_secs(window_secs);

            let mut tracker = RateTracker::new(window);
            record_many(&mut tracker, 23);
            assert_eq!(tracker.rate(), 23.0 / window_secs as f64);

            advance(window).await;
            assert_eq!(tracker.rate(), 0.0);
        }
    }

    // Rate calculation while the window keeps moving.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_rolling_window() {
        let mut tracker = RateTracker::new(Duration::from_secs(1));

        // One event every 100ms.
        for expected in 1..=10 {
            tracker.record();
            assert_eq!(tracker.rate(), expected as f64);
            advance(Duration::from_millis(100)).await;
        }

        // One event every 50ms.
        for step in 0..10 {
            tracker.record();
            advance(Duration::from_millis(50)).await;
            tracker.record();
            assert_eq!(tracker.rate(), 11.0 + step as f64);
            advance(Duration::from_millis(50)).await;
        }

        // With no new events the rate drains back to 0, two events per bin.
        for remaining_bins in (0..10).rev() {
            assert_eq!(tracker.rate(), (remaining_bins * 2) as f64);
            advance(Duration::from_millis(100)).await;
        }
        assert_eq!(tracker.rate(), 0.0);
    }

    // Events older than the tracking window must not affect the rate.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    pub async fn test_rate_tracker_outside_of_window() {
        let mut tracker = RateTracker::new(Duration::from_secs(1));
        advance(Duration::from_secs(60)).await;

        record_many(&mut tracker, 3);
        assert_eq!(tracker.rate(), 3.0);

        // Time is paused, so one timestamp serves all three stale events.
        let stale = Instant::now() - Duration::from_millis(1100);
        for _ in 0..3 {
            tracker.record_at_time(stale);
        }
        assert_eq!(tracker.rate(), 3.0);
    }
}
245}