iota_aws_orchestrator/
faults.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    fmt::{Debug, Display},
7    time::Duration,
8};
9
10use serde::{Deserialize, Serialize};
11
12use crate::client::Instance;
13
/// The fault pattern to inject into the testbed: how many nodes to crash and
/// whether they are later recovered.
///
/// The default (see the `Default` impl) is `Permanent { faults: 0 }`, i.e. no
/// faults at all.
#[derive(Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub enum FaultsType {
    /// Permanently crash the maximum number of nodes from the beginning.
    ///
    /// `faults` is the number of nodes crashed on the first schedule step;
    /// they are never recovered.
    Permanent { faults: usize },
    /// Progressively crash and recover nodes.
    CrashRecovery {
        // Number of nodes that are dead once the progressive crash is
        // complete; all of them are then recovered together in one step.
        max_faults: usize,
        // Not read by `CrashRecoverySchedule` itself; presumably used by the
        // caller to pace `update()` calls — confirm. Rendered in whole
        // seconds by the `Debug`/`Display` impls.
        interval: Duration,
    },
}
24
25impl Default for FaultsType {
26    fn default() -> Self {
27        Self::Permanent { faults: 0 }
28    }
29}
30
31impl Debug for FaultsType {
32    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33        match self {
34            Self::Permanent { faults } => write!(f, "{faults}"),
35            Self::CrashRecovery {
36                max_faults,
37                interval,
38            } => write!(f, "{max_faults}-{}cr", interval.as_secs()),
39        }
40    }
41}
42
43impl Display for FaultsType {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        match self {
46            Self::Permanent { faults } => {
47                if *faults == 0 {
48                    write!(f, "no faults")
49                } else {
50                    write!(f, "{faults} crashed")
51                }
52            }
53            Self::CrashRecovery {
54                max_faults,
55                interval,
56            } => write!(f, "{max_faults} crash-recovery, {}s", interval.as_secs()),
57        }
58    }
59}
60
/// The actions to apply to the testbed, i.e., which instances to crash and
/// recover.
///
/// An action whose `boot` and `kill` lists are both empty is a no-op.
#[derive(Default)]
pub struct CrashRecoveryAction {
    /// The instances to boot (recover).
    pub boot: Vec<Instance>,
    /// The instances to kill (crash).
    pub kill: Vec<Instance>,
}
70
71impl Display for CrashRecoveryAction {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        let booted = self.boot.len();
74        let killed = self.kill.len();
75
76        if self.boot.is_empty() {
77            write!(f, "{killed} node(s) killed")
78        } else if self.kill.is_empty() {
79            write!(f, "{booted} node(s) recovered")
80        } else {
81            write!(f, "{killed} node(s) killed and {booted} node(s) recovered")
82        }
83    }
84}
85
86impl CrashRecoveryAction {
87    pub fn boot(instances: Vec<Instance>) -> Self {
88        Self {
89            boot: instances,
90            kill: Vec::new(),
91        }
92    }
93
94    pub fn kill(instances: Vec<Instance>) -> Self {
95        Self {
96            boot: Vec::new(),
97            kill: instances,
98        }
99    }
100
101    pub fn no_op() -> Self {
102        Self::default()
103    }
104}
105
/// Stateful generator of `CrashRecoveryAction`s for a given `FaultsType`.
///
/// Each call to `update` advances the schedule by one step and returns the
/// action to apply for that step.
pub struct CrashRecoverySchedule {
    /// The number of faulty nodes and the crash-recovery pattern to follow.
    faults_type: FaultsType,
    /// The available instances. Nodes are crashed and recovered in the order
    /// they appear here, starting from index 0.
    instances: Vec<Instance>,
    /// The number of nodes the schedule currently considers dead.
    dead: usize,
}
114
115impl CrashRecoverySchedule {
116    pub fn new(faults_type: FaultsType, instances: Vec<Instance>) -> Self {
117        Self {
118            faults_type,
119            instances,
120            dead: 0,
121        }
122    }
123    pub fn update(&mut self) -> CrashRecoveryAction {
124        match &self.faults_type {
125            // Permanently crash the specified number of nodes.
126            FaultsType::Permanent { faults } => {
127                if self.dead == 0 {
128                    self.dead = *faults;
129                    CrashRecoveryAction::kill(self.instances.clone().drain(0..*faults).collect())
130                } else {
131                    CrashRecoveryAction::no_op()
132                }
133            }
134
135            // Periodically crash and recover nodes.
136            FaultsType::CrashRecovery { max_faults, .. } => {
137                let min_faults = max_faults / 3;
138
139                // Recover all nodes if we already crashed them all.
140                if self.dead == *max_faults {
141                    let instances: Vec<_> = self.instances.clone().drain(0..*max_faults).collect();
142                    self.dead = 0;
143                    CrashRecoveryAction::boot(instances)
144                }
145                // Otherwise crash a few nodes at the time.
146                else {
147                    let (l, h) = if self.dead == 0 && min_faults != 0 {
148                        (0, min_faults)
149                    } else if self.dead == min_faults && min_faults != 0 {
150                        (min_faults, 2 * min_faults)
151                    } else {
152                        (2 * min_faults, *max_faults)
153                    };
154
155                    let instances: Vec<_> = self.instances.clone().drain(l..h).collect();
156                    self.dead += h - l;
157                    CrashRecoveryAction::kill(instances)
158                }
159            }
160        }
161    }
162}
163
#[cfg(test)]
mod faults_tests {
    use std::time::Duration;

    use super::{CrashRecoverySchedule, FaultsType};
    use crate::client::Instance;

    /// Builds a crash-recovery schedule (60s interval) over exactly
    /// `max_faults` test instances.
    fn schedule_with(max_faults: usize) -> CrashRecoverySchedule {
        let instances = (0..max_faults)
            .map(|id| Instance::new_for_test(id.to_string()))
            .collect();
        let faults_type = FaultsType::CrashRecovery {
            max_faults,
            interval: Duration::from_secs(60),
        };
        CrashRecoverySchedule::new(faults_type, instances)
    }

    /// Advances `schedule` by one step and checks how many nodes the
    /// resulting action boots and kills.
    fn expect_step(schedule: &mut CrashRecoverySchedule, booted: usize, killed: usize) {
        let action = schedule.update();
        assert_eq!(action.boot.len(), booted);
        assert_eq!(action.kill.len(), killed);
    }

    #[test]
    fn crash_recovery_1_fault() {
        let mut schedule = schedule_with(1);
        // Two full kill/recover cycles.
        for _ in 0..2 {
            expect_step(&mut schedule, 0, 1);
            expect_step(&mut schedule, 1, 0);
        }
    }

    #[test]
    fn crash_recovery_2_faults() {
        let mut schedule = schedule_with(2);
        // Two full kill/recover cycles.
        for _ in 0..2 {
            expect_step(&mut schedule, 0, 2);
            expect_step(&mut schedule, 2, 0);
        }
    }

    #[test]
    fn crash_recovery() {
        for max_faults in 3..33 {
            let min_faults = max_faults / 3;
            let mut schedule = schedule_with(max_faults);

            // Three kill batches, one full recovery, then the cycle restarts.
            expect_step(&mut schedule, 0, min_faults);
            expect_step(&mut schedule, 0, min_faults);
            expect_step(&mut schedule, 0, max_faults - 2 * min_faults);
            expect_step(&mut schedule, max_faults, 0);
            expect_step(&mut schedule, 0, min_faults);
        }
    }
}
274}