1use std::{
7 net::SocketAddr,
8 sync::{Arc, Mutex},
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use iota_config::genesis::Genesis;
14use iota_metrics::spawn_monitored_task;
15use iota_types::{
16 crypto::AuthorityKeyPair,
17 effects::TransactionEffectsAPI,
18 error::{IotaError, IotaResult},
19 iota_system_state::IotaSystemState,
20 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
21 messages_grpc::{
22 HandleCertificateRequestV1, HandleCertificateResponseV1,
23 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
24 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
25 TransactionInfoRequest, TransactionInfoResponse,
26 },
27 transaction::{Transaction, VerifiedTransaction},
28};
29
30use crate::{
31 authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
32 authority_client::AuthorityAPI,
33};
34
/// Switches that make [`LocalAuthorityClient`] return injected errors instead
/// of (or after) doing the real work.
///
/// The derived `Default` leaves every fault disabled.
#[derive(Clone, Copy, Default)]
pub struct LocalAuthorityClientFaultConfig {
    /// Return a mock error from `handle_transaction` before the authority
    /// state is touched.
    pub fail_before_handle_transaction: bool,
    /// Return a mock error from `handle_transaction` after the authority
    /// state has already processed the transaction.
    pub fail_after_handle_transaction: bool,
    /// Return a mock error from certificate handling before any work is done.
    pub fail_before_handle_confirmation: bool,
    /// Return a mock error from certificate handling after the effects (and
    /// any requested events) have been obtained.
    pub fail_after_handle_confirmation: bool,
    /// When set, `handle_transaction` reports a "validator overloaded, retry
    /// after" error carrying this duration in whole seconds, after the
    /// authority state has processed the transaction.
    pub overload_retry_after_handle_transaction: Option<Duration>,
}

impl LocalAuthorityClientFaultConfig {
    /// Disables every fault by overwriting the configuration with its
    /// default value.
    pub fn reset(&mut self) {
        *self = Default::default();
    }
}
49
50#[derive(Clone)]
51pub struct LocalAuthorityClient {
52 pub state: Arc<AuthorityState>,
53 pub fault_config: LocalAuthorityClientFaultConfig,
54}
55
56#[async_trait]
57impl AuthorityAPI for LocalAuthorityClient {
58 async fn handle_transaction(
59 &self,
60 transaction: Transaction,
61 _client_addr: Option<SocketAddr>,
62 ) -> Result<HandleTransactionResponse, IotaError> {
63 if self.fault_config.fail_before_handle_transaction {
64 return Err(IotaError::from("Mock error before handle_transaction"));
65 }
66 let state = self.state.clone();
67 let epoch_store = self.state.load_epoch_store_one_call_per_task();
68 let transaction = epoch_store
69 .signature_verifier
70 .verify_tx(transaction.data())
71 .map(|_| VerifiedTransaction::new_from_verified(transaction))?;
72 let result = state.handle_transaction(&epoch_store, transaction).await;
73 if self.fault_config.fail_after_handle_transaction {
74 return Err(IotaError::GenericAuthority {
75 error: "Mock error after handle_transaction".to_owned(),
76 });
77 }
78 if let Some(duration) = self.fault_config.overload_retry_after_handle_transaction {
79 return Err(IotaError::ValidatorOverloadedRetryAfter {
80 retry_after_secs: duration.as_secs(),
81 });
82 }
83 result
84 }
85
86 async fn handle_certificate_v1(
87 &self,
88 request: HandleCertificateRequestV1,
89 _client_addr: Option<SocketAddr>,
90 ) -> Result<HandleCertificateResponseV1, IotaError> {
91 let state = self.state.clone();
92 let fault_config = self.fault_config;
93 spawn_monitored_task!(Self::handle_certificate(state, request, fault_config))
94 .await
95 .unwrap()
96 }
97
98 async fn handle_soft_bundle_certificates_v1(
99 &self,
100 _request: HandleSoftBundleCertificatesRequestV1,
101 _client_addr: Option<SocketAddr>,
102 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
103 unimplemented!()
104 }
105
106 async fn handle_object_info_request(
107 &self,
108 request: ObjectInfoRequest,
109 ) -> Result<ObjectInfoResponse, IotaError> {
110 let state = self.state.clone();
111 state.handle_object_info_request(request).await
112 }
113
114 async fn handle_transaction_info_request(
116 &self,
117 request: TransactionInfoRequest,
118 ) -> Result<TransactionInfoResponse, IotaError> {
119 let state = self.state.clone();
120 state.handle_transaction_info_request(request).await
121 }
122
123 async fn handle_checkpoint(
124 &self,
125 request: CheckpointRequest,
126 ) -> Result<CheckpointResponse, IotaError> {
127 let state = self.state.clone();
128
129 state.handle_checkpoint_request(&request)
130 }
131
132 async fn handle_system_state_object(
133 &self,
134 _request: SystemStateRequest,
135 ) -> Result<IotaSystemState, IotaError> {
136 self.state.get_iota_system_state_object_for_testing()
137 }
138}
139
140impl LocalAuthorityClient {
141 pub async fn new(secret: AuthorityKeyPair, genesis: &Genesis) -> Self {
142 let state = TestAuthorityBuilder::new()
143 .with_genesis_and_keypair(genesis, &secret)
144 .build()
145 .await;
146 Self {
147 state,
148 fault_config: LocalAuthorityClientFaultConfig::default(),
149 }
150 }
151
152 pub fn new_from_authority(state: Arc<AuthorityState>) -> Self {
153 Self {
154 state,
155 fault_config: LocalAuthorityClientFaultConfig::default(),
156 }
157 }
158
159 async fn handle_certificate(
164 state: Arc<AuthorityState>,
165 request: HandleCertificateRequestV1,
166 fault_config: LocalAuthorityClientFaultConfig,
167 ) -> Result<HandleCertificateResponseV1, IotaError> {
168 if fault_config.fail_before_handle_confirmation {
169 return Err(IotaError::GenericAuthority {
170 error: "Mock error before handle_confirmation_transaction".to_owned(),
171 });
172 }
173 let tx_digest = *request.certificate.digest();
176 let epoch_store = state.epoch_store_for_testing();
177 let signed_effects = match state
178 .get_signed_effects_and_maybe_resign(&tx_digest, &epoch_store)
179 {
180 Ok(Some(effects)) => effects,
181 _ => {
182 let certificate = epoch_store
183 .signature_verifier
184 .verify_cert(request.certificate)
185 .await?;
186 state.enqueue_certificates_for_execution(vec![certificate.clone()], &epoch_store);
188 let effects = state.notify_read_effects(&certificate).await?;
189 state.sign_effects(effects, &epoch_store)?
190 }
191 }
192 .into_inner();
193
194 let events = if request.include_events {
195 if let Some(digest) = signed_effects.events_digest() {
196 Some(state.get_transaction_events(digest)?)
197 } else {
198 None
199 }
200 } else {
201 None
202 };
203
204 if fault_config.fail_after_handle_confirmation {
205 return Err(IotaError::GenericAuthority {
206 error: "Mock error after handle_confirmation_transaction".to_owned(),
207 });
208 }
209
210 let input_objects = request
211 .include_input_objects
212 .then(|| state.get_transaction_input_objects(&signed_effects))
213 .and_then(Result::ok);
214
215 let output_objects = request
216 .include_output_objects
217 .then(|| state.get_transaction_output_objects(&signed_effects))
218 .and_then(Result::ok);
219
220 Ok(HandleCertificateResponseV1 {
221 signed_effects,
222 events,
223 input_objects,
224 output_objects,
225 auxiliary_data: None, })
227 }
228}
229
230#[derive(Clone)]
231pub struct MockAuthorityApi {
232 delay: Duration,
233 count: Arc<Mutex<u32>>,
234 handle_object_info_request_result: Option<IotaResult<ObjectInfoResponse>>,
235}
236
237impl MockAuthorityApi {
238 pub fn new(delay: Duration, count: Arc<Mutex<u32>>) -> Self {
239 MockAuthorityApi {
240 delay,
241 count,
242 handle_object_info_request_result: None,
243 }
244 }
245
246 pub fn set_handle_object_info_request(&mut self, result: IotaResult<ObjectInfoResponse>) {
247 self.handle_object_info_request_result = Some(result);
248 }
249}
250
251#[async_trait]
252impl AuthorityAPI for MockAuthorityApi {
253 async fn handle_transaction(
255 &self,
256 _transaction: Transaction,
257 _client_addr: Option<SocketAddr>,
258 ) -> Result<HandleTransactionResponse, IotaError> {
259 unimplemented!();
260 }
261
262 async fn handle_certificate_v1(
263 &self,
264 _request: HandleCertificateRequestV1,
265 _client_addr: Option<SocketAddr>,
266 ) -> Result<HandleCertificateResponseV1, IotaError> {
267 unimplemented!()
268 }
269
270 async fn handle_soft_bundle_certificates_v1(
271 &self,
272 _request: HandleSoftBundleCertificatesRequestV1,
273 _client_addr: Option<SocketAddr>,
274 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
275 unimplemented!()
276 }
277
278 async fn handle_object_info_request(
280 &self,
281 _request: ObjectInfoRequest,
282 ) -> Result<ObjectInfoResponse, IotaError> {
283 self.handle_object_info_request_result.clone().unwrap()
284 }
285
286 async fn handle_transaction_info_request(
288 &self,
289 request: TransactionInfoRequest,
290 ) -> Result<TransactionInfoResponse, IotaError> {
291 let count = {
292 let mut count = self.count.lock().unwrap();
293 *count += 1;
294 *count
295 };
296
297 if count < 15 {
299 tokio::time::sleep(self.delay).await;
300 }
301
302 Err(IotaError::TransactionNotFound {
303 digest: request.transaction_digest,
304 })
305 }
306
307 async fn handle_checkpoint(
308 &self,
309 _request: CheckpointRequest,
310 ) -> Result<CheckpointResponse, IotaError> {
311 unimplemented!();
312 }
313
314 async fn handle_system_state_object(
315 &self,
316 _request: SystemStateRequest,
317 ) -> Result<IotaSystemState, IotaError> {
318 unimplemented!();
319 }
320}
321
322#[derive(Clone)]
323pub struct HandleTransactionTestAuthorityClient {
324 pub tx_info_resp_to_return: IotaResult<HandleTransactionResponse>,
325 pub cert_resp_to_return: IotaResult<HandleCertificateResponseV1>,
326 pub sleep_duration_before_responding: Option<Duration>,
329}
330
331#[async_trait]
332impl AuthorityAPI for HandleTransactionTestAuthorityClient {
333 async fn handle_transaction(
334 &self,
335 _transaction: Transaction,
336 _client_addr: Option<SocketAddr>,
337 ) -> Result<HandleTransactionResponse, IotaError> {
338 if let Some(duration) = self.sleep_duration_before_responding {
339 tokio::time::sleep(duration).await;
340 }
341 self.tx_info_resp_to_return.clone()
342 }
343
344 async fn handle_certificate_v1(
345 &self,
346 _request: HandleCertificateRequestV1,
347 _client_addr: Option<SocketAddr>,
348 ) -> Result<HandleCertificateResponseV1, IotaError> {
349 if let Some(duration) = self.sleep_duration_before_responding {
350 tokio::time::sleep(duration).await;
351 }
352 self.cert_resp_to_return.clone()
353 }
354
355 async fn handle_soft_bundle_certificates_v1(
356 &self,
357 _request: HandleSoftBundleCertificatesRequestV1,
358 _client_addr: Option<SocketAddr>,
359 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
360 unimplemented!()
361 }
362
363 async fn handle_object_info_request(
364 &self,
365 _request: ObjectInfoRequest,
366 ) -> Result<ObjectInfoResponse, IotaError> {
367 unimplemented!()
368 }
369
370 async fn handle_transaction_info_request(
371 &self,
372 _request: TransactionInfoRequest,
373 ) -> Result<TransactionInfoResponse, IotaError> {
374 unimplemented!()
375 }
376
377 async fn handle_checkpoint(
378 &self,
379 _request: CheckpointRequest,
380 ) -> Result<CheckpointResponse, IotaError> {
381 unimplemented!()
382 }
383
384 async fn handle_system_state_object(
385 &self,
386 _request: SystemStateRequest,
387 ) -> Result<IotaSystemState, IotaError> {
388 unimplemented!()
389 }
390}
391
392impl HandleTransactionTestAuthorityClient {
393 pub fn new() -> Self {
394 Self {
395 tx_info_resp_to_return: Err(IotaError::Unknown("".to_string())),
396 cert_resp_to_return: Err(IotaError::Unknown("".to_string())),
397 sleep_duration_before_responding: None,
398 }
399 }
400
401 pub fn set_tx_info_response(&mut self, resp: HandleTransactionResponse) {
402 self.tx_info_resp_to_return = Ok(resp);
403 }
404
405 pub fn set_tx_info_response_error(&mut self, error: IotaError) {
406 self.tx_info_resp_to_return = Err(error);
407 }
408
409 pub fn reset_tx_info_response(&mut self) {
410 self.tx_info_resp_to_return = Err(IotaError::Unknown("".to_string()));
411 }
412
413 pub fn set_cert_resp_to_return(&mut self, resp: HandleCertificateResponseV1) {
414 self.cert_resp_to_return = Ok(resp);
415 }
416
417 pub fn set_cert_resp_to_return_error(&mut self, error: IotaError) {
418 self.cert_resp_to_return = Err(error);
419 }
420
421 pub fn reset_cert_response(&mut self) {
422 self.cert_resp_to_return = Err(IotaError::Unknown("".to_string()));
423 }
424
425 pub fn set_sleep_duration_before_responding(&mut self, duration: Duration) {
426 self.sleep_duration_before_responding = Some(duration);
427 }
428}
429
430impl Default for HandleTransactionTestAuthorityClient {
431 fn default() -> Self {
432 Self::new()
433 }
434}