iota_types/
traffic_control.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::path::PathBuf;
6
7use serde::{Deserialize, Serialize};
8use serde_with::serde_as;
9
// Default count-min sketch parameters. They are chosen to loosely cap the
// memory used by a single sketch at roughly 20MB.
// For reference, see
// https://github.com/jedisct1/rust-count-min-sketch/blob/master/src/lib.rs

/// Default value of `FreqThresholdConfig::sketch_capacity`.
pub const DEFAULT_SKETCH_CAPACITY: usize = 50_000;
/// Default value of `FreqThresholdConfig::sketch_probability`.
pub const DEFAULT_SKETCH_PROBABILITY: f64 = 0.999;
/// Default value of `FreqThresholdConfig::sketch_tolerance`.
pub const DEFAULT_SKETCH_TOLERANCE: f64 = 0.2;
17use rand::distributions::Distribution;
18
/// Default dead man's switch timeout in seconds (five minutes), used for
/// `RemoteFirewallConfig::drain_timeout_secs`.
const TRAFFIC_SINK_TIMEOUT_SEC: u64 = 5 * 60;
20
21/// The source that should be used to identify the client's
22/// IP address. To be used to configure cases where a node has
23/// infra running in front of the node that is separate from the
24/// protocol, such as a load balancer. Note that this is not the
25/// same as the client type (e.g a direct client vs a proxy client,
26/// as in the case of a fullnode driving requests from many clients).
27///
28/// For x-forwarded-for, the usize parameter is the number of forwarding
29/// hops between the client and the node for requests going your infra
30/// or infra provider. Example:
31///
32/// ```ignore
33///     (client) -> { (global proxy) -> (regional proxy) -> (node) }
34/// ```
35///
36/// where
37///
38/// ```ignore
39///     { <server>, ... }
40/// ```
41///
42/// are controlled by the Node operator / their cloud provider.
43/// In this case, we set:
44///
45/// ```ignore
46/// policy-config:
47///    client-id-source:
48///      x-forwarded-for: 2
49///    ...
50/// ```
51///
52/// NOTE: x-forwarded-for: 0 is a special case value that can be used by Node
53/// operators to discover the number of hops that should be configured. To use:
54///
55/// 1. Set `x-forwarded-for: 0` for the `client-id-source` in the config.
56/// 2. Run the node and query any endpoint (AuthorityServer for validator, or
57///    json rpc for rpc node) from a known IP address.
58/// 3. Search for lines containing `x-forwarded-for` in the logs. The log lines
59///    should contain the contents of the `x-forwarded-for` header, if present,
60///    or a corresponding error if not.
61/// 4. The value for number of hops is derived from any such log line that
62///    contains your known IP address, and is defined as 1 + the number of IP
63///    addresses in the `x-forwarded-for` that occur **after** the known client
64///    IP address. Example:
65///
66/// ```ignore
67///     [<known client IP>] <--- number of hops is 1
68///     ["1.2.3.4", <known client IP>, "5.6.7.8", "9.10.11.12"] <--- number of hops is 3
69/// ```
70#[derive(Clone, Debug, Deserialize, Serialize, Default)]
71#[serde(rename_all = "kebab-case")]
72pub enum ClientIdSource {
73    #[default]
74    SocketAddr,
75    XForwardedFor(usize),
76}
77
78#[derive(Clone, Debug, Deserialize, Serialize)]
79pub struct Weight(f32);
80
81impl Weight {
82    pub fn new(value: f32) -> Result<Self, &'static str> {
83        if (0.0..=1.0).contains(&value) {
84            Ok(Self(value))
85        } else {
86            Err("Weight must be between 0.0 and 1.0")
87        }
88    }
89
90    pub fn one() -> Self {
91        Self(1.0)
92    }
93
94    pub fn zero() -> Self {
95        Self(0.0)
96    }
97
98    pub fn value(&self) -> f32 {
99        self.0
100    }
101
102    pub fn is_sampled(&self) -> bool {
103        let mut rng = rand::thread_rng();
104        let sample = rand::distributions::Uniform::new(0.0, 1.0).sample(&mut rng);
105        sample <= self.value()
106    }
107}
108
109impl PartialEq for Weight {
110    fn eq(&self, other: &Self) -> bool {
111        self.value() == other.value()
112    }
113}
114
115#[serde_as]
116#[derive(Clone, Debug, Deserialize, Serialize)]
117#[serde(rename_all = "kebab-case")]
118pub struct RemoteFirewallConfig {
119    pub remote_fw_url: String,
120    pub destination_port: u16,
121    #[serde(default)]
122    pub delegate_spam_blocking: bool,
123    #[serde(default)]
124    pub delegate_error_blocking: bool,
125    #[serde(default = "default_drain_path")]
126    pub drain_path: PathBuf,
127    /// Time in secs, after which no registered ingress traffic
128    /// will trigger dead mans switch to drain any firewalls
129    #[serde(default = "default_drain_timeout")]
130    pub drain_timeout_secs: u64,
131}
132
/// Serde default for `RemoteFirewallConfig::drain_path`.
fn default_drain_path() -> PathBuf {
    let path: &str = "/tmp/drain";
    path.into()
}
136
137fn default_drain_timeout() -> u64 {
138    TRAFFIC_SINK_TIMEOUT_SEC
139}
140
141#[serde_as]
142#[derive(Clone, Debug, Deserialize, Serialize)]
143#[serde(rename_all = "kebab-case")]
144pub struct FreqThresholdConfig {
145    #[serde(default = "default_client_threshold")]
146    pub client_threshold: u64,
147    #[serde(default = "default_proxied_client_threshold")]
148    pub proxied_client_threshold: u64,
149    #[serde(default = "default_window_size_secs")]
150    pub window_size_secs: u64,
151    #[serde(default = "default_update_interval_secs")]
152    pub update_interval_secs: u64,
153    #[serde(default = "default_sketch_capacity")]
154    pub sketch_capacity: usize,
155    #[serde(default = "default_sketch_probability")]
156    pub sketch_probability: f64,
157    #[serde(default = "default_sketch_tolerance")]
158    pub sketch_tolerance: f64,
159}
160
161impl Default for FreqThresholdConfig {
162    fn default() -> Self {
163        Self {
164            client_threshold: default_client_threshold(),
165            proxied_client_threshold: default_proxied_client_threshold(),
166            window_size_secs: default_window_size_secs(),
167            update_interval_secs: default_update_interval_secs(),
168            sketch_capacity: default_sketch_capacity(),
169            sketch_probability: default_sketch_probability(),
170            sketch_tolerance: default_sketch_tolerance(),
171        }
172    }
173}
174
/// Serde default for `FreqThresholdConfig::client_threshold`.
///
/// By default only clients with unreasonably high qps are blocked. A single
/// client may be a fullnode proxying most of the traffic of many well-behaved
/// clients during normal operation. When used as a spam policy, every request
/// counts against this threshold within the window, so in practice this value
/// should always be set explicitly.
const fn default_client_threshold() -> u64 {
    1_000_000
}

