iota_core/
test_authority_clients.rs

1// Copyright (c) 2021, Facebook, Inc. and its affiliates
2// Copyright (c) Mysten Labs, Inc.
3// Modifications Copyright (c) 2024 IOTA Stiftung
4// SPDX-License-Identifier: Apache-2.0
5
6use std::{
7    net::SocketAddr,
8    sync::{Arc, Mutex},
9    time::Duration,
10};
11
12use async_trait::async_trait;
13use iota_config::genesis::Genesis;
14use iota_metrics::spawn_monitored_task;
15use iota_types::{
16    crypto::AuthorityKeyPair,
17    effects::TransactionEffectsAPI,
18    error::{IotaError, IotaResult},
19    iota_system_state::IotaSystemState,
20    messages_checkpoint::{CheckpointRequest, CheckpointResponse},
21    messages_grpc::{
22        HandleCertificateRequestV1, HandleCertificateResponseV1,
23        HandleSoftBundleCertificatesRequestV1, HandleSoftBundleCertificatesResponseV1,
24        HandleTransactionResponse, ObjectInfoRequest, ObjectInfoResponse, SystemStateRequest,
25        TransactionInfoRequest, TransactionInfoResponse,
26    },
27    transaction::{Transaction, VerifiedTransaction},
28};
29
30use crate::{
31    authority::{AuthorityState, test_authority_builder::TestAuthorityBuilder},
32    authority_client::AuthorityAPI,
33};
34
/// Fault-injection switches consulted by `LocalAuthorityClient`.
///
/// The derived `Default` turns every switch off (`false` / `None`), so a
/// default-constructed config injects no faults. `Debug`, `PartialEq` and `Eq`
/// are derived so tests can print and compare configurations directly.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct LocalAuthorityClientFaultConfig {
    /// Return a mock error from `handle_transaction` before the transaction is
    /// verified or handed to the authority state.
    pub fail_before_handle_transaction: bool,
    /// Return a mock error from `handle_transaction` after the authority state
    /// has handled the transaction, discarding the real result.
    pub fail_after_handle_transaction: bool,
    /// Return a mock error from certificate handling before any work is done.
    pub fail_before_handle_confirmation: bool,
    /// Return a mock error from certificate handling after the signed effects
    /// (and requested events) have been obtained.
    pub fail_after_handle_confirmation: bool,
    /// When set, `handle_transaction` answers with
    /// `IotaError::ValidatorOverloadedRetryAfter` after handling the
    /// transaction, using the whole seconds of this duration as the retry delay.
    pub overload_retry_after_handle_transaction: Option<Duration>,
}
43
44impl LocalAuthorityClientFaultConfig {
45    pub fn reset(&mut self) {
46        *self = Self::default();
47    }
48}
49
50#[derive(Clone)]
51pub struct LocalAuthorityClient {
52    pub state: Arc<AuthorityState>,
53    pub fault_config: LocalAuthorityClientFaultConfig,
54}
55
56#[async_trait]
57impl AuthorityAPI for LocalAuthorityClient {
58    async fn handle_transaction(
59        &self,
60        transaction: Transaction,
61        _client_addr: Option<SocketAddr>,
62    ) -> Result<HandleTransactionResponse, IotaError> {
63        if self.fault_config.fail_before_handle_transaction {
64            return Err(IotaError::from("Mock error before handle_transaction"));
65        }
66        let state = self.state.clone();
67        let epoch_store = self.state.load_epoch_store_one_call_per_task();
68        let transaction = epoch_store
69            .signature_verifier
70            .verify_tx(transaction.data())
71            .map(|_| VerifiedTransaction::new_from_verified(transaction))?;
72        let result = state.handle_transaction(&epoch_store, transaction).await;
73        if self.fault_config.fail_after_handle_transaction {
74            return Err(IotaError::GenericAuthority {
75                error: "Mock error after handle_transaction".to_owned(),
76            });
77        }
78        if let Some(duration) = self.fault_config.overload_retry_after_handle_transaction {
79            return Err(IotaError::ValidatorOverloadedRetryAfter {
80                retry_after_secs: duration.as_secs(),
81            });
82        }
83        result
84    }
85
86    async fn handle_certificate_v1(
87        &self,
88        request: HandleCertificateRequestV1,
89        _client_addr: Option<SocketAddr>,
90    ) -> Result<HandleCertificateResponseV1, IotaError> {
91        let state = self.state.clone();
92        let fault_config = self.fault_config;
93        spawn_monitored_task!(Self::handle_certificate(state, request, fault_config))
94            .await
95            .unwrap()
96    }
97
98    async fn handle_soft_bundle_certificates_v1(
99        &self,
100        _request: HandleSoftBundleCertificatesRequestV1,
101        _client_addr: Option<SocketAddr>,
102    ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
103        unimplemented!()
104    }
105
106    async fn handle_object_info_request(
107        &self,
108        request: ObjectInfoRequest,
109    ) -> Result<ObjectInfoResponse, IotaError> {
110        let state = self.state.clone();
111        state.handle_object_info_request(request).await
112    }
113
114    /// Handle Object information requests for this account.
115    async fn handle_transaction_info_request(
116        &self,
117        request: TransactionInfoRequest,
118    ) -> Result<TransactionInfoResponse, IotaError> {
119        let state = self.state.clone();
120        state.handle_transaction_info_request(request).await
121    }
122
123    async fn handle_checkpoint(
124        &self,
125        request: CheckpointRequest,
126    ) -> Result<CheckpointResponse, IotaError> {
127        let state = self.state.clone();
128
129        state.handle_checkpoint_request(&request)
130    }
131
132    async fn handle_system_state_object(
133        &self,
134        _request: SystemStateRequest,
135    ) -> Result<IotaSystemState, IotaError> {
136        self.state.get_iota_system_state_object_for_testing()
137    }
138}
139
140impl LocalAuthorityClient {
141    pub async fn new(secret: AuthorityKeyPair, genesis: &Genesis) -> Self {
142        let state = TestAuthorityBuilder::new()
143            .with_genesis_and_keypair(genesis, &secret)
144            .build()
145            .await;
146        Self {
147            state,
148            fault_config: LocalAuthorityClientFaultConfig::default(),
149        }
150    }
151
152    pub fn new_from_authority(state: Arc<AuthorityState>) -> Self {
153        Self {
154            state,
155            fault_config: LocalAuthorityClientFaultConfig::default(),
156        }
157    }
158
159    // One difference between this implementation and actual certificate execution,
160    // is that this assumes shared object locks have already been acquired and
161    // tries to execute shared object transactions as well as owned object
162    // transactions.
163    async fn handle_certificate(
164        state: Arc<AuthorityState>,
165        request: HandleCertificateRequestV1,
166        fault_config: LocalAuthorityClientFaultConfig,
167    ) -> Result<HandleCertificateResponseV1, IotaError> {
168        if fault_config.fail_before_handle_confirmation {
169            return Err(IotaError::GenericAuthority {
170                error: "Mock error before handle_confirmation_transaction".to_owned(),
171            });
172        }
173        // Check existing effects before verifying the cert to allow querying certs
174        // finalized from previous epochs.
175        let tx_digest = *request.certificate.digest();
176        let epoch_store = state.epoch_store_for_testing();
177        let signed_effects = match state
178            .get_signed_effects_and_maybe_resign(&tx_digest, &epoch_store)
179        {
180            Ok(Some(effects)) => effects,
181            _ => {
182                let certificate = epoch_store
183                    .signature_verifier
184                    .verify_cert(request.certificate)
185                    .await?;
186                // let certificate = certificate.verify(epoch_store.committee())?;
187                state.enqueue_certificates_for_execution(vec![certificate.clone()], &epoch_store);
188                let effects = state.notify_read_effects(&certificate).await?;
189                state.sign_effects(effects, &epoch_store)?
190            }
191        }
192        .into_inner();
193
194        let events = if request.include_events {
195            if let Some(digest) = signed_effects.events_digest() {
196                Some(state.get_transaction_events(digest)?)
197            } else {
198                None
199            }
200        } else {
201            None
202        };
203
204        if fault_config.fail_after_handle_confirmation {
205            return Err(IotaError::GenericAuthority {
206                error: "Mock error after handle_confirmation_transaction".to_owned(),
207            });
208        }
209
210        let input_objects = request
211            .include_input_objects
212            .then(|| state.get_transaction_input_objects(&signed_effects))
213            .and_then(Result::ok);
214
215        let output_objects = request
216            .include_output_objects
217            .then(|| state.get_transaction_output_objects(&signed_effects))
218            .and_then(Result::ok);
219
220        Ok(HandleCertificateResponseV1 {
221            signed_effects,
222            events,
223            input_objects,
224            output_objects,
225            auxiliary_data: None, // We don't have any aux data generated presently
226        })
227    }
228}
229
230#[derive(Clone)]
231pub struct MockAuthorityApi {
232    delay: Duration,
233    count: Arc<Mutex<u32>>,
234    handle_object_info_request_result: Option<IotaResult<ObjectInfoResponse>>,
235}
236
237impl MockAuthorityApi {
238    pub fn new(delay: Duration, count: Arc<Mutex<u32>>) -> Self {
239        MockAuthorityApi {
240            delay,
241            count,
242            handle_object_info_request_result: None,
243        }
244    }
245
246    pub fn set_handle_object_info_request(&mut self, result: IotaResult<ObjectInfoResponse>) {
247        self.handle_object_info_request_result = Some(result);
248    }
249}
250
251#[async_trait]
252impl AuthorityAPI for MockAuthorityApi {
253    /// Initiate a new transaction to an IOTA or Primary account.
254    async fn handle_transaction(
255        &self,
256        _transaction: Transaction,
257        _client_addr: Option<SocketAddr>,
258    ) -> Result<HandleTransactionResponse, IotaError> {
259        unimplemented!();
260    }
261
262    async fn handle_certificate_v1(
263        &self,
264        _request: HandleCertificateRequestV1,
265        _client_addr: Option<SocketAddr>,
266    ) -> Result<HandleCertificateResponseV1, IotaError> {
267        unimplemented!()
268    }
269
270    async fn handle_soft_bundle_certificates_v1(
271        &self,
272        _request: HandleSoftBundleCertificatesRequestV1,
273        _client_addr: Option<SocketAddr>,
274    ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
275        unimplemented!()
276    }
277
278    /// Handle Object information requests for this account.
279    async fn handle_object_info_request(
280        &self,
281        _request: ObjectInfoRequest,
282    ) -> Result<ObjectInfoResponse, IotaError> {
283        self.handle_object_info_request_result.clone().unwrap()
284    }
285
286    /// Handle Object information requests for this account.
287    async fn handle_transaction_info_request(
288        &self,
289        request: TransactionInfoRequest,
290    ) -> Result<TransactionInfoResponse, IotaError> {
291        let count = {
292            let mut count = self.count.lock().unwrap();
293            *count += 1;
294            *count
295        };
296
297        // timeout until the 15th request
298        if count < 15 {
299            tokio::time::sleep(self.delay).await;
300        }
301
302        Err(IotaError::TransactionNotFound {
303            digest: request.transaction_digest,
304        })
305    }
306
307    async fn handle_checkpoint(
308        &self,
309        _request: CheckpointRequest,
310    ) -> Result<CheckpointResponse, IotaError> {
311        unimplemented!();
312    }
313
314    async fn handle_system_state_object(
315        &self,
316        _request: SystemStateRequest,
317    ) -> Result<IotaSystemState, IotaError> {
318        unimplemented!();
319    }
320}
321
322#[derive(Clone)]
323pub struct HandleTransactionTestAuthorityClient {
324    pub tx_info_resp_to_return: IotaResult<HandleTransactionResponse>,
325    pub cert_resp_to_return: IotaResult<HandleCertificateResponseV1>,
326    // If set, sleep for this duration before responding to a request.
327    // This is useful in testing a timeout scenario.
328    pub sleep_duration_before_responding: Option<Duration>,
329}
330
331#[async_trait]
332impl AuthorityAPI for HandleTransactionTestAuthorityClient {
333    async fn handle_transaction(
334        &self,
335        _transaction: Transaction,
336        _client_addr: Option<SocketAddr>,
337    ) -> Result<HandleTransactionResponse, IotaError> {
338        if let Some(duration) = self.sleep_duration_before_responding {
339            tokio::time::sleep(duration).await;
340        }
341        self.tx_info_resp_to_return.clone()
342    }
343
344    async fn handle_certificate_v1(
345        &self,
346        _request: HandleCertificateRequestV1,
347        _client_addr: Option<SocketAddr>,
348    ) -> Result<HandleCertificateResponseV1, IotaError> {
349        if let Some(duration) = self.sleep_duration_before_responding {
350            tokio::time::sleep(duration).await;
351        }
352        self.cert_resp_to_return.clone()
353    }
354
355    async fn handle_soft_bundle_certificates_v1(
356        &self,
357        _request: HandleSoftBundleCertificatesRequestV1,
358        _client_addr: Option<SocketAddr>,
359    ) -> Result<HandleSoftBundleCertificatesResponseV1, IotaError> {
360        unimplemented!()
361    }
362
363    async fn handle_object_info_request(
364        &self,
365        _request: ObjectInfoRequest,
366    ) -> Result<ObjectInfoResponse, IotaError> {
367        unimplemented!()
368    }
369
370    async fn handle_transaction_info_request(
371        &self,
372        _request: TransactionInfoRequest,
373    ) -> Result<TransactionInfoResponse, IotaError> {
374        unimplemented!()
375    }
376
377    async fn handle_checkpoint(
378        &self,
379        _request: CheckpointRequest,
380    ) -> Result<CheckpointResponse, IotaError> {
381        unimplemented!()
382    }
383
384    async fn handle_system_state_object(
385        &self,
386        _request: SystemStateRequest,
387    ) -> Result<IotaSystemState, IotaError> {
388        unimplemented!()
389    }
390}
391
392impl HandleTransactionTestAuthorityClient {
393    pub fn new() -> Self {
394        Self {
395            tx_info_resp_to_return: Err(IotaError::Unknown("".to_string())),
396            cert_resp_to_return: Err(IotaError::Unknown("".to_string())),
397            sleep_duration_before_responding: None,
398        }
399    }
400
401    pub fn set_tx_info_response(&mut self, resp: HandleTransactionResponse) {
402        self.tx_info_resp_to_return = Ok(resp);
403    }
404
405    pub fn set_tx_info_response_error(&mut self, error: IotaError) {
406        self.tx_info_resp_to_return = Err(error);
407    }
408
409    pub fn reset_tx_info_response(&mut self) {
410        self.tx_info_resp_to_return = Err(IotaError::Unknown("".to_string()));
411    }
412
413    pub fn set_cert_resp_to_return(&mut self, resp: HandleCertificateResponseV1) {
414        self.cert_resp_to_return = Ok(resp);
415    }
416
417    pub fn set_cert_resp_to_return_error(&mut self, error: IotaError) {
418        self.cert_resp_to_return = Err(error);
419    }
420
421    pub fn reset_cert_response(&mut self) {
422        self.cert_resp_to_return = Err(IotaError::Unknown("".to_string()));
423    }
424
425    pub fn set_sleep_duration_before_responding(&mut self, duration: Duration) {
426        self.sleep_duration_before_responding = Some(duration);
427    }
428}
429
430impl Default for HandleTransactionTestAuthorityClient {
431    fn default() -> Self {
432        Self::new()
433    }
434}