1use std::{
6 fmt::{Debug, Display},
7 time::Duration,
8};
9
10use serde::{Deserialize, Serialize};
11
12use crate::client::Instance;
13
14#[derive(Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
15pub enum FaultsType {
16 Permanent { faults: usize },
18 CrashRecovery {
20 max_faults: usize,
21 interval: Duration,
22 },
23}
24
25impl Default for FaultsType {
26 fn default() -> Self {
27 Self::Permanent { faults: 0 }
28 }
29}
30
31impl Debug for FaultsType {
32 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
33 match self {
34 Self::Permanent { faults } => write!(f, "{faults}"),
35 Self::CrashRecovery {
36 max_faults,
37 interval,
38 } => write!(f, "{max_faults}-{}cr", interval.as_secs()),
39 }
40 }
41}
42
43impl Display for FaultsType {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 Self::Permanent { faults } => {
47 if *faults == 0 {
48 write!(f, "no faults")
49 } else {
50 write!(f, "{faults} crashed")
51 }
52 }
53 Self::CrashRecovery {
54 max_faults,
55 interval,
56 } => write!(f, "{max_faults} crash-recovery, {}s", interval.as_secs()),
57 }
58 }
59}
60
61#[derive(Default)]
64pub struct CrashRecoveryAction {
65 pub boot: Vec<Instance>,
67 pub kill: Vec<Instance>,
69}
70
71impl Display for CrashRecoveryAction {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 let booted = self.boot.len();
74 let killed = self.kill.len();
75
76 if self.boot.is_empty() {
77 write!(f, "{killed} node(s) killed")
78 } else if self.kill.is_empty() {
79 write!(f, "{booted} node(s) recovered")
80 } else {
81 write!(f, "{killed} node(s) killed and {booted} node(s) recovered")
82 }
83 }
84}
85
86impl CrashRecoveryAction {
87 pub fn boot(instances: Vec<Instance>) -> Self {
88 Self {
89 boot: instances,
90 kill: Vec::new(),
91 }
92 }
93
94 pub fn kill(instances: Vec<Instance>) -> Self {
95 Self {
96 boot: Vec::new(),
97 kill: instances,
98 }
99 }
100
101 pub fn no_op() -> Self {
102 Self::default()
103 }
104}
105
106pub struct CrashRecoverySchedule {
107 faults_type: FaultsType,
109 instances: Vec<Instance>,
111 dead: usize,
113}
114
115impl CrashRecoverySchedule {
116 pub fn new(faults_type: FaultsType, instances: Vec<Instance>) -> Self {
117 Self {
118 faults_type,
119 instances,
120 dead: 0,
121 }
122 }
123 pub fn update(&mut self) -> CrashRecoveryAction {
124 match &self.faults_type {
125 FaultsType::Permanent { faults } => {
127 if self.dead == 0 {
128 self.dead = *faults;
129 CrashRecoveryAction::kill(self.instances.clone().drain(0..*faults).collect())
130 } else {
131 CrashRecoveryAction::no_op()
132 }
133 }
134
135 FaultsType::CrashRecovery { max_faults, .. } => {
137 let min_faults = max_faults / 3;
138
139 if self.dead == *max_faults {
141 let instances: Vec<_> = self.instances.clone().drain(0..*max_faults).collect();
142 self.dead = 0;
143 CrashRecoveryAction::boot(instances)
144 }
145 else {
147 let (l, h) = if self.dead == 0 && min_faults != 0 {
148 (0, min_faults)
149 } else if self.dead == min_faults && min_faults != 0 {
150 (min_faults, 2 * min_faults)
151 } else {
152 (2 * min_faults, *max_faults)
153 };
154
155 let instances: Vec<_> = self.instances.clone().drain(l..h).collect();
156 self.dead += h - l;
157 CrashRecoveryAction::kill(instances)
158 }
159 }
160 }
161 }
162}
163
#[cfg(test)]
mod faults_tests {
    use std::time::Duration;

    use super::{CrashRecoverySchedule, FaultsType};
    use crate::client::Instance;

    /// Builds a crash-recovery schedule over `max_faults` test instances (60s interval).
    fn crash_recovery_schedule(max_faults: usize) -> CrashRecoverySchedule {
        let instances: Vec<Instance> = (0..max_faults)
            .map(|i| Instance::new_for_test(i.to_string()))
            .collect();
        CrashRecoverySchedule::new(
            FaultsType::CrashRecovery {
                max_faults,
                interval: Duration::from_secs(60),
            },
            instances,
        )
    }

    /// Advances `schedule` once and returns the `(booted, killed)` instance counts.
    fn step(schedule: &mut CrashRecoverySchedule) -> (usize, usize) {
        let action = schedule.update();
        (action.boot.len(), action.kill.len())
    }

    #[test]
    fn crash_recovery_1_fault() {
        let mut schedule = crash_recovery_schedule(1);
        for _ in 0..2 {
            assert_eq!(step(&mut schedule), (0, 1));
            assert_eq!(step(&mut schedule), (1, 0));
        }
    }

    #[test]
    fn crash_recovery_2_faults() {
        let mut schedule = crash_recovery_schedule(2);
        for _ in 0..2 {
            assert_eq!(step(&mut schedule), (0, 2));
            assert_eq!(step(&mut schedule), (2, 0));
        }
    }

    #[test]
    fn crash_recovery() {
        for max_faults in 3..33 {
            let min_faults = max_faults / 3;
            let mut schedule = crash_recovery_schedule(max_faults);

            // Three kill steps, one full recovery, then the cycle restarts.
            assert_eq!(step(&mut schedule), (0, min_faults));
            assert_eq!(step(&mut schedule), (0, min_faults));
            assert_eq!(step(&mut schedule), (0, max_faults - 2 * min_faults));
            assert_eq!(step(&mut schedule), (max_faults, 0));
            assert_eq!(step(&mut schedule), (0, min_faults));
        }
    }
}