1#![allow(clippy::type_complexity)]
6
7use std::{num::NonZeroUsize, str::FromStr, sync::Arc};
8
9use async_trait::async_trait;
10use axum::Json;
11use ethers::{providers::JsonRpcClient, types::TxHash};
12use iota_types::digests::TransactionDigest;
13use lru::LruCache;
14use tap::TapFallible;
15use tokio::sync::{Mutex, oneshot};
16use tracing::info;
17
18use super::governance_verifier::GovernanceVerifier;
19use crate::{
20 crypto::{BridgeAuthorityKeyPair, BridgeAuthoritySignInfo},
21 error::{BridgeError, BridgeResult},
22 eth_client::EthClient,
23 iota_client::{IotaClient, IotaClientInner},
24 metrics::BridgeMetrics,
25 types::{BridgeAction, SignedBridgeAction},
26};
27
28#[async_trait]
29pub trait BridgeRequestHandlerTrait {
30 async fn handle_eth_tx_hash(
34 &self,
35 tx_hash_hex: String,
36 event_idx: u16,
37 ) -> Result<Json<SignedBridgeAction>, BridgeError>;
38 async fn handle_iota_tx_digest(
42 &self,
43 tx_digest_base58: String,
44 event_idx: u16,
45 ) -> Result<Json<SignedBridgeAction>, BridgeError>;
46
47 async fn handle_governance_action(
49 &self,
50 action: BridgeAction,
51 ) -> Result<Json<SignedBridgeAction>, BridgeError>;
52}
53
54#[async_trait::async_trait]
55pub trait ActionVerifier<K>: Send + Sync {
56 fn name(&self) -> &'static str;
58 async fn verify(&self, key: K) -> BridgeResult<BridgeAction>;
59}
60
61struct IotaActionVerifier<C> {
62 iota_client: Arc<IotaClient<C>>,
63}
64
65struct EthActionVerifier<P> {
66 eth_client: Arc<EthClient<P>>,
67}
68
69#[async_trait::async_trait]
70impl<C> ActionVerifier<(TransactionDigest, u16)> for IotaActionVerifier<C>
71where
72 C: IotaClientInner + Send + Sync + 'static,
73{
74 fn name(&self) -> &'static str {
75 "IotaActionVerifier"
76 }
77
78 async fn verify(&self, key: (TransactionDigest, u16)) -> BridgeResult<BridgeAction> {
79 let (tx_digest, event_idx) = key;
80 self.iota_client
81 .get_bridge_action_by_tx_digest_and_event_idx_maybe(&tx_digest, event_idx)
82 .await
83 .tap_ok(|action| info!("IOTA action found: {:?}", action))
84 }
85}
86
87#[async_trait::async_trait]
88impl<C> ActionVerifier<(TxHash, u16)> for EthActionVerifier<C>
89where
90 C: JsonRpcClient + Send + Sync + 'static,
91{
92 fn name(&self) -> &'static str {
93 "EthActionVerifier"
94 }
95
96 async fn verify(&self, key: (TxHash, u16)) -> BridgeResult<BridgeAction> {
97 let (tx_hash, event_idx) = key;
98 self.eth_client
99 .get_finalized_bridge_action_maybe(tx_hash, event_idx)
100 .await
101 .tap_ok(|action| info!("Eth action found: {:?}", action))
102 }
103}
104
105struct SignerWithCache<K> {
106 signer: Arc<BridgeAuthorityKeyPair>,
107 verifier: Arc<dyn ActionVerifier<K>>,
108 mutex: Arc<Mutex<()>>,
109 cache: LruCache<K, Arc<Mutex<Option<BridgeResult<SignedBridgeAction>>>>>,
110 metrics: Arc<BridgeMetrics>,
111}
112
113impl<K> SignerWithCache<K>
114where
115 K: std::hash::Hash + Eq + Clone + Send + Sync + 'static,
116{
117 fn new(
118 signer: Arc<BridgeAuthorityKeyPair>,
119 verifier: impl ActionVerifier<K> + 'static,
120 metrics: Arc<BridgeMetrics>,
121 ) -> Self {
122 Self {
123 signer,
124 verifier: Arc::new(verifier),
125 mutex: Arc::new(Mutex::new(())),
126 cache: LruCache::new(NonZeroUsize::new(1000).unwrap()),
127 metrics,
128 }
129 }
130
131 fn spawn(
132 mut self,
133 mut rx: iota_metrics::metered_channel::Receiver<(
134 K,
135 oneshot::Sender<BridgeResult<SignedBridgeAction>>,
136 )>,
137 ) -> tokio::task::JoinHandle<()> {
138 tokio::spawn(async move {
139 loop {
140 let (key, tx) = rx
141 .recv()
142 .await
143 .unwrap_or_else(|| panic!("Server signer's channel is closed"));
144 let result = self.sign(key).await;
145 let _ = tx.send(result);
148 }
149 })
150 }
151
152 async fn get_cache_entry(
153 &mut self,
154 key: K,
155 ) -> Arc<Mutex<Option<BridgeResult<SignedBridgeAction>>>> {
156 let _ = self.mutex.lock().await;
159 self.cache
160 .get_or_insert(key, || Arc::new(Mutex::new(None)))
161 .clone()
162 }
163
164 async fn sign(&mut self, key: K) -> BridgeResult<SignedBridgeAction> {
165 let signer = self.signer.clone();
166 let verifier = self.verifier.clone();
167 let verifier_name = verifier.name();
168 let entry = self.get_cache_entry(key.clone()).await;
169 let mut guard = entry.lock().await;
170 if let Some(result) = &*guard {
171 self.metrics
172 .signer_with_cache_hit
173 .with_label_values(&[verifier_name])
174 .inc();
175 return result.clone();
176 }
177 self.metrics
178 .signer_with_cache_miss
179 .with_label_values(&[verifier_name])
180 .inc();
181 match verifier.verify(key.clone()).await {
182 Ok(bridge_action) => {
183 let sig = BridgeAuthoritySignInfo::new(&bridge_action, &signer);
184 let result = SignedBridgeAction::new_from_data_and_sig(bridge_action, sig);
185 *guard = Some(Ok(result.clone()));
187 Ok(result)
188 }
189 Err(e) => {
190 match e {
191 BridgeError::GovernanceActionIsNotApproved
193 | BridgeError::ActionIsNotGovernanceAction(..)
194 | BridgeError::BridgeEventInUnrecognizedIotaPackage
195 | BridgeError::BridgeEventInUnrecognizedEthContract
196 | BridgeError::BridgeEventNotActionable
197 | BridgeError::NoBridgeEventsInTxPosition => {
198 *guard = Some(Err(e.clone()));
199 }
200 _ => (),
201 }
202 Err(e)
203 }
204 }
205 }
206
207 #[cfg(test)]
208 async fn get_testing_only(
209 &mut self,
210 key: K,
211 ) -> Option<&Arc<Mutex<Option<BridgeResult<SignedBridgeAction>>>>> {
212 let _ = self.mutex.lock().await;
213 self.cache.get(&key)
214 }
215}
216
217pub struct BridgeRequestHandler {
218 iota_signer_tx: iota_metrics::metered_channel::Sender<(
219 (TransactionDigest, u16),
220 oneshot::Sender<BridgeResult<SignedBridgeAction>>,
221 )>,
222 eth_signer_tx: iota_metrics::metered_channel::Sender<(
223 (TxHash, u16),
224 oneshot::Sender<BridgeResult<SignedBridgeAction>>,
225 )>,
226 governance_signer_tx: iota_metrics::metered_channel::Sender<(
227 BridgeAction,
228 oneshot::Sender<BridgeResult<SignedBridgeAction>>,
229 )>,
230}
231
232impl BridgeRequestHandler {
233 pub fn new<
234 SC: IotaClientInner + Send + Sync + 'static,
235 EP: JsonRpcClient + Send + Sync + 'static,
236 >(
237 signer: BridgeAuthorityKeyPair,
238 iota_client: Arc<IotaClient<SC>>,
239 eth_client: Arc<EthClient<EP>>,
240 approved_governance_actions: Vec<BridgeAction>,
241 metrics: Arc<BridgeMetrics>,
242 ) -> Self {
243 let (iota_signer_tx, iota_rx) = iota_metrics::metered_channel::channel(
244 1000,
245 &iota_metrics::get_metrics()
246 .unwrap()
247 .channel_inflight
248 .with_label_values(&["server_iota_action_signing_queue"]),
249 );
250 let (eth_signer_tx, eth_rx) = iota_metrics::metered_channel::channel(
251 1000,
252 &iota_metrics::get_metrics()
253 .unwrap()
254 .channel_inflight
255 .with_label_values(&["server_eth_action_signing_queue"]),
256 );
257 let (governance_signer_tx, governance_rx) = iota_metrics::metered_channel::channel(
258 1000,
259 &iota_metrics::get_metrics()
260 .unwrap()
261 .channel_inflight
262 .with_label_values(&["server_governance_action_signing_queue"]),
263 );
264 let signer = Arc::new(signer);
265
266 SignerWithCache::new(
267 signer.clone(),
268 IotaActionVerifier { iota_client },
269 metrics.clone(),
270 )
271 .spawn(iota_rx);
272 SignerWithCache::new(
273 signer.clone(),
274 EthActionVerifier { eth_client },
275 metrics.clone(),
276 )
277 .spawn(eth_rx);
278 SignerWithCache::new(
279 signer.clone(),
280 GovernanceVerifier::new(approved_governance_actions).unwrap(),
281 metrics.clone(),
282 )
283 .spawn(governance_rx);
284
285 Self {
286 iota_signer_tx,
287 eth_signer_tx,
288 governance_signer_tx,
289 }
290 }
291}
292
293#[async_trait]
294impl BridgeRequestHandlerTrait for BridgeRequestHandler {
295 async fn handle_eth_tx_hash(
296 &self,
297 tx_hash_hex: String,
298 event_idx: u16,
299 ) -> Result<Json<SignedBridgeAction>, BridgeError> {
300 let tx_hash = TxHash::from_str(&tx_hash_hex).map_err(|_| BridgeError::InvalidTxHash)?;
301
302 let (tx, rx) = oneshot::channel();
303 self.eth_signer_tx
304 .send(((tx_hash, event_idx), tx))
305 .await
306 .unwrap_or_else(|_| panic!("Server eth signing channel is closed"));
307 let signed_action = rx
308 .await
309 .unwrap_or_else(|_| panic!("Server signing task's oneshot channel is dropped"))?;
310 Ok(Json(signed_action))
311 }
312
313 async fn handle_iota_tx_digest(
314 &self,
315 tx_digest_base58: String,
316 event_idx: u16,
317 ) -> Result<Json<SignedBridgeAction>, BridgeError> {
318 let tx_digest = TransactionDigest::from_str(&tx_digest_base58)
319 .map_err(|_e| BridgeError::InvalidTxHash)?;
320 let (tx, rx) = oneshot::channel();
321 self.iota_signer_tx
322 .send(((tx_digest, event_idx), tx))
323 .await
324 .unwrap_or_else(|_| panic!("Server iota signing channel is closed"));
325 let signed_action = rx
326 .await
327 .unwrap_or_else(|_| panic!("Server signing task's oneshot channel is dropped"))?;
328 Ok(Json(signed_action))
329 }
330
331 async fn handle_governance_action(
332 &self,
333 action: BridgeAction,
334 ) -> Result<Json<SignedBridgeAction>, BridgeError> {
335 if !action.is_governace_action() {
336 return Err(BridgeError::ActionIsNotGovernanceAction(action));
337 }
338 let (tx, rx) = oneshot::channel();
339 self.governance_signer_tx
340 .send((action, tx))
341 .await
342 .unwrap_or_else(|_| panic!("Server governance action signing channel is closed"));
343 let signed_action = rx.await.unwrap_or_else(|_| {
344 panic!("Server governance action task's oneshot channel is dropped")
345 })?;
346 Ok(Json(signed_action))
347 }
348}
349
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use ethers::types::{Address as EthAddress, TransactionReceipt};
    use iota_json_rpc_types::{BcsEvent, IotaEvent};
    use iota_types::{
        base_types::IotaAddress,
        bridge::{BridgeChainId, TOKEN_ID_USDC},
        crypto::get_key_pair,
    };

    use super::*;
    use crate::{
        eth_mock_provider::EthMockProvider,
        events::{IotaToEthTokenBridgeV1, MoveTokenDepositedEvent, init_all_struct_tags},
        iota_mock_client::IotaMockClient,
        test_utils::{
            get_test_iota_to_eth_bridge_action, get_test_log_and_action, mock_last_finalized_block,
        },
        types::{EmergencyAction, EmergencyActionType, LimitUpdateAction},
    };

    /// Exercises `SignerWithCache` with the IOTA verifier: slot creation,
    /// which errors end up cached, and reuse of cached signatures.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_iota_signer_with_cache() {
        let (_, kp): (_, BridgeAuthorityKeyPair) = get_key_pair();
        let signer = Arc::new(kp);
        let iota_client_mock = IotaMockClient::default();
        let iota_verifier = IotaActionVerifier {
            iota_client: Arc::new(IotaClient::new_for_testing(iota_client_mock.clone())),
        };
        let metrics = Arc::new(BridgeMetrics::new_for_testing());
        let mut iota_signer_with_cache =
            SignerWithCache::new(signer.clone(), iota_verifier, metrics);

        // A fresh cache has no slot for the key.
        let iota_tx_digest = TransactionDigest::random();
        let iota_event_idx = 42;
        assert!(
            iota_signer_with_cache
                .get_testing_only((iota_tx_digest, iota_event_idx))
                .await
                .is_none()
        );
        // `get_cache_entry` creates the slot, which starts out empty.
        let entry = iota_signer_with_cache
            .get_cache_entry((iota_tx_digest, iota_event_idx))
            .await;
        let entry_ = iota_signer_with_cache
            .get_testing_only((iota_tx_digest, iota_event_idx))
            .await;
        assert!(entry_.unwrap().lock().await.is_none());

        // Filling the slot through the returned handle is visible via the cache.
        let action = get_test_iota_to_eth_bridge_action(
            Some(iota_tx_digest),
            Some(iota_event_idx),
            None,
            None,
            None,
            None,
            None,
        );
        let sig = BridgeAuthoritySignInfo::new(&action, &signer);
        let signed_action = SignedBridgeAction::new_from_data_and_sig(action.clone(), sig);
        entry.lock().await.replace(Ok(signed_action));
        let entry_ = iota_signer_with_cache
            .get_testing_only((iota_tx_digest, iota_event_idx))
            .await;
        assert!(entry_.unwrap().lock().await.is_some());

        let iota_tx_digest = TransactionDigest::random();
        let iota_event_idx = 0;

        // The mock is told to fail for this digest: signing errors and the
        // slot stays empty.
        iota_client_mock.add_events_by_tx_digest_error(iota_tx_digest);
        iota_signer_with_cache
            .sign((iota_tx_digest, iota_event_idx))
            .await
            .unwrap_err();
        let entry_ = iota_signer_with_cache
            .get_testing_only((iota_tx_digest, iota_event_idx))
            .await;
        assert!(entry_.unwrap().lock().await.is_none());

        // An empty event list yields `NoBridgeEventsInTxPosition`, which is cached.
        iota_client_mock.add_events_by_tx_digest(iota_tx_digest, vec![]);
        assert!(matches!(
            iota_signer_with_cache
                .sign((iota_tx_digest, iota_event_idx))
                .await,
            Err(BridgeError::NoBridgeEventsInTxPosition)
        ));
        let entry_ = iota_signer_with_cache
            .get_testing_only((iota_tx_digest, iota_event_idx))
            .await;
        assert_eq!(
            entry_.unwrap().lock().await.clone().unwrap().unwrap_err(),
            BridgeError::NoBridgeEventsInTxPosition,
        );

        let emitted_event_1 = MoveTokenDepositedEvent {
            seq_num: 1,
            source_chain: BridgeChainId::IotaCustom as u8,
            sender_address: IotaAddress::random_for_testing_only().to_vec(),
            target_chain: BridgeChainId::EthCustom as u8,
            target_address: EthAddress::random().as_bytes().to_vec(),
            token_type: TOKEN_ID_USDC,
            amount_iota_adjusted: 12345,
        };

        init_all_struct_tags();

        let mut iota_event_1 = IotaEvent::random_for_testing();
        iota_event_1.type_ = IotaToEthTokenBridgeV1.get().unwrap().clone();
        iota_event_1.bcs = BcsEvent::new(bcs::to_bytes(&emitted_event_1).unwrap());
        let iota_tx_digest = iota_event_1.id.tx_digest;

        let mut iota_event_2 = IotaEvent::random_for_testing();
        iota_event_2.type_ = IotaToEthTokenBridgeV1.get().unwrap().clone();
        iota_event_2.bcs = BcsEvent::new(bcs::to_bytes(&emitted_event_1).unwrap());
        let iota_event_idx_2 = 1;
        iota_client_mock.add_events_by_tx_digest(iota_tx_digest, vec![iota_event_2.clone()]);

        iota_client_mock.add_events_by_tx_digest(
            iota_tx_digest,
            vec![iota_event_1.clone(), iota_event_2.clone()],
        );
        let signed_1 = iota_signer_with_cache
            .sign((iota_tx_digest, iota_event_idx))
            .await
            .unwrap();
        let signed_2 = iota_signer_with_cache
            .sign((iota_tx_digest, iota_event_idx_2))
            .await
            .unwrap();

        // After the mock's events are replaced with an empty list, signing
        // still returns the same results as before.
        iota_client_mock.add_events_by_tx_digest(iota_tx_digest, vec![]);
        assert_eq!(
            iota_signer_with_cache
                .sign((iota_tx_digest, iota_event_idx))
                .await
                .unwrap(),
            signed_1
        );
        assert_eq!(
            iota_signer_with_cache
                .sign((iota_tx_digest, iota_event_idx_2))
                .await
                .unwrap(),
            signed_2
        );
    }

    /// Exercises `SignerWithCache` with the Ethereum verifier: slot creation
    /// and caching of a successful signature.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_eth_signer_with_cache() {
        let (_, kp): (_, BridgeAuthorityKeyPair) = get_key_pair();
        let signer = Arc::new(kp);
        let eth_mock_provider = EthMockProvider::default();
        let contract_address = EthAddress::random();
        let eth_client = EthClient::new_mocked(
            eth_mock_provider.clone(),
            HashSet::from_iter(vec![contract_address]),
        );
        let eth_verifier = EthActionVerifier {
            eth_client: Arc::new(eth_client),
        };
        let metrics = Arc::new(BridgeMetrics::new_for_testing());
        let mut eth_signer_with_cache =
            SignerWithCache::new(signer.clone(), eth_verifier, metrics.clone());

        // A fresh cache has no slot for the key.
        let eth_tx_hash = TxHash::random();
        let eth_event_idx = 42;
        assert!(
            eth_signer_with_cache
                .get_testing_only((eth_tx_hash, eth_event_idx))
                .await
                .is_none()
        );
        // `get_cache_entry` creates the slot, which starts out empty.
        let entry = eth_signer_with_cache
            .get_cache_entry((eth_tx_hash, eth_event_idx))
            .await;
        let entry_ = eth_signer_with_cache
            .get_testing_only((eth_tx_hash, eth_event_idx))
            .await;
        assert!(entry_.unwrap().lock().await.is_none());

        // Filling the slot through the returned handle is visible via the cache.
        let (_, action) = get_test_log_and_action(contract_address, eth_tx_hash, eth_event_idx);
        let sig = BridgeAuthoritySignInfo::new(&action, &signer);
        let signed_action = SignedBridgeAction::new_from_data_and_sig(action.clone(), sig);
        entry.lock().await.replace(Ok(signed_action.clone()));
        let entry_ = eth_signer_with_cache
            .get_testing_only((eth_tx_hash, eth_event_idx))
            .await;
        assert_eq!(
            entry_.unwrap().lock().await.clone().unwrap().unwrap(),
            signed_action
        );

        // Register a receipt and a finalized block with the mock provider for
        // a new transaction, then sign it.
        let eth_tx_hash = TxHash::random();
        let eth_event_idx = 0;
        let (log, _action) = get_test_log_and_action(contract_address, eth_tx_hash, eth_event_idx);
        eth_mock_provider
            .add_response::<[TxHash; 1], TransactionReceipt, TransactionReceipt>(
                "eth_getTransactionReceipt",
                [log.transaction_hash.unwrap()],
                TransactionReceipt {
                    block_number: log.block_number,
                    logs: vec![log.clone()],
                    ..Default::default()
                },
            )
            .unwrap();
        mock_last_finalized_block(&eth_mock_provider, log.block_number.unwrap().as_u64());

        eth_signer_with_cache
            .sign((eth_tx_hash, eth_event_idx))
            .await
            .unwrap();
        // The successful result is now in the cache.
        let entry_ = eth_signer_with_cache
            .get_testing_only((eth_tx_hash, eth_event_idx))
            .await;
        entry_.unwrap().lock().await.clone().unwrap().unwrap();
    }

    /// Exercises `SignerWithCache` with `GovernanceVerifier`: approved
    /// actions are signed and cached, and the two rejection errors are cached.
    #[tokio::test]
    #[ignore = "https://github.com/iotaledger/iota/issues/3224"]
    async fn test_signer_with_governace_verifier() {
        let action_1 = BridgeAction::EmergencyAction(EmergencyAction {
            chain_id: BridgeChainId::EthCustom,
            nonce: 1,
            action_type: EmergencyActionType::Pause,
        });
        let action_2 = BridgeAction::LimitUpdateAction(LimitUpdateAction {
            chain_id: BridgeChainId::EthCustom,
            sending_chain_id: BridgeChainId::IotaCustom,
            nonce: 1,
            new_usd_limit: 10000,
        });

        // The verifier echoes back each action it was constructed with.
        let verifier = GovernanceVerifier::new(vec![action_1.clone(), action_2.clone()]).unwrap();
        assert_eq!(
            verifier.verify(action_1.clone()).await.unwrap(),
            action_1.clone()
        );
        assert_eq!(
            verifier.verify(action_2.clone()).await.unwrap(),
            action_2.clone()
        );

        let (_, kp): (_, BridgeAuthorityKeyPair) = get_key_pair();
        let signer = Arc::new(kp);
        let metrics = Arc::new(BridgeMetrics::new_for_testing());
        let mut signer_with_cache = SignerWithCache::new(signer.clone(), verifier, metrics.clone());

        // Signing an approved action caches the signed result.
        signer_with_cache.sign(action_1.clone()).await.unwrap();
        let entry_ = signer_with_cache.get_testing_only(action_1.clone()).await;
        assert_eq!(
            entry_
                .unwrap()
                .lock()
                .await
                .clone()
                .unwrap()
                .unwrap()
                .data(),
            &action_1
        );

        // A governance action that was not passed to the verifier is
        // rejected, and the rejection is cached.
        let action_3 = BridgeAction::EmergencyAction(EmergencyAction {
            chain_id: BridgeChainId::EthCustom,
            nonce: 1,
            action_type: EmergencyActionType::Unpause,
        });
        assert!(matches!(
            signer_with_cache.sign(action_3.clone()).await.unwrap_err(),
            BridgeError::GovernanceActionIsNotApproved
        ));
        let entry_ = signer_with_cache.get_testing_only(action_3.clone()).await;
        assert!(matches!(
            entry_.unwrap().lock().await.clone().unwrap().unwrap_err(),
            BridgeError::GovernanceActionIsNotApproved
        ));

        // A token-bridge action is rejected as not being a governance action,
        // and that rejection is cached too.
        let action_4 = get_test_iota_to_eth_bridge_action(None, None, None, None, None, None, None);
        assert!(matches!(
            signer_with_cache.sign(action_4.clone()).await.unwrap_err(),
            BridgeError::ActionIsNotGovernanceAction(..)
        ));
        let entry_ = signer_with_cache.get_testing_only(action_4.clone()).await;
        assert!(matches!(
            entry_.unwrap().lock().await.clone().unwrap().unwrap_err(),
            BridgeError::ActionIsNotGovernanceAction { .. }
        ));
    }
}