1use std::{
7 net::SocketAddr,
8 sync::{Arc, Mutex},
9 time::Duration,
10};
11
12use async_trait::async_trait;
13use iota_config::genesis::Genesis;
14use iota_metrics::spawn_monitored_task;
15use iota_types::{
16 crypto::AuthorityKeyPair,
17 effects::TransactionEffectsAPI,
18 error::{IotaError, IotaResult},
19 iota_system_state::IotaSystemState,
20 messages_checkpoint::{CheckpointRequest, CheckpointResponse},
21 messages_grpc::{
22 HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
23 HandleCertificateRequestV1, HandleCertificateResponseV1,
24 HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
25 HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
26 TransactionInfoRequest, TransactionInfoResponse,
27 },
28 transaction::{Transaction, VerifiedTransaction},
29};
30use tracing::info;
31
32use crate::{
33 authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
34 authority_client::AuthorityAPI,
35};
36
/// Fault-injection switches consumed by [`LocalAuthorityClient`].
///
/// The [`Default`] value has every fault disabled. `Debug`, `PartialEq` and
/// `Eq` are derived so a configuration can be printed and compared in test
/// assertions.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct LocalAuthorityClientFaultConfig {
    /// Make `handle_transaction` return a mock error before the request is
    /// verified or handed to the authority state.
    pub fail_before_handle_transaction: bool,
    /// Make `handle_transaction` return a mock error after the authority
    /// state has processed the transaction; the real result is discarded.
    pub fail_after_handle_transaction: bool,
    /// Make certificate handling return a mock error before the certificate
    /// is looked at.
    pub fail_before_handle_confirmation: bool,
    /// Make certificate handling return a mock error after the signed effects
    /// (and, if requested, the events) have been obtained.
    pub fail_after_handle_confirmation: bool,
    /// When set, `handle_transaction` answers with
    /// `ValidatorOverloadedRetryAfter` after the authority state has processed
    /// the transaction; the duration is reported in whole seconds.
    pub overload_retry_after_handle_transaction: Option<Duration>,
}
45
46impl LocalAuthorityClientFaultConfig {
47 pub fn reset(&mut self) {
48 *self = Self::default();
49 }
50}
51
/// Authority client that answers requests by calling straight into an
/// in-process [`AuthorityState`], with optional fault injection.
#[derive(Clone)]
pub struct LocalAuthorityClient {
    /// The authority state every request is forwarded to.
    pub state: Arc<AuthorityState>,
    /// Faults to inject while handling transactions and certificates.
    pub fault_config: LocalAuthorityClientFaultConfig,
}
57
58#[async_trait]
59impl AuthorityAPI for LocalAuthorityClient {
60 async fn handle_transaction(
61 &self,
62 transaction: Transaction,
63 _client_addr: Option<SocketAddr>,
64 ) -> Result<HandleTransactionResponse, IotaError> {
65 if self.fault_config.fail_before_handle_transaction {
66 return Err(IotaError::from("Mock error before handle_transaction"));
67 }
68 let state = self.state.clone();
69 let epoch_store = self.state.load_epoch_store_one_call_per_task();
70 let transaction = epoch_store
71 .signature_verifier
72 .verify_tx(transaction.data())
73 .map(|_| VerifiedTransaction::new_from_verified(transaction))?;
74 let result = state.handle_transaction(&epoch_store, transaction).await;
75 if self.fault_config.fail_after_handle_transaction {
76 return Err(IotaError::GenericAuthority {
77 error: "Mock error after handle_transaction".to_owned(),
78 });
79 }
80 if let Some(duration) = self.fault_config.overload_retry_after_handle_transaction {
81 return Err(IotaError::ValidatorOverloadedRetryAfter {
82 retry_after_secs: duration.as_secs(),
83 });
84 }
85 result
86 }
87
88 async fn handle_certificate_v1(
89 &self,
90 request: HandleCertificateRequestV1,
91 _client_addr: Option<SocketAddr>,
92 ) -> Result<HandleCertificateResponseV1, IotaError> {
93 let state = self.state.clone();
94 let fault_config = self.fault_config;
95 spawn_monitored_task!(Self::handle_certificate(state, request, fault_config))
96 .await
97 .unwrap()
98 }
99
100 async fn handle_soft_bundle_certificates_v1(
101 &self,
102 _request: HandleSoftBundleCertificatesRequestV1,
103 _client_addr: Option<SocketAddr>,
104 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
105 unimplemented!()
106 }
107
108 async fn handle_object_info_request(
109 &self,
110 request: ObjectInfoRequest,
111 ) -> Result<ObjectInfoResponse, IotaError> {
112 let state = self.state.clone();
113 state.handle_object_info_request(request).await
114 }
115
116 async fn handle_transaction_info_request(
118 &self,
119 request: TransactionInfoRequest,
120 ) -> Result<TransactionInfoResponse, IotaError> {
121 let state = self.state.clone();
122 state.handle_transaction_info_request(request).await
123 }
124
125 async fn handle_checkpoint(
126 &self,
127 request: CheckpointRequest,
128 ) -> Result<CheckpointResponse, IotaError> {
129 let state = self.state.clone();
130
131 state.handle_checkpoint_request(&request)
132 }
133
134 async fn handle_system_state_object(
135 &self,
136 _request: SystemStateRequest,
137 ) -> Result<IotaSystemState, IotaError> {
138 self.state.get_iota_system_state_object_for_testing()
139 }
140
141 async fn handle_capability_notification_v1(
142 &self,
143 request: HandleCapabilityNotificationRequestV1,
144 ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
145 let state = self.state.clone();
146 let epoch_store = state.load_epoch_store_one_call_per_task();
147
148 let verified_authority_capabilities =
150 epoch_store.verify_authority_capabilities(request.message)?;
151
152 info!(
154 "Received capability notification: {:?}",
155 verified_authority_capabilities.data()
156 );
157
158 epoch_store.record_capabilities_v1(verified_authority_capabilities.data())?;
160
161 Ok(HandleCapabilityNotificationResponseV1 { _unused: false })
162 }
163}
164
165impl LocalAuthorityClient {
166 pub async fn new(secret: AuthorityKeyPair, genesis: &Genesis) -> Self {
167 let state = TestAuthorityBuilder::new()
168 .with_genesis_and_keypair(genesis, &secret)
169 .build()
170 .await;
171 Self {
172 state,
173 fault_config: LocalAuthorityClientFaultConfig::default(),
174 }
175 }
176
177 pub fn new_from_authority(state: Arc<AuthorityState>) -> Self {
178 Self {
179 state,
180 fault_config: LocalAuthorityClientFaultConfig::default(),
181 }
182 }
183
184 async fn handle_certificate(
189 state: Arc<AuthorityState>,
190 request: HandleCertificateRequestV1,
191 fault_config: LocalAuthorityClientFaultConfig,
192 ) -> Result<HandleCertificateResponseV1, IotaError> {
193 if fault_config.fail_before_handle_confirmation {
194 return Err(IotaError::GenericAuthority {
195 error: "Mock error before handle_confirmation_transaction".to_owned(),
196 });
197 }
198 let tx_digest = *request.certificate.digest();
201 let epoch_store = state.epoch_store_for_testing();
202 let signed_effects = match state
203 .get_signed_effects_and_maybe_resign(&tx_digest, &epoch_store)
204 {
205 Ok(Some(effects)) => effects,
206 _ => {
207 let certificate = epoch_store
208 .signature_verifier
209 .verify_cert(request.certificate)
210 .await?;
211 state.enqueue_certificates_for_execution(vec![certificate.clone()], &epoch_store);
213 let effects = state.notify_read_effects(&certificate).await?;
214 state.sign_effects(effects, &epoch_store)?
215 }
216 }
217 .into_inner();
218
219 let events = if request.include_events {
220 if let Some(digest) = signed_effects.events_digest() {
221 Some(state.get_transaction_events(digest)?)
222 } else {
223 None
224 }
225 } else {
226 None
227 };
228
229 if fault_config.fail_after_handle_confirmation {
230 return Err(IotaError::GenericAuthority {
231 error: "Mock error after handle_confirmation_transaction".to_owned(),
232 });
233 }
234
235 let input_objects = request
236 .include_input_objects
237 .then(|| state.get_transaction_input_objects(&signed_effects))
238 .and_then(Result::ok);
239
240 let output_objects = request
241 .include_output_objects
242 .then(|| state.get_transaction_output_objects(&signed_effects))
243 .and_then(Result::ok);
244
245 Ok(HandleCertificateResponseV1 {
246 signed_effects,
247 events,
248 input_objects,
249 output_objects,
250 auxiliary_data: None, })
252 }
253}
254
/// Mock authority client with configurable delays and canned responses.
#[derive(Clone)]
pub struct MockAuthorityApi {
    /// How long the mock sleeps before answering the requests that are
    /// delayed (transaction info and capability notification).
    delay: Duration,
    /// Shared counter of transaction info requests handled so far.
    count: Arc<Mutex<u32>>,
    /// Canned answer for object info requests; `None` until a result is set.
    handle_object_info_request_result: Option<IotaResult<ObjectInfoResponse>>,
    /// Canned answer for capability notifications; `None` until a result is
    /// set.
    handle_capability_notification_result:
        Option<IotaResult<HandleCapabilityNotificationResponseV1>>,
}
263
264impl MockAuthorityApi {
265 pub fn new(delay: Duration, count: Arc<Mutex<u32>>) -> Self {
266 MockAuthorityApi {
267 delay,
268 count,
269 handle_object_info_request_result: None,
270 handle_capability_notification_result: None,
271 }
272 }
273
274 pub fn set_handle_object_info_request(&mut self, result: IotaResult<ObjectInfoResponse>) {
275 self.handle_object_info_request_result = Some(result);
276 }
277
278 pub fn set_handle_capability_notification(
279 &mut self,
280 result: IotaResult<HandleCapabilityNotificationResponseV1>,
281 ) {
282 self.handle_capability_notification_result = Some(result);
283 }
284}
285
286#[async_trait]
287impl AuthorityAPI for MockAuthorityApi {
288 async fn handle_transaction(
290 &self,
291 _transaction: Transaction,
292 _client_addr: Option<SocketAddr>,
293 ) -> Result<HandleTransactionResponse, IotaError> {
294 unimplemented!();
295 }
296
297 async fn handle_certificate_v1(
298 &self,
299 _request: HandleCertificateRequestV1,
300 _client_addr: Option<SocketAddr>,
301 ) -> Result<HandleCertificateResponseV1, IotaError> {
302 unimplemented!()
303 }
304
305 async fn handle_soft_bundle_certificates_v1(
306 &self,
307 _request: HandleSoftBundleCertificatesRequestV1,
308 _client_addr: Option<SocketAddr>,
309 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
310 unimplemented!()
311 }
312
313 async fn handle_object_info_request(
315 &self,
316 _request: ObjectInfoRequest,
317 ) -> Result<ObjectInfoResponse, IotaError> {
318 self.handle_object_info_request_result.clone().unwrap()
319 }
320
321 async fn handle_transaction_info_request(
323 &self,
324 request: TransactionInfoRequest,
325 ) -> Result<TransactionInfoResponse, IotaError> {
326 let count = {
327 let mut count = self.count.lock().unwrap();
328 *count += 1;
329 *count
330 };
331
332 if count < 15 {
334 tokio::time::sleep(self.delay).await;
335 }
336
337 Err(IotaError::TransactionNotFound {
338 digest: request.transaction_digest,
339 })
340 }
341
342 async fn handle_checkpoint(
343 &self,
344 _request: CheckpointRequest,
345 ) -> Result<CheckpointResponse, IotaError> {
346 unimplemented!();
347 }
348
349 async fn handle_system_state_object(
350 &self,
351 _request: SystemStateRequest,
352 ) -> Result<IotaSystemState, IotaError> {
353 unimplemented!();
354 }
355
356 async fn handle_capability_notification_v1(
357 &self,
358 _request: HandleCapabilityNotificationRequestV1,
359 ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
360 tokio::time::sleep(self.delay).await;
361
362 match &self.handle_capability_notification_result {
363 Some(result) => result.clone(),
364 None => Ok(HandleCapabilityNotificationResponseV1 { _unused: false }),
365 }
366 }
367}
368
/// Test client that answers transaction and certificate requests with
/// pre-configured results, optionally after a delay.
#[derive(Clone)]
pub struct HandleTransactionTestAuthorityClient {
    /// Result returned by `handle_transaction`.
    pub tx_info_resp_to_return: IotaResult<HandleTransactionResponse>,
    /// Result returned by `handle_certificate_v1`.
    pub cert_resp_to_return: IotaResult<HandleCertificateResponseV1>,
    /// If set, how long to sleep before answering either of those requests.
    pub sleep_duration_before_responding: Option<Duration>,
}
377
378#[async_trait]
379impl AuthorityAPI for HandleTransactionTestAuthorityClient {
380 async fn handle_transaction(
381 &self,
382 _transaction: Transaction,
383 _client_addr: Option<SocketAddr>,
384 ) -> Result<HandleTransactionResponse, IotaError> {
385 if let Some(duration) = self.sleep_duration_before_responding {
386 tokio::time::sleep(duration).await;
387 }
388 self.tx_info_resp_to_return.clone()
389 }
390
391 async fn handle_certificate_v1(
392 &self,
393 _request: HandleCertificateRequestV1,
394 _client_addr: Option<SocketAddr>,
395 ) -> Result<HandleCertificateResponseV1, IotaError> {
396 if let Some(duration) = self.sleep_duration_before_responding {
397 tokio::time::sleep(duration).await;
398 }
399 self.cert_resp_to_return.clone()
400 }
401
402 async fn handle_soft_bundle_certificates_v1(
403 &self,
404 _request: HandleSoftBundleCertificatesRequestV1,
405 _client_addr: Option<SocketAddr>,
406 ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
407 unimplemented!()
408 }
409
410 async fn handle_object_info_request(
411 &self,
412 _request: ObjectInfoRequest,
413 ) -> Result<ObjectInfoResponse, IotaError> {
414 unimplemented!()
415 }
416
417 async fn handle_transaction_info_request(
418 &self,
419 _request: TransactionInfoRequest,
420 ) -> Result<TransactionInfoResponse, IotaError> {
421 unimplemented!()
422 }
423
424 async fn handle_checkpoint(
425 &self,
426 _request: CheckpointRequest,
427 ) -> Result<CheckpointResponse, IotaError> {
428 unimplemented!()
429 }
430
431 async fn handle_system_state_object(
432 &self,
433 _request: SystemStateRequest,
434 ) -> Result<IotaSystemState, IotaError> {
435 unimplemented!()
436 }
437
438 async fn handle_capability_notification_v1(
439 &self,
440 _request: HandleCapabilityNotificationRequestV1,
441 ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
442 unimplemented!()
443 }
444}
445
446impl HandleTransactionTestAuthorityClient {
447 pub fn new() -> Self {
448 Self {
449 tx_info_resp_to_return: Err(IotaError::Unknown("".to_string())),
450 cert_resp_to_return: Err(IotaError::Unknown("".to_string())),
451 sleep_duration_before_responding: None,
452 }
453 }
454
455 pub fn set_tx_info_response(&mut self, resp: HandleTransactionResponse) {
456 self.tx_info_resp_to_return = Ok(resp);
457 }
458
459 pub fn set_tx_info_response_error(&mut self, error: IotaError) {
460 self.tx_info_resp_to_return = Err(error);
461 }
462
463 pub fn reset_tx_info_response(&mut self) {
464 self.tx_info_resp_to_return = Err(IotaError::Unknown("".to_string()));
465 }
466
467 pub fn set_cert_resp_to_return(&mut self, resp: HandleCertificateResponseV1) {
468 self.cert_resp_to_return = Ok(resp);
469 }
470
471 pub fn set_cert_resp_to_return_error(&mut self, error: IotaError) {
472 self.cert_resp_to_return = Err(error);
473 }
474
475 pub fn reset_cert_response(&mut self) {
476 self.cert_resp_to_return = Err(IotaError::Unknown("".to_string()));
477 }
478
479 pub fn set_sleep_duration_before_responding(&mut self, duration: Duration) {
480 self.sleep_duration_before_responding = Some(duration);
481 }
482}
483
484impl Default for HandleTransactionTestAuthorityClient {
485 fn default() -> Self {
486 Self::new()
487 }
488}