iota_core/
test_authority_clients.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    net::SocketAddr,
8    sync::{Arc, Mutex},
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use iota_config::genesis::Genesis;
14use iota_metrics::spawn_monitored_task;
15use iota_types::{
16    crypto::AuthorityKeyPair,
17    effects::TransactionEffectsAPI,
18    error::{IotaError, IotaResult},
19    iota_system_state::IotaSystemState,
20    messages_checkpoint::{CheckpointRequest, CheckpointResponse},
21    messages_grpc::{
22        HandleCapabilityNotificationRequestV1, HandleCapabilityNotificationResponseV1,
23        HandleCertificateRequestV1, HandleCertificateResponseV1,
24        HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
25        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
26        TransactionInfoRequest, TransactionInfoResponse,
27    },
28    transaction::{Transaction, VerifiedTransaction},
29};
30use tracing::info;
31
32use crate::{
33    authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
34    authority_client::AuthorityAPI,
35};
36
37#[derive(Clone, Copy, Default)]
38pub struct LocalAuthorityClientFaultConfig {
39    pub fail_before_handle_transaction: bool,
40    pub fail_after_handle_transaction: bool,
41    pub fail_before_handle_confirmation: bool,
42    pub fail_after_handle_confirmation: bool,
43    pub overload_retry_after_handle_transaction: Option<Duration>,
44}
45
46impl LocalAuthorityClientFaultConfig {
47    pub fn reset(&mut self) {
48        *self = Self::default();
49    }
50}
51
52#[derive(Clone)]
53pub struct LocalAuthorityClient {
54    pub state: Arc<AuthorityState>,
55    pub fault_config: LocalAuthorityClientFaultConfig,
56}
57
58#[async_trait]
59impl AuthorityAPI for LocalAuthorityClient {
60    async fn handle_transaction(
61        &self,
62        transaction: Transaction,
63        _client_addr: Option<SocketAddr>,
64    ) -> Result<HandleTransactionResponse, IotaError> {
65        if self.fault_config.fail_before_handle_transaction {
66            return Err(IotaError::from("Mock error before handle_transaction"));
67        }
68        let state = self.state.clone();
69        let epoch_store = self.state.load_epoch_store_one_call_per_task();
70        let transaction = epoch_store
71            .signature_verifier
72            .verify_tx(transaction.data())
73            .map(|_| VerifiedTransaction::new_from_verified(transaction))?;
74        let result = state.handle_transaction(&epoch_store, transaction).await;
75        if self.fault_config.fail_after_handle_transaction {
76            return Err(IotaError::GenericAuthority {
77                error: "Mock error after handle_transaction".to_owned(),
78            });
79        }
80        if let Some(duration) = self.fault_config.overload_retry_after_handle_transaction {
81            return Err(IotaError::ValidatorOverloadedRetryAfter {
82                retry_after_secs: duration.as_secs(),
83            });
84        }
85        result
86    }
87
88    async fn handle_certificate_v1(
89        &self,
90        request: HandleCertificateRequestV1,
91        _client_addr: Option<SocketAddr>,
92    ) -> Result<HandleCertificateResponseV1, IotaError> {
93        let state = self.state.clone();
94        let fault_config = self.fault_config;
95        spawn_monitored_task!(Self::handle_certificate(state, request, fault_config))
96            .await
97            .unwrap()
98    }
99
100    async fn handle_soft_bundle_certificates_v1(
101        &self,
102        _request: HandleSoftBundleCertificatesRequestV1,
103        _client_addr: Option<SocketAddr>,
104    ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
105        unimplemented!()
106    }
107
108    async fn handle_object_info_request(
109        &self,
110        request: ObjectInfoRequest,
111    ) -> Result<ObjectInfoResponse, IotaError> {
112        let state = self.state.clone();
113        state.handle_object_info_request(request).await
114    }
115
116    /// Handle Object information requests for this account.
117    async fn handle_transaction_info_request(
118        &self,
119        request: TransactionInfoRequest,
120    ) -> Result<TransactionInfoResponse, IotaError> {
121        let state = self.state.clone();
122        state.handle_transaction_info_request(request).await
123    }
124
125    async fn handle_checkpoint(
126        &self,
127        request: CheckpointRequest,
128    ) -> Result<CheckpointResponse, IotaError> {
129        let state = self.state.clone();
130
131        state.handle_checkpoint_request(&request)
132    }
133
134    async fn handle_system_state_object(
135        &self,
136        _request: SystemStateRequest,
137    ) -> Result<IotaSystemState, IotaError> {
138        self.state.get_iota_system_state_object_for_testing()
139    }
140
141    async fn handle_capability_notification_v1(
142        &self,
143        request: HandleCapabilityNotificationRequestV1,
144    ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
145        let state = self.state.clone();
146        let epoch_store = state.load_epoch_store_one_call_per_task();
147
148        // Verify the message signature
149        let verified_authority_capabilities =
150            epoch_store.verify_authority_capabilities(request.message)?;
151
152        // Process the verified capabilities
153        info!(
154            "Received capability notification: {:?}",
155            verified_authority_capabilities.data()
156        );
157
158        // For test clients, directly record capabilities since we don't have consensus
159        epoch_store.record_capabilities_v1(verified_authority_capabilities.data())?;
160
161        Ok(HandleCapabilityNotificationResponseV1 { _unused: false })
162    }
163}
164
165impl LocalAuthorityClient {
166    pub async fn new(secret: AuthorityKeyPair, genesis: &Genesis) -> Self {
167        let state = TestAuthorityBuilder::new()
168            .with_genesis_and_keypair(genesis, &secret)
169            .build()
170            .await;
171        Self {
172            state,
173            fault_config: LocalAuthorityClientFaultConfig::default(),
174        }
175    }
176
177    pub fn new_from_authority(state: Arc<AuthorityState>) -> Self {
178        Self {
179            state,
180            fault_config: LocalAuthorityClientFaultConfig::default(),
181        }
182    }
183
184    // One difference between this implementation and actual certificate execution,
185    // is that this assumes shared object locks have already been acquired and
186    // tries to execute shared object transactions as well as owned object
187    // transactions.
188    async fn handle_certificate(
189        state: Arc<AuthorityState>,
190        request: HandleCertificateRequestV1,
191        fault_config: LocalAuthorityClientFaultConfig,
192    ) -> Result<HandleCertificateResponseV1, IotaError> {
193        if fault_config.fail_before_handle_confirmation {
194            return Err(IotaError::GenericAuthority {
195                error: "Mock error before handle_confirmation_transaction".to_owned(),
196            });
197        }
198        // Check existing effects before verifying the cert to allow querying certs
199        // finalized from previous epochs.
200        let tx_digest = *request.certificate.digest();
201        let epoch_store = state.epoch_store_for_testing();
202        let signed_effects = match state
203            .get_signed_effects_and_maybe_resign(&tx_digest, &epoch_store)
204        {
205            Ok(Some(effects)) => effects,
206            _ => {
207                let certificate = epoch_store
208                    .signature_verifier
209                    .verify_cert(request.certificate)
210                    .await?;
211                // let certificate = certificate.verify(epoch_store.committee())?;
212                state.enqueue_certificates_for_execution(vec![certificate.clone()], &epoch_store);
213                let effects = state.notify_read_effects(&certificate).await?;
214                state.sign_effects(effects, &epoch_store)?
215            }
216        }
217        .into_inner();
218
219        let events = if request.include_events {
220            if let Some(digest) = signed_effects.events_digest() {
221                Some(state.get_transaction_events(digest)?)
222            } else {
223                None
224            }
225        } else {
226            None
227        };
228
229        if fault_config.fail_after_handle_confirmation {
230            return Err(IotaError::GenericAuthority {
231                error: "Mock error after handle_confirmation_transaction".to_owned(),
232            });
233        }
234
235        let input_objects = request
236            .include_input_objects
237            .then(|| state.get_transaction_input_objects(&signed_effects))
238            .and_then(Result::ok);
239
240        let output_objects = request
241            .include_output_objects
242            .then(|| state.get_transaction_output_objects(&signed_effects))
243            .and_then(Result::ok);
244
245        Ok(HandleCertificateResponseV1 {
246            signed_effects,
247            events,
248            input_objects,
249            output_objects,
250            auxiliary_data: None, // We don't have any aux data generated presently
251        })
252    }
253}
254
255#[derive(Clone)]
256pub struct MockAuthorityApi {
257    delay: Duration,
258    count: Arc<Mutex<u32>>,
259    handle_object_info_request_result: Option<IotaResult<ObjectInfoResponse>>,
260    handle_capability_notification_result:
261        Option<IotaResult<HandleCapabilityNotificationResponseV1>>,
262}
263
264impl MockAuthorityApi {
265    pub fn new(delay: Duration, count: Arc<Mutex<u32>>) -> Self {
266        MockAuthorityApi {
267            delay,
268            count,
269            handle_object_info_request_result: None,
270            handle_capability_notification_result: None,
271        }
272    }
273
274    pub fn set_handle_object_info_request(&mut self, result: IotaResult<ObjectInfoResponse>) {
275        self.handle_object_info_request_result = Some(result);
276    }
277
278    pub fn set_handle_capability_notification(
279        &mut self,
280        result: IotaResult<HandleCapabilityNotificationResponseV1>,
281    ) {
282        self.handle_capability_notification_result = Some(result);
283    }
284}
285
286#[async_trait]
287impl AuthorityAPI for MockAuthorityApi {
288    /// Initiate a new transaction to an IOTA or Primary account.
289    async fn handle_transaction(
290        &self,
291        _transaction: Transaction,
292        _client_addr: Option<SocketAddr>,
293    ) -> Result<HandleTransactionResponse, IotaError> {
294        unimplemented!();
295    }
296
297    async fn handle_certificate_v1(
298        &self,
299        _request: HandleCertificateRequestV1,
300        _client_addr: Option<SocketAddr>,
301    ) -> Result<HandleCertificateResponseV1, IotaError> {
302        unimplemented!()
303    }
304
305    async fn handle_soft_bundle_certificates_v1(
306        &self,
307        _request: HandleSoftBundleCertificatesRequestV1,
308        _client_addr: Option<SocketAddr>,
309    ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
310        unimplemented!()
311    }
312
313    /// Handle Object information requests for this account.
314    async fn handle_object_info_request(
315        &self,
316        _request: ObjectInfoRequest,
317    ) -> Result<ObjectInfoResponse, IotaError> {
318        self.handle_object_info_request_result.clone().unwrap()
319    }
320
321    /// Handle Object information requests for this account.
322    async fn handle_transaction_info_request(
323        &self,
324        request: TransactionInfoRequest,
325    ) -> Result<TransactionInfoResponse, IotaError> {
326        let count = {
327            let mut count = self.count.lock().unwrap();
328            *count += 1;
329            *count
330        };
331
332        // timeout until the 15th request
333        if count < 15 {
334            tokio::time::sleep(self.delay).await;
335        }
336
337        Err(IotaError::TransactionNotFound {
338            digest: request.transaction_digest,
339        })
340    }
341
342    async fn handle_checkpoint(
343        &self,
344        _request: CheckpointRequest,
345    ) -> Result<CheckpointResponse, IotaError> {
346        unimplemented!();
347    }
348
349    async fn handle_system_state_object(
350        &self,
351        _request: SystemStateRequest,
352    ) -> Result<IotaSystemState, IotaError> {
353        unimplemented!();
354    }
355
356    async fn handle_capability_notification_v1(
357        &self,
358        _request: HandleCapabilityNotificationRequestV1,
359    ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
360        tokio::time::sleep(self.delay).await;
361
362        match &self.handle_capability_notification_result {
363            Some(result) => result.clone(),
364            None => Ok(HandleCapabilityNotificationResponseV1 { _unused: false }),
365        }
366    }
367}
368
369#[derive(Clone)]
370pub struct HandleTransactionTestAuthorityClient {
371    pub tx_info_resp_to_return: IotaResult<HandleTransactionResponse>,
372    pub cert_resp_to_return: IotaResult<HandleCertificateResponseV1>,
373    // If set, sleep for this duration before responding to a request.
374    // This is useful in testing a timeout scenario.
375    pub sleep_duration_before_responding: Option<Duration>,
376}
377
378#[async_trait]
379impl AuthorityAPI for HandleTransactionTestAuthorityClient {
380    async fn handle_transaction(
381        &self,
382        _transaction: Transaction,
383        _client_addr: Option<SocketAddr>,
384    ) -> Result<HandleTransactionResponse, IotaError> {
385        if let Some(duration) = self.sleep_duration_before_responding {
386            tokio::time::sleep(duration).await;
387        }
388        self.tx_info_resp_to_return.clone()
389    }
390
391    async fn handle_certificate_v1(
392        &self,
393        _request: HandleCertificateRequestV1,
394        _client_addr: Option<SocketAddr>,
395    ) -> Result<HandleCertificateResponseV1, IotaError> {
396        if let Some(duration) = self.sleep_duration_before_responding {
397            tokio::time::sleep(duration).await;
398        }
399        self.cert_resp_to_return.clone()
400    }
401
402    async fn handle_soft_bundle_certificates_v1(
403        &self,
404        _request: HandleSoftBundleCertificatesRequestV1,
405        _client_addr: Option<SocketAddr>,
406    ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
407        unimplemented!()
408    }
409
410    async fn handle_object_info_request(
411        &self,
412        _request: ObjectInfoRequest,
413    ) -> Result<ObjectInfoResponse, IotaError> {
414        unimplemented!()
415    }
416
417    async fn handle_transaction_info_request(
418        &self,
419        _request: TransactionInfoRequest,
420    ) -> Result<TransactionInfoResponse, IotaError> {
421        unimplemented!()
422    }
423
424    async fn handle_checkpoint(
425        &self,
426        _request: CheckpointRequest,
427    ) -> Result<CheckpointResponse, IotaError> {
428        unimplemented!()
429    }
430
431    async fn handle_system_state_object(
432        &self,
433        _request: SystemStateRequest,
434    ) -> Result<IotaSystemState, IotaError> {
435        unimplemented!()
436    }
437
438    async fn handle_capability_notification_v1(
439        &self,
440        _request: HandleCapabilityNotificationRequestV1,
441    ) -> Result<HandleCapabilityNotificationResponseV1, IotaError> {
442        unimplemented!()
443    }
444}
445
446impl HandleTransactionTestAuthorityClient {
447    pub fn new() -> Self {
448        Self {
449            tx_info_resp_to_return: Err(IotaError::Unknown("".to_string())),
450            cert_resp_to_return: Err(IotaError::Unknown("".to_string())),
451            sleep_duration_before_responding: None,
452        }
453    }
454
455    pub fn set_tx_info_response(&mut self, resp: HandleTransactionResponse) {
456        self.tx_info_resp_to_return = Ok(resp);
457    }
458
459    pub fn set_tx_info_response_error(&mut self, error: IotaError) {
460        self.tx_info_resp_to_return = Err(error);
461    }
462
463    pub fn reset_tx_info_response(&mut self) {
464        self.tx_info_resp_to_return = Err(IotaError::Unknown("".to_string()));
465    }
466
467    pub fn set_cert_resp_to_return(&mut self, resp: HandleCertificateResponseV1) {
468        self.cert_resp_to_return = Ok(resp);
469    }
470
471    pub fn set_cert_resp_to_return_error(&mut self, error: IotaError) {
472        self.cert_resp_to_return = Err(error);
473    }
474
475    pub fn reset_cert_response(&mut self) {
476        self.cert_resp_to_return = Err(IotaError::Unknown("".to_string()));
477    }
478
479    pub fn set_sleep_duration_before_responding(&mut self, duration: Duration) {
480        self.sleep_duration_before_responding = Some(duration);
481    }
482}
483
484impl Default for HandleTransactionTestAuthorityClient {
485    fn default() -> Self {
486        Self::new()
487    }
488}