/// Serde default for `FreqThresholdConfig::proxied_client_threshold`.
const fn default_proxied_client_threshold() -> u64 {
    10
}

/// Serde default for `FreqThresholdConfig::window_size_secs` (30 seconds).
const fn default_window_size_secs() -> u64 {
    30
}

/// Serde default for `FreqThresholdConfig::update_interval_secs` (5 seconds).
const fn default_update_interval_secs() -> u64 {
    5
}
196
197fn default_sketch_capacity() -> usize {
198    DEFAULT_SKETCH_CAPACITY
199}
200
201fn default_sketch_probability() -> f64 {
202    DEFAULT_SKETCH_PROBABILITY
203}
204
205fn default_sketch_tolerance() -> f64 {
206    DEFAULT_SKETCH_TOLERANCE
207}
208
209// Serializable representation of policy types, used in config
210// in order to easily change in tests or to killswitch
211#[derive(Clone, Serialize, Deserialize, Debug, Default)]
212pub enum PolicyType {
213    /// Does nothing
214    #[default]
215    NoOp,
216
217    /// Blocks connection_ip after reaching a tally frequency (tallies per
218    /// second) of `threshold`, as calculated over an average window of
219    /// `window_size_secs` with granularity of `update_interval_secs`
220    FreqThreshold(FreqThresholdConfig),
221
222    // Below this point are test policies, and thus should not be used in production
223    /// Simple policy that adds connection_ip to blocklist when the same
224    /// connection_ip is encountered in tally N times. If used in an error
225    /// policy, this would trigger after N errors
226    TestNConnIP(u64),
227    /// Test policy that panics when invoked. To be used as an error policy in
228    /// tests that do not expect request errors in order to verify that the
229    /// error policy is not invoked
230    TestPanicOnInvocation,
231}
232
233#[serde_as]
234#[derive(Clone, Debug, Deserialize, Serialize)]
235#[serde(rename_all = "kebab-case")]
236pub struct PolicyConfig {
237    #[serde(default = "default_client_id_source")]
238    pub client_id_source: ClientIdSource,
239    #[serde(default = "default_connection_blocklist_ttl_sec")]
240    pub connection_blocklist_ttl_sec: u64,
241    #[serde(default)]
242    pub proxy_blocklist_ttl_sec: u64,
243    #[serde(default)]
244    pub spam_policy_type: PolicyType,
245    #[serde(default)]
246    pub error_policy_type: PolicyType,
247    #[serde(default = "default_channel_capacity")]
248    pub channel_capacity: usize,
249    #[serde(default = "default_spam_sample_rate")]
250    /// Note that this sample policy is applied on top of the
251    /// endpoint-specific sample policy (not configurable) which
252    /// weighs endpoints by the relative effort required to serve
253    /// them. Therefore a sample rate of N will yield an actual
254    /// sample rate <= N.
255    pub spam_sample_rate: Weight,
256    #[serde(default = "default_dry_run")]
257    pub dry_run: bool,
258}
259
260impl Default for PolicyConfig {
261    fn default() -> Self {
262        Self {
263            client_id_source: default_client_id_source(),
264            connection_blocklist_ttl_sec: 0,
265            proxy_blocklist_ttl_sec: 0,
266            spam_policy_type: PolicyType::NoOp,
267            error_policy_type: PolicyType::NoOp,
268            channel_capacity: 100,
269            spam_sample_rate: default_spam_sample_rate(),
270            dry_run: default_dry_run(),
271        }
272    }
273}
274
275pub fn default_client_id_source() -> ClientIdSource {
276    ClientIdSource::SocketAddr
277}
278
/// Serde default for `PolicyConfig::connection_blocklist_ttl_sec` (60 seconds).
pub const fn default_connection_blocklist_ttl_sec() -> u64 {
    60
}

/// Serde default for `PolicyConfig::channel_capacity`.
pub const fn default_channel_capacity() -> usize {
    100
}

/// Serde default for `PolicyConfig::dry_run`: dry-run is enabled unless the
/// config explicitly turns it off.
pub const fn default_dry_run() -> bool {
    true
}
289
290pub fn default_spam_sample_rate() -> Weight {
291    Weight::new(0.2).unwrap()
292}