1#[cfg(test)]
6use std::collections::HashSet;
7use std::{
8 collections::HashMap,
9 fmt,
10 path::Path,
11 sync::{Arc, Weak},
12};
13
14use async_recursion::async_recursion;
15use async_trait::async_trait;
16use iota_json_rpc_types::{
17 IotaObjectDataOptions, IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
18 IotaTransactionBlockResponseOptions, OwnedObjectRef,
19};
20use iota_keys::keystore::AccountKeystore;
21use iota_metrics::spawn_monitored_task;
22use iota_sdk::wallet_context::WalletContext;
23use iota_types::{
24 base_types::{IotaAddress, ObjectID, TransactionDigest},
25 gas_coin::GasCoin,
26 object::Owner,
27 programmable_transaction_builder::ProgrammableTransactionBuilder,
28 quorum_driver_types::ExecuteTransactionRequestType,
29 transaction::{Transaction, TransactionData},
30};
31use prometheus::Registry;
32use shared_crypto::intent::Intent;
33use tap::tap::TapFallible;
34use tokio::{
35 sync::{
36 Mutex,
37 mpsc::{self, Receiver, Sender},
38 oneshot,
39 },
40 time::{Duration, timeout},
41};
42use tracing::{error, info, warn};
43use ttl_cache::TtlCache;
44use typed_store::Map;
45use uuid::Uuid;
46
47use super::write_ahead_log::WriteAheadLog;
48use crate::{
49 BatchFaucetReceipt, BatchSendStatus, BatchSendStatusType, CoinInfo, Faucet, FaucetConfig,
50 FaucetError, FaucetReceipt, faucet::write_ahead_log, metrics::FaucetMetrics,
51};
52
53pub struct SimpleFaucet {
54 wallet: WalletContext,
55 active_address: IotaAddress,
56 producer: Mutex<Sender<ObjectID>>,
57 consumer: Mutex<Receiver<ObjectID>>,
58 batch_producer: Mutex<Sender<ObjectID>>,
59 batch_consumer: Mutex<Receiver<ObjectID>>,
60 pub metrics: FaucetMetrics,
61 pub wal: Mutex<WriteAheadLog>,
62 request_producer: Sender<(Uuid, IotaAddress, Vec<u64>)>,
63 batch_request_size: u64,
64 task_id_cache: Mutex<TtlCache<Uuid, BatchSendStatus>>,
65 ttl_expiration: u64,
66 coin_amount: u64,
67 #[cfg_attr(not(test), expect(unused))]
69 batch_transfer_shutdown: parking_lot::Mutex<Option<oneshot::Sender<()>>>,
70}
71
72impl fmt::Debug for SimpleFaucet {
75 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
76 f.debug_struct("SimpleFaucet")
77 .field("faucet_wallet", &self.active_address)
78 .field("producer", &self.producer)
79 .field("consumer", &self.consumer)
80 .field("batch_request_size", &self.batch_request_size)
81 .field("ttl_expiration", &self.ttl_expiration)
82 .field("coin_amount", &self.coin_amount)
83 .finish()
84 }
85}
86
87enum GasCoinResponse {
88 GasCoinWithInsufficientBalance(ObjectID),
89 InvalidGasCoin(ObjectID),
90 NoGasCoinAvailable,
91 UnknownGasCoin(ObjectID),
92 ValidGasCoin(ObjectID),
93}
94
95const DEFAULT_GAS_COMPUTATION_BUCKET: u64 = 10_000_000;
97const LOCK_TIMEOUT: Duration = Duration::from_secs(10);
98const RECV_TIMEOUT: Duration = Duration::from_secs(5);
99const BATCH_TIMEOUT: Duration = Duration::from_secs(10);
100
101impl SimpleFaucet {
102 pub async fn new(
103 wallet: WalletContext,
104 prometheus_registry: &Registry,
105 wal_path: &Path,
106 config: FaucetConfig,
107 ) -> Result<Arc<Self>, FaucetError> {
108 let active_address = wallet
109 .active_address()
110 .map_err(|err| FaucetError::Wallet(err.to_string()))?;
111 info!("SimpleFaucet::new with active address: {active_address}");
112
113 let coins = wallet
114 .gas_objects(active_address)
115 .await
116 .map_err(|e| FaucetError::Wallet(e.to_string()))?
117 .iter()
118 .map(|q| GasCoin::try_from(&q.1).unwrap())
120 .filter(|coin| coin.0.balance.value() >= (config.amount * config.num_coins as u64))
121 .collect::<Vec<GasCoin>>();
122
123 if coins.is_empty() {
124 return Err(FaucetError::NoGasCoinAvailable);
125 }
126
127 let metrics = FaucetMetrics::new(prometheus_registry);
128
129 let wal = WriteAheadLog::open(wal_path);
130 let mut pending = vec![];
131
132 let (producer, consumer) = mpsc::channel(coins.len());
133 let (batch_producer, batch_consumer) = mpsc::channel(coins.len());
134
135 let (sender, mut receiver) = mpsc::channel::<(Uuid, IotaAddress, Vec<u64>)>(
136 config.max_request_queue_length as usize,
137 );
138
139 let split_point = if config.batch_enabled {
143 if coins.len() > 1 {
144 1 } else {
146 0 }
148 } else {
149 coins.len() };
151
152 for (coins_processed, coin) in coins.iter().enumerate() {
153 let coin_id = *coin.id();
154 if let Some(write_ahead_log::Entry {
155 uuid,
156 recipient,
157 tx,
158 retry_count: _,
159 in_flight: _,
160 }) = wal.reclaim(coin_id).map_err(FaucetError::internal)?
161 {
162 let uuid = Uuid::from_bytes(uuid);
163 info!(?uuid, ?recipient, ?coin_id, "Retrying txn from WAL.");
164 pending.push((uuid, recipient, coin_id, tx));
165 } else if coins_processed < split_point {
166 producer
167 .send(coin_id)
168 .await
169 .tap_ok(|_| {
170 info!(?coin_id, "Adding coin to gas pool");
171 metrics.total_available_coins.inc();
172 })
173 .tap_err(|e| error!(?coin_id, "Failed to add coin to gas pools: {e:?}"))
174 .unwrap();
175 } else {
176 batch_producer
177 .send(coin_id)
178 .await
179 .tap_ok(|_| {
180 info!(?coin_id, "Adding coin to batch gas pool");
181 metrics.total_available_coins.inc();
182 })
183 .tap_err(|e| error!(?coin_id, "Failed to add coin to batch gas pools: {e:?}"))
184 .unwrap();
185 }
186 }
187 let (batch_transfer_shutdown, mut rx_batch_transfer_shutdown) = oneshot::channel();
188
189 let faucet = Self {
190 wallet,
191 active_address,
192 producer: Mutex::new(producer),
193 consumer: Mutex::new(consumer),
194 batch_producer: Mutex::new(batch_producer),
195 batch_consumer: Mutex::new(batch_consumer),
196 metrics,
197 wal: Mutex::new(wal),
198 request_producer: sender,
199 batch_request_size: config.batch_request_size,
200 task_id_cache: TtlCache::new(config.max_request_per_second as usize * 60 * 10).into(),
204 ttl_expiration: config.ttl_expiration,
205 coin_amount: config.amount,
206 batch_transfer_shutdown: parking_lot::Mutex::new(Some(batch_transfer_shutdown)),
207 };
208
209 let arc_faucet = Arc::new(faucet);
210 let batch_clone = Arc::downgrade(&arc_faucet);
211 spawn_monitored_task!(async move {
212 info!("Starting task to handle batch faucet requests.");
213 loop {
214 match batch_transfer_gases(
215 &batch_clone,
216 &mut receiver,
217 &mut rx_batch_transfer_shutdown,
218 )
219 .await
220 {
221 Ok(response) => {
222 if response == TransactionDigest::ZERO {
223 info!("Batch transfer incomplete due to faucet shutting down.");
224 } else {
225 info!(
226 "Batch transfer completed with transaction digest: {:?}",
227 response
228 );
229 }
230 }
231 Err(err) => {
232 error!("{:?}", err);
233 }
234 }
235 }
236 });
237 futures::future::join_all(pending.into_iter().map(|(uuid, recipient, coin_id, tx)| {
241 arc_faucet.sign_and_execute_txn(uuid, recipient, coin_id, tx, false)
242 }))
243 .await;
244
245 Ok(arc_faucet)
246 }
247
248 async fn pop_gas_coin(&self, uuid: Uuid) -> Option<ObjectID> {
251 let Ok(mut consumer) = tokio::time::timeout(LOCK_TIMEOUT, self.consumer.lock()).await
256 else {
257 error!(?uuid, "Timeout when getting consumer lock");
258 return None;
259 };
260
261 info!(?uuid, "Got consumer lock, pulling coins.");
262 let Ok(coin) = tokio::time::timeout(RECV_TIMEOUT, consumer.recv()).await else {
263 error!(?uuid, "Timeout when getting gas coin from the queue");
264 return None;
265 };
266
267 let Some(coin) = coin else {
268 unreachable!("channel is closed");
269 };
270
271 self.metrics.total_available_coins.dec();
272 Some(coin)
273 }
274
275 async fn pop_gas_coin_for_batch(&self, uuid: Uuid) -> Option<ObjectID> {
278 let Ok(mut batch_consumer) =
283 tokio::time::timeout(LOCK_TIMEOUT, self.batch_consumer.lock()).await
284 else {
285 error!(?uuid, "Timeout when getting batch consumer lock");
286 return None;
287 };
288
289 info!(?uuid, "Got consumer lock, pulling coins.");
290 let Ok(coin) = tokio::time::timeout(RECV_TIMEOUT, batch_consumer.recv()).await else {
291 error!(?uuid, "Timeout when getting gas coin from the queue");
292 return None;
293 };
294
295 let Some(coin) = coin else {
296 unreachable!("channel is closed");
297 };
298
299 self.metrics.total_available_coins.dec();
300 Some(coin)
301 }
302
303 async fn prepare_gas_coin(
306 &self,
307 total_amount: u64,
308 uuid: Uuid,
309 for_batch: bool,
310 ) -> GasCoinResponse {
311 let coin_id = if for_batch {
312 self.pop_gas_coin_for_batch(uuid).await
313 } else {
314 self.pop_gas_coin(uuid).await
315 };
316
317 let Some(coin_id) = coin_id else {
318 warn!("Failed getting gas coin, try later!");
319 return GasCoinResponse::NoGasCoinAvailable;
320 };
321
322 match self.get_gas_coin_and_check_faucet_owner(coin_id).await {
323 Ok(Some(gas_coin)) if gas_coin.value() >= total_amount => {
324 info!(?uuid, ?coin_id, "balance: {}", gas_coin.value());
325 GasCoinResponse::ValidGasCoin(coin_id)
326 }
327
328 Ok(Some(_)) => {
329 info!(?uuid, ?coin_id, "insufficient balance",);
330 GasCoinResponse::GasCoinWithInsufficientBalance(coin_id)
331 }
332
333 Ok(None) => {
334 info!(?uuid, ?coin_id, "No gas coin returned.",);
335 GasCoinResponse::InvalidGasCoin(coin_id)
336 }
337
338 Err(e) => {
339 error!(?uuid, ?coin_id, "Fullnode read error: {e:?}");
340 GasCoinResponse::UnknownGasCoin(coin_id)
341 }
342 }
343 }
344
345 async fn get_coin(
353 &self,
354 coin_id: ObjectID,
355 ) -> anyhow::Result<Option<(Option<Owner>, GasCoin)>> {
356 let client = self.wallet.get_client().await?;
357 let gas_obj = client
358 .read_api()
359 .get_object_with_options(
360 coin_id,
361 IotaObjectDataOptions::new()
362 .with_type()
363 .with_owner()
364 .with_content(),
365 )
366 .await?;
367 let o = gas_obj.data;
368 if let Some(o) = o {
369 Ok(GasCoin::try_from(&o).ok().map(|coin| (o.owner, coin)))
370 } else {
371 Ok(None)
372 }
373 }
374
375 async fn get_gas_coin_and_check_faucet_owner(
379 &self,
380 coin_id: ObjectID,
381 ) -> anyhow::Result<Option<GasCoin>> {
382 let gas_obj = self.get_coin(coin_id).await?;
383 info!(?coin_id, "Reading gas coin object: {:?}", gas_obj);
384 Ok(gas_obj.and_then(|(owner_opt, coin)| match owner_opt {
385 Some(Owner::AddressOwner(owner_addr)) if owner_addr == self.active_address => {
386 Some(coin)
387 }
388 _ => None,
389 }))
390 }
391
392 pub async fn retry_wal_coins(&self) -> Result<(), FaucetError> {
394 let mut wal = self.wal.lock().await;
395 let mut pending = vec![];
396
397 for item in wal.log.safe_iter() {
398 let (coin_id, entry) = item.unwrap();
400 let uuid = Uuid::from_bytes(entry.uuid);
401 if !entry.in_flight {
402 pending.push((uuid, entry.recipient, coin_id, entry.tx));
403 }
404 }
405
406 for (_, _, coin_id, _) in &pending {
407 wal.increment_retry_count(*coin_id)
408 .map_err(FaucetError::internal)?;
409 wal.set_in_flight(*coin_id, true)
410 .map_err(FaucetError::internal)?;
411 }
412
413 info!("Retrying WAL of length: {:?}", pending.len());
414 drop(wal);
416
417 futures::future::join_all(pending.into_iter().map(|(uuid, recipient, coin_id, tx)| {
418 self.sign_and_execute_txn(uuid, recipient, coin_id, tx, false)
419 }))
420 .await;
421
422 Ok(())
423 }
424
425 async fn sign_and_execute_txn(
429 &self,
430 uuid: Uuid,
431 recipient: IotaAddress,
432 coin_id: ObjectID,
433 tx_data: TransactionData,
434 for_batch: bool,
435 ) -> Result<IotaTransactionBlockResponse, FaucetError> {
436 let signature = self
437 .wallet
438 .config()
439 .keystore()
440 .sign_secure(&self.active_address, &tx_data, Intent::iota_transaction())
441 .map_err(FaucetError::internal)?;
442 let tx = Transaction::from_data(tx_data, vec![signature]);
443 let tx_digest = *tx.digest();
444 info!(
445 ?tx_digest,
446 ?recipient,
447 ?coin_id,
448 ?uuid,
449 "PayIota transaction in faucet."
450 );
451
452 match timeout(
453 Duration::from_secs(300),
454 self.execute_pay_iota_txn_with_retries(&tx, coin_id, recipient, uuid),
455 )
456 .await
457 {
458 Err(elapsed) => {
459 warn!(
460 ?recipient,
461 ?coin_id,
462 ?uuid,
463 "Failed to execute PayIota transactions in faucet after {elapsed}. Coin will \
464 not be reused."
465 );
466
467 if let Err(err) = self.wal.lock().await.set_in_flight(coin_id, false) {
471 error!(
472 ?recipient,
473 ?coin_id,
474 ?uuid,
475 "Failed to set coin in flight status in WAL: {:?}",
476 err
477 );
478 }
479
480 Err(FaucetError::Transfer(
481 "could not complete transfer within timeout".into(),
482 ))
483 }
484
485 Ok(result) => {
486 if self.wal.lock().await.commit(coin_id).is_err() {
497 error!(?coin_id, "Failed to remove coin from WAL");
498 }
499 if for_batch {
500 self.recycle_gas_coin_for_batch(coin_id, uuid).await;
501 } else {
502 self.recycle_gas_coin(coin_id, uuid).await;
503 }
504 Ok(result)
505 }
506 }
507 }
508
509 #[async_recursion]
510 async fn transfer_gases(
511 &self,
512 amounts: &[u64],
513 recipient: IotaAddress,
514 uuid: Uuid,
515 ) -> Result<(TransactionDigest, Vec<ObjectID>), FaucetError> {
516 let number_of_coins = amounts.len();
517 let total_amount: u64 = amounts.iter().sum();
518 let gas_cost = self.get_gas_cost().await?;
519
520 let gas_coin_response = self
521 .prepare_gas_coin(total_amount + gas_cost, uuid, false)
522 .await;
523 match gas_coin_response {
524 GasCoinResponse::ValidGasCoin(coin_id) => {
525 let tx_data = self
526 .build_pay_iota_txn(coin_id, self.active_address, recipient, amounts, gas_cost)
527 .await
528 .map_err(FaucetError::internal)?;
529
530 {
531 let mut wal = self.wal.lock().await;
535 wal.reserve(uuid, coin_id, recipient, tx_data.clone())
536 .map_err(FaucetError::internal)?;
537 }
538 let response = self
539 .sign_and_execute_txn(uuid, recipient, coin_id, tx_data, false)
540 .await?;
541 self.metrics.total_coin_requests_succeeded.inc();
542 self.check_and_map_transfer_gas_result(response, number_of_coins, recipient)
543 .await
544 }
545
546 GasCoinResponse::UnknownGasCoin(coin_id) => {
547 self.recycle_gas_coin(coin_id, uuid).await;
548 Err(FaucetError::FullnodeReading(format!(
549 "unknown gas coin {coin_id:?}"
550 )))
551 }
552
553 GasCoinResponse::GasCoinWithInsufficientBalance(coin_id) => {
554 warn!(?uuid, ?coin_id, "Insufficient balance, removing from pool");
555 self.metrics.total_discarded_coins.inc();
556 self.transfer_gases(amounts, recipient, uuid).await
557 }
558
559 GasCoinResponse::InvalidGasCoin(coin_id) => {
560 warn!(?uuid, ?coin_id, "Invalid, removing from pool");
562 self.metrics.total_discarded_coins.inc();
563 self.transfer_gases(amounts, recipient, uuid).await
564 }
565
566 GasCoinResponse::NoGasCoinAvailable => Err(FaucetError::NoGasCoinAvailable),
567 }
568 }
569
570 async fn recycle_gas_coin(&self, coin_id: ObjectID, uuid: Uuid) {
571 let producer = self.producer.lock().await;
575 info!(?uuid, ?coin_id, "Got producer lock and recycling coin");
576 producer
577 .try_send(coin_id)
578 .expect("unexpected - queue is large enough to hold all coins");
579 self.metrics.total_available_coins.inc();
580 info!(?uuid, ?coin_id, "Recycled coin");
581 }
582
583 async fn recycle_gas_coin_for_batch(&self, coin_id: ObjectID, uuid: Uuid) {
584 let batch_producer = self.batch_producer.lock().await;
588 info!(?uuid, ?coin_id, "Got producer lock and recycling coin");
589 batch_producer
590 .try_send(coin_id)
591 .expect("unexpected - queue is large enough to hold all coins");
592 self.metrics.total_available_coins.inc();
593 info!(?uuid, ?coin_id, "Recycled coin");
594 }
595
596 async fn execute_pay_iota_txn_with_retries(
597 &self,
598 tx: &Transaction,
599 coin_id: ObjectID,
600 recipient: IotaAddress,
601 uuid: Uuid,
602 ) -> IotaTransactionBlockResponse {
603 let mut retry_delay = Duration::from_millis(500);
604
605 loop {
606 let res = self
607 .execute_pay_iota_txn(tx, coin_id, recipient, uuid)
608 .await;
609
610 if let Ok(res) = res {
611 return res;
612 }
613
614 info!(
615 ?recipient,
616 ?coin_id,
617 ?uuid,
618 ?retry_delay,
619 "PayIota transaction in faucet failed, previous error: {:?}",
620 &res,
621 );
622
623 tokio::time::sleep(retry_delay).await;
624 retry_delay *= 2;
625 }
626 }
627
628 async fn execute_pay_iota_txn(
629 &self,
630 tx: &Transaction,
631 coin_id: ObjectID,
632 recipient: IotaAddress,
633 uuid: Uuid,
634 ) -> Result<IotaTransactionBlockResponse, anyhow::Error> {
635 self.metrics.current_executions_in_flight.inc();
636 let _metrics_guard = scopeguard::guard(self.metrics.clone(), |metrics| {
637 metrics.current_executions_in_flight.dec();
638 });
639
640 let tx_digest = tx.digest();
641 let client = self.wallet.get_client().await?;
642 Ok(client
643 .quorum_driver_api()
644 .execute_transaction_block(
645 tx.clone(),
646 IotaTransactionBlockResponseOptions::new().with_effects(),
647 Some(ExecuteTransactionRequestType::WaitForLocalExecution),
648 )
649 .await
650 .tap_err(|e| {
651 error!(
652 ?tx_digest,
653 ?recipient,
654 ?coin_id,
655 ?uuid,
656 "Transfer Transaction failed: {:?}",
657 e
658 )
659 })?)
660 }
661
662 async fn get_gas_cost(&self) -> Result<u64, FaucetError> {
663 let gas_price = self.get_gas_price().await?;
664 Ok(gas_price * DEFAULT_GAS_COMPUTATION_BUCKET)
665 }
666
667 async fn get_gas_price(&self) -> Result<u64, FaucetError> {
668 let client = self
669 .wallet
670 .get_client()
671 .await
672 .map_err(|e| FaucetError::Wallet(format!("Unable to get client: {e:?}")))?;
673 client
674 .read_api()
675 .get_reference_gas_price()
676 .await
677 .map_err(|e| FaucetError::FullnodeReading(format!("Error fetch gas price {e:?}")))
678 }
679
680 async fn build_pay_iota_txn(
681 &self,
682 coin_id: ObjectID,
683 signer: IotaAddress,
684 recipient: IotaAddress,
685 amounts: &[u64],
686 budget: u64,
687 ) -> Result<TransactionData, anyhow::Error> {
688 let recipients = vec![recipient; amounts.len()];
689 let client = self.wallet.get_client().await?;
690 client
691 .transaction_builder()
692 .pay_iota(signer, vec![coin_id], recipients, amounts.to_vec(), budget)
693 .await
694 .map_err(|e| {
695 anyhow::anyhow!(
696 "Failed to build PayIota transaction for coin {:?}, with err {:?}",
697 coin_id,
698 e
699 )
700 })
701 }
702
703 async fn check_and_map_transfer_gas_result(
704 &self,
705 res: IotaTransactionBlockResponse,
706 number_of_coins: usize,
707 recipient: IotaAddress,
708 ) -> Result<(TransactionDigest, Vec<ObjectID>), FaucetError> {
709 let created = res
710 .effects
711 .ok_or_else(|| {
712 FaucetError::ParseTransactionResponse(format!(
713 "effects field missing for txn {}",
714 res.digest
715 ))
716 })?
717 .created()
718 .to_vec();
719 if created.len() != number_of_coins {
720 return Err(FaucetError::CoinAmountTransferredIncorrect(format!(
721 "PayIota Transaction should create exact {:?} new coins, but got {:?}",
722 number_of_coins, created
723 )));
724 }
725 assert!(
726 created
727 .iter()
728 .all(|created_coin_owner_ref| created_coin_owner_ref.owner == recipient)
729 );
730 let coin_ids: Vec<ObjectID> = created
731 .iter()
732 .map(|created_coin_owner_ref| created_coin_owner_ref.reference.object_id)
733 .collect();
734 Ok((res.digest, coin_ids))
735 }
736
737 async fn build_batch_pay_iota_txn(
738 &self,
739 coin_id: ObjectID,
740 batch_requests: Vec<(Uuid, IotaAddress, Vec<u64>)>,
741 signer: IotaAddress,
742 budget: u64,
743 ) -> Result<TransactionData, anyhow::Error> {
744 let gas_payment = self.wallet.get_object_ref(coin_id).await?;
745 let gas_price = self.wallet.get_reference_gas_price().await?;
746 let pt = {
749 let mut builder = ProgrammableTransactionBuilder::new();
750 for (_uuid, recipient, amounts) in batch_requests {
751 let recipients = vec![recipient; amounts.len()];
752 builder.pay_iota(recipients, amounts)?;
753 }
754 builder.finish()
755 };
756
757 Ok(TransactionData::new_programmable(
758 signer,
759 vec![gas_payment],
760 pt,
761 budget,
762 gas_price,
763 ))
764 }
765
766 async fn check_and_map_batch_transfer_gas_result(
767 &self,
768 res: IotaTransactionBlockResponse,
769 requests: Vec<(Uuid, IotaAddress, Vec<u64>)>,
770 ) -> Result<(), FaucetError> {
771 let created = res
774 .effects
775 .ok_or_else(|| {
776 FaucetError::ParseTransactionResponse(format!(
777 "effects field missing for txn {}",
778 res.digest
779 ))
780 })?
781 .created()
782 .to_vec();
783
784 let mut address_coins_map: HashMap<IotaAddress, Vec<OwnedObjectRef>> = HashMap::new();
785 created.iter().for_each(|created_coin_owner_ref| {
786 let owner = created_coin_owner_ref.owner;
787 let coin_obj_ref = created_coin_owner_ref.clone();
788
789 address_coins_map
791 .entry(owner.get_owner_address().unwrap())
792 .or_default()
793 .push(coin_obj_ref);
794 });
795
796 let mut request_count: HashMap<IotaAddress, u64> = HashMap::new();
799 let mut task_map = self.task_id_cache.lock().await;
801 for (uuid, addy, amounts) in requests {
802 let number_of_coins = amounts.len();
803 let index = *request_count.entry(addy).or_insert(0);
805
806 let coins_created_for_address = address_coins_map.entry(addy).or_default();
809
810 if number_of_coins as u64 + index > coins_created_for_address.len() as u64 {
811 return Err(FaucetError::CoinAmountTransferredIncorrect(format!(
812 "PayIota Transaction should create exact {:?} new coins, but got {:?}",
813 number_of_coins as u64 + index,
814 coins_created_for_address.len()
815 )));
816 }
817 let coins_slice =
818 &mut coins_created_for_address[index as usize..(index as usize + number_of_coins)];
819
820 request_count.insert(addy, number_of_coins as u64 + index);
821
822 let transferred_gases = coins_slice
826 .iter()
827 .zip(amounts)
828 .map(|(coin, amount)| CoinInfo {
829 id: coin.object_id(),
830 transfer_tx_digest: res.digest,
831 amount,
832 })
833 .collect();
834
835 task_map.insert(
836 uuid,
837 BatchSendStatus {
838 status: BatchSendStatusType::SUCCEEDED,
839 transferred_gas_objects: Some(FaucetReceipt {
840 sent: transferred_gases,
841 }),
842 },
843 Duration::from_secs(self.ttl_expiration),
844 );
845 }
846
847 Ok(())
849 }
850
851 #[cfg(test)]
852 pub(crate) fn shutdown_batch_send_task(&self) {
853 self.batch_transfer_shutdown
854 .lock()
855 .take()
856 .unwrap()
857 .send(())
858 .unwrap();
859 }
860
861 #[cfg(test)]
862 pub fn wallet_mut(&mut self) -> &mut WalletContext {
863 &mut self.wallet
864 }
865
866 #[cfg(test)]
867 pub fn teardown(self) -> WalletContext {
868 self.wallet
869 }
870
871 #[cfg(test)]
872 async fn drain_gas_queue(&mut self, expected_gas_count: usize) -> HashSet<ObjectID> {
873 use tokio::sync::mpsc::error::TryRecvError;
874 let mut consumer = self.consumer.lock().await;
875 let mut candidates = HashSet::new();
876 let mut i = 0;
877 loop {
878 let coin_id = consumer
879 .try_recv()
880 .unwrap_or_else(|e| panic!("Expect the {}th candidate but got {}", i, e));
881 candidates.insert(coin_id);
882 i += 1;
883 if i == expected_gas_count {
884 assert_eq!(consumer.try_recv().unwrap_err(), TryRecvError::Empty);
885 break;
886 }
887 }
888 candidates
889 }
890}
891
892#[async_trait]
893impl Faucet for SimpleFaucet {
894 async fn send(
895 &self,
896 id: Uuid,
897 recipient: IotaAddress,
898 amounts: &[u64],
899 ) -> Result<FaucetReceipt, FaucetError> {
900 info!(?recipient, uuid = ?id, ?amounts, "Getting faucet requests");
901
902 let (digest, coin_ids) = self.transfer_gases(amounts, recipient, id).await?;
903
904 info!(uuid = ?id, ?recipient, ?digest, "PayIota txn succeeded");
905 let mut sent = Vec::with_capacity(coin_ids.len());
906 let coin_results =
907 futures::future::join_all(coin_ids.iter().map(|coin_id| self.get_coin(*coin_id))).await;
908 for (coin_id, res) in coin_ids.into_iter().zip(coin_results) {
909 let amount = if let Ok(Some((_, coin))) = res {
910 coin.value()
911 } else {
912 info!(
913 ?recipient,
914 ?coin_id,
915 uuid = ?id,
916 "Could not find coin after successful transaction, error: {:?}",
917 &res,
918 );
919 0
920 };
921 sent.push(CoinInfo {
922 transfer_tx_digest: digest,
923 amount,
924 id: coin_id,
925 });
926 }
927
928 let faucet_receipt = FaucetReceipt { sent };
930 let mut task_map = self.task_id_cache.lock().await;
931 task_map.insert(
932 id,
933 BatchSendStatus {
934 status: BatchSendStatusType::SUCCEEDED,
935 transferred_gas_objects: Some(faucet_receipt.clone()),
936 },
937 Duration::from_secs(self.ttl_expiration),
938 );
939
940 Ok(faucet_receipt)
941 }
942
943 async fn batch_send(
944 &self,
945 id: Uuid,
946 recipient: IotaAddress,
947 amounts: &[u64],
948 ) -> Result<BatchFaucetReceipt, FaucetError> {
949 info!(?recipient, uuid = ?id, "Getting faucet request");
950 if self
951 .request_producer
952 .try_send((id, recipient, amounts.to_vec()))
953 .is_err()
954 {
955 return Err(FaucetError::BatchSendQueueFull);
956 }
957
958 let mut task_map = self.task_id_cache.lock().await;
959 task_map.insert(
960 id,
961 BatchSendStatus {
962 status: BatchSendStatusType::INPROGRESS,
963 transferred_gas_objects: None,
964 },
965 Duration::from_secs(self.ttl_expiration),
966 );
967 Ok(BatchFaucetReceipt {
968 task: id.to_string(),
969 })
970 }
971
972 async fn get_batch_send_status(&self, task_id: Uuid) -> Result<BatchSendStatus, FaucetError> {
973 let task_map = self.task_id_cache.lock().await;
974 match task_map.get(&task_id) {
975 Some(status) => Ok(status.clone()),
976 None => Err(FaucetError::Internal("task id not found".to_string())),
977 }
978 }
979}
980
981pub async fn batch_gather(
982 request_consumer: &mut Receiver<(Uuid, IotaAddress, Vec<u64>)>,
983 requests: &mut Vec<(Uuid, IotaAddress, Vec<u64>)>,
984 batch_request_size: u64,
985) -> Result<(), FaucetError> {
986 for _ in 1..batch_request_size {
988 let Some(req) = request_consumer.recv().await else {
989 error!("Request consumer queue closed");
990 return Err(FaucetError::ChannelClosed);
991 };
992
993 requests.push(req);
994 }
995
996 Ok(())
997}
998
999pub async fn batch_transfer_gases(
1001 weak_faucet: &Weak<SimpleFaucet>,
1002 request_consumer: &mut Receiver<(Uuid, IotaAddress, Vec<u64>)>,
1003 rx_batch_transfer_shutdown: &mut oneshot::Receiver<()>,
1004) -> Result<TransactionDigest, FaucetError> {
1005 let mut requests = Vec::new();
1006
1007 tokio::select! {
1008 first_req = request_consumer.recv() => {
1009 if let Some((uuid, address, amounts)) = first_req {
1010 requests.push((uuid, address, amounts));
1011 } else {
1012 info!("No more faucet requests will be received. Exiting batch faucet task ...");
1014 return Ok(TransactionDigest::ZERO);
1015 };
1016 }
1017 _ = rx_batch_transfer_shutdown => {
1018 info!("Shutdown signal received. Exiting faucet ...");
1019 return Ok(TransactionDigest::ZERO);
1020 }
1021 };
1022
1023 let Some(faucet) = weak_faucet.upgrade() else {
1024 info!("Faucet has shut down already. Exiting ...");
1025 return Ok(TransactionDigest::ZERO);
1026 };
1027
1028 if timeout(
1029 BATCH_TIMEOUT,
1030 batch_gather(request_consumer, &mut requests, faucet.batch_request_size),
1031 )
1032 .await
1033 .is_err()
1034 {
1035 info!("Batch timeout elapsed while waiting.");
1036 };
1037
1038 let total_requests = requests.len();
1039 let gas_cost = faucet.get_gas_cost().await?;
1040 let uuid = Uuid::new_v4();
1042 info!(
1043 ?uuid,
1044 "Batch transfer attempted of size: {:?}", total_requests
1045 );
1046 let total_iota_needed: u64 = requests.iter().flat_map(|(_, _, amounts)| amounts).sum();
1047
1048 loop {
1050 let gas_coin_response = faucet
1051 .prepare_gas_coin(total_iota_needed + gas_cost, uuid, true)
1052 .await;
1053
1054 match gas_coin_response {
1055 GasCoinResponse::ValidGasCoin(coin_id) => {
1056 let tx_data = faucet
1057 .build_batch_pay_iota_txn(
1058 coin_id,
1059 requests.clone(),
1060 faucet.active_address,
1061 gas_cost,
1062 )
1063 .await
1064 .map_err(FaucetError::internal)?;
1065
1066 let recipient = IotaAddress::ZERO;
1070 {
1071 let mut wal = faucet.wal.lock().await;
1075 wal.reserve(uuid, coin_id, recipient, tx_data.clone())
1076 .map_err(FaucetError::internal)?;
1077 }
1078 let response = faucet
1079 .sign_and_execute_txn(uuid, recipient, coin_id, tx_data, true)
1080 .await?;
1081
1082 faucet
1083 .metrics
1084 .total_coin_requests_succeeded
1085 .add(total_requests as i64);
1086
1087 faucet
1088 .check_and_map_batch_transfer_gas_result(response.clone(), requests)
1089 .await?;
1090
1091 return Ok(response.digest);
1092 }
1093
1094 GasCoinResponse::UnknownGasCoin(coin_id) => {
1095 warn!(?uuid, ?coin_id, "unknown gas coin.");
1097 faucet.metrics.total_discarded_coins.inc();
1098 continue;
1099 }
1100
1101 GasCoinResponse::GasCoinWithInsufficientBalance(coin_id) => {
1102 warn!(?uuid, ?coin_id, "Insufficient balance, removing from pool");
1103 faucet.metrics.total_discarded_coins.inc();
1104 continue;
1106 }
1107
1108 GasCoinResponse::InvalidGasCoin(coin_id) => {
1109 warn!(?uuid, ?coin_id, "Invalid, removing from pool");
1111 faucet.metrics.total_discarded_coins.inc();
1112 continue;
1114 }
1115
1116 GasCoinResponse::NoGasCoinAvailable => return Err(FaucetError::NoGasCoinAvailable),
1117 }
1118 }
1119}
1120
1121#[cfg(test)]
1122mod tests {
1123 use anyhow::*;
1124 use iota_json_rpc_types::{IotaExecutionStatus, IotaTransactionBlockEffects};
1125 use iota_sdk::wallet_context::WalletContext;
1126 use iota_types::transaction::{SenderSignedData, TransactionDataAPI};
1127 use shared_crypto::intent::Intent;
1128 use test_cluster::TestClusterBuilder;
1129
1130 use super::*;
1131
1132 async fn execute_tx(
1133 ctx: &mut WalletContext,
1134 tx_data: TransactionData,
1135 ) -> Result<IotaTransactionBlockEffects, anyhow::Error> {
1136 let signature = ctx.config().keystore().sign_secure(
1137 &tx_data.sender(),
1138 &tx_data,
1139 Intent::iota_transaction(),
1140 )?;
1141 let sender_signed_data = SenderSignedData::new_from_sender_signature(tx_data, signature);
1142 let transaction = Transaction::new(sender_signed_data);
1143 let response = ctx.execute_transaction_may_fail(transaction).await?;
1144 let result_effects = response.clone().effects;
1145
1146 if let Some(effects) = result_effects {
1147 if matches!(effects.status(), IotaExecutionStatus::Failure { .. }) {
1148 Err(anyhow!(
1149 "Error executing transaction: {:#?}",
1150 effects.status()
1151 ))
1152 } else {
1153 Ok(effects)
1154 }
1155 } else {
1156 Err(anyhow!(
1157 "Effects from IotaTransactionBlockResult should not be empty"
1158 ))
1159 }
1160 }
1161
1162 #[tokio::test]
1163 async fn simple_faucet_basic_interface_should_work() {
1164 telemetry_subscribers::init_for_testing();
1165 let test_cluster = TestClusterBuilder::new().build().await;
1166 let tmp = tempfile::tempdir().unwrap();
1167 let prom_registry = Registry::new();
1168 let config = FaucetConfig::default();
1169
1170 let address = test_cluster.get_address_0();
1171 let mut context = test_cluster.wallet;
1172 let gas_coins = context
1173 .get_all_gas_objects_owned_by_address(address)
1174 .await
1175 .unwrap();
1176 let client = context.get_client().await.unwrap();
1177 let tx_kind = client
1178 .transaction_builder()
1179 .split_coin_tx_kind(gas_coins.first().unwrap().0, None, Some(10))
1180 .await
1181 .unwrap();
1182 let gas_budget = 50_000_000;
1183 let rgp = context.get_reference_gas_price().await.unwrap();
1184 let tx_data = client
1185 .transaction_builder()
1186 .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1187 .await
1188 .unwrap();
1189
1190 execute_tx(&mut context, tx_data).await.unwrap();
1191
1192 let faucet = SimpleFaucet::new(
1193 context,
1194 &prom_registry,
1195 &tmp.path().join("faucet.wal"),
1196 config,
1197 )
1198 .await
1199 .unwrap();
1200 let faucet = Arc::try_unwrap(faucet).unwrap();
1203
1204 let available = faucet.metrics.total_available_coins.get();
1205 let discarded = faucet.metrics.total_discarded_coins.get();
1206
1207 test_basic_interface(&faucet).await;
1208 test_send_interface_has_success_status(&faucet).await;
1209
1210 assert_eq!(available, faucet.metrics.total_available_coins.get());
1211 assert_eq!(discarded, faucet.metrics.total_discarded_coins.get());
1212 }
1213
1214 #[tokio::test]
1215 async fn test_init_gas_queue() {
1216 let test_cluster = TestClusterBuilder::new().build().await;
1217 let address = test_cluster.get_address_0();
1218 let context = test_cluster.wallet;
1219 let gas_coins = context
1220 .get_all_gas_objects_owned_by_address(address)
1221 .await
1222 .unwrap();
1223 let gas_coins = HashSet::from_iter(gas_coins.into_iter().map(|gas| gas.0));
1224
1225 let tmp = tempfile::tempdir().unwrap();
1226 let prom_registry = Registry::new();
1227 let config = FaucetConfig::default();
1228 let faucet = SimpleFaucet::new(
1229 context,
1230 &prom_registry,
1231 &tmp.path().join("faucet.wal"),
1232 config,
1233 )
1234 .await
1235 .unwrap();
1236 faucet.shutdown_batch_send_task();
1237 let available = faucet.metrics.total_available_coins.get();
1238 let faucet_unwrapped = &mut Arc::try_unwrap(faucet).unwrap();
1239
1240 let candidates = faucet_unwrapped.drain_gas_queue(gas_coins.len()).await;
1241
1242 assert_eq!(available as usize, candidates.len());
1243 assert_eq!(
1244 candidates, gas_coins,
1245 "gases: {:?}, candidates: {:?}",
1246 gas_coins, candidates
1247 );
1248 }
1249
1250 #[tokio::test]
1251 async fn test_transfer_state() {
1252 let test_cluster = TestClusterBuilder::new().build().await;
1253 let address = test_cluster.get_address_0();
1254 let context = test_cluster.wallet;
1255 let gas_coins = context
1256 .get_all_gas_objects_owned_by_address(address)
1257 .await
1258 .unwrap();
1259 let gas_coins = HashSet::from_iter(gas_coins.into_iter().map(|gas| gas.0));
1260
1261 let tmp = tempfile::tempdir().unwrap();
1262 let prom_registry = Registry::new();
1263 let config = FaucetConfig::default();
1264 let faucet = SimpleFaucet::new(
1265 context,
1266 &prom_registry,
1267 &tmp.path().join("faucet.wal"),
1268 config,
1269 )
1270 .await
1271 .unwrap();
1272
1273 let number_of_coins = gas_coins.len();
1274 let amounts = &vec![1; number_of_coins];
1275 let _ = futures::future::join_all((0..number_of_coins).map(|_| {
1276 faucet.send(
1277 Uuid::new_v4(),
1278 IotaAddress::random_for_testing_only(),
1279 amounts,
1280 )
1281 }))
1282 .await
1283 .into_iter()
1284 .map(|res| res.unwrap())
1285 .collect::<Vec<_>>();
1286
1287 let available = faucet.metrics.total_available_coins.get();
1290 faucet.shutdown_batch_send_task();
1291
1292 let faucet_unwrapped: &mut SimpleFaucet = &mut Arc::try_unwrap(faucet).unwrap();
1293 let candidates = faucet_unwrapped.drain_gas_queue(gas_coins.len()).await;
1294 assert_eq!(available as usize, candidates.len());
1295 assert_eq!(
1296 candidates, gas_coins,
1297 "gases: {:?}, candidates: {:?}",
1298 gas_coins, candidates
1299 );
1300 }
1301
1302 #[tokio::test]
1303 async fn test_batch_transfer_interface() {
1304 let test_cluster = TestClusterBuilder::new().build().await;
1305 let config: FaucetConfig = FaucetConfig {
1306 batch_enabled: true,
1307 ..Default::default()
1308 };
1309 let coin_amount = config.amount;
1310 let prom_registry = Registry::new();
1311 let tmp = tempfile::tempdir().unwrap();
1312 let address = test_cluster.get_address_0();
1313 let mut context = test_cluster.wallet;
1314 let gas_coins = context
1315 .get_all_gas_objects_owned_by_address(address)
1316 .await
1317 .unwrap();
1318 let client = context.get_client().await.unwrap();
1319 let tx_kind = client
1320 .transaction_builder()
1321 .split_coin_tx_kind(gas_coins.first().unwrap().0, None, Some(10))
1322 .await
1323 .unwrap();
1324 let gas_budget = 50_000_000;
1325 let rgp = context.get_reference_gas_price().await.unwrap();
1326 let tx_data = client
1327 .transaction_builder()
1328 .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1329 .await
1330 .unwrap();
1331
1332 execute_tx(&mut context, tx_data).await.unwrap();
1333
1334 let faucet = SimpleFaucet::new(
1335 context,
1336 &prom_registry,
1337 &tmp.path().join("faucet.wal"),
1338 config,
1339 )
1340 .await
1341 .unwrap();
1342
1343 let amounts = &[coin_amount];
1344
1345 let target_addresses: Vec<IotaAddress> = (0..5)
1347 .map(|_| IotaAddress::random_for_testing_only())
1348 .collect();
1349
1350 let response = futures::future::join_all(
1351 target_addresses
1352 .iter()
1353 .map(|address| faucet.batch_send(Uuid::new_v4(), *address, amounts)),
1354 )
1355 .await
1356 .into_iter()
1357 .map(|res| res.unwrap())
1358 .collect::<Vec<BatchFaucetReceipt>>();
1359
1360 let status_results = futures::future::join_all(
1362 response
1363 .clone()
1364 .iter()
1365 .map(|task| faucet.get_batch_send_status(Uuid::parse_str(&task.task).unwrap())),
1366 )
1367 .await
1368 .into_iter()
1369 .map(|res| res.unwrap())
1370 .collect::<Vec<BatchSendStatus>>();
1371
1372 for status in status_results {
1373 assert_eq!(status.status, BatchSendStatusType::INPROGRESS);
1374 }
1375
1376 let mut status_results;
1377 loop {
1378 status_results =
1380 futures::future::join_all(response.clone().iter().map(|task| {
1381 faucet.get_batch_send_status(Uuid::parse_str(&task.task).unwrap())
1382 }))
1383 .await
1384 .into_iter()
1385 .map(|res| res.unwrap())
1386 .collect::<Vec<BatchSendStatus>>();
1387
1388 if status_results[0].status == BatchSendStatusType::SUCCEEDED {
1391 break;
1392 }
1393 info!(
1394 "Trying to get status again... current is: {:?}",
1395 status_results[0].status
1396 );
1397 }
1398 for status in status_results {
1399 assert_eq!(status.status, BatchSendStatusType::SUCCEEDED);
1400 }
1401 }
1402
1403 #[tokio::test]
1404 async fn test_ttl_cache_expires_after_duration() {
1405 let test_cluster = TestClusterBuilder::new().build().await;
1406 let context = test_cluster.wallet;
1407 let config = FaucetConfig {
1410 ttl_expiration: 1,
1411 ..Default::default()
1412 };
1413 let prom_registry = Registry::new();
1414 let tmp = tempfile::tempdir().unwrap();
1415 let faucet = SimpleFaucet::new(
1416 context,
1417 &prom_registry,
1418 &tmp.path().join("faucet.wal"),
1419 config,
1420 )
1421 .await
1422 .unwrap();
1423
1424 let amounts = &[1; 1];
1425 let target_addresses: Vec<IotaAddress> = (0..5)
1427 .map(|_| IotaAddress::random_for_testing_only())
1428 .collect();
1429
1430 let response = futures::future::join_all(
1431 target_addresses
1432 .iter()
1433 .map(|address| faucet.batch_send(Uuid::new_v4(), *address, amounts)),
1434 )
1435 .await
1436 .into_iter()
1437 .map(|res| res.unwrap())
1438 .collect::<Vec<BatchFaucetReceipt>>();
1439
1440 tokio::time::sleep(Duration::from_secs(10)).await;
1442 let status_results = futures::future::join_all(
1443 response
1444 .clone()
1445 .iter()
1446 .map(|task| faucet.get_batch_send_status(Uuid::parse_str(&task.task).unwrap())),
1447 )
1448 .await;
1449
1450 let all_errors = status_results.iter().all(Result::is_err);
1451 assert!(all_errors);
1452 }
1453
1454 #[tokio::test]
1455 async fn test_discard_invalid_gas() {
1456 let test_cluster = TestClusterBuilder::new().build().await;
1457 let address = test_cluster.get_address_0();
1458 let context = test_cluster.wallet;
1459 let mut gas_coins = context
1460 .get_all_gas_objects_owned_by_address(address)
1461 .await
1462 .unwrap();
1463
1464 let bad_gas = gas_coins.swap_remove(0);
1465 let gas_coins = HashSet::from_iter(gas_coins.into_iter().map(|gas| gas.0));
1466
1467 let tmp = tempfile::tempdir().unwrap();
1468 let prom_registry = Registry::new();
1469 let config = FaucetConfig::default();
1470
1471 let client = context.get_client().await.unwrap();
1472 let faucet = SimpleFaucet::new(
1473 context,
1474 &prom_registry,
1475 &tmp.path().join("faucet.wal"),
1476 config,
1477 )
1478 .await
1479 .unwrap();
1480 faucet.shutdown_batch_send_task();
1481 let faucet: &mut SimpleFaucet = &mut Arc::try_unwrap(faucet).unwrap();
1482
1483 let gas_budget = 50_000_000;
1485 let tx_data = client
1486 .transaction_builder()
1487 .pay_all_iota(
1488 address,
1489 vec![bad_gas.0],
1490 IotaAddress::random_for_testing_only(),
1491 gas_budget,
1492 )
1493 .await
1494 .unwrap();
1495 execute_tx(faucet.wallet_mut(), tx_data).await.unwrap();
1496
1497 let number_of_coins = gas_coins.len();
1498 let amounts = &vec![1; number_of_coins];
1499 futures::future::join_all((0..2).map(|_| {
1502 faucet.send(
1503 Uuid::new_v4(),
1504 IotaAddress::random_for_testing_only(),
1505 amounts,
1506 )
1507 }))
1508 .await;
1509
1510 let available = faucet.metrics.total_available_coins.get();
1513 let discarded = faucet.metrics.total_discarded_coins.get();
1514 let candidates = faucet.drain_gas_queue(gas_coins.len()).await;
1515 assert_eq!(available as usize, candidates.len());
1516 assert_eq!(discarded, 1);
1517 assert_eq!(
1518 candidates, gas_coins,
1519 "gases: {:?}, candidates: {:?}",
1520 gas_coins, candidates
1521 );
1522 }
1523
1524 #[tokio::test]
1525 async fn test_clear_wal() {
1526 telemetry_subscribers::init_for_testing();
1527 let test_cluster = TestClusterBuilder::new().build().await;
1528 let context = test_cluster.wallet;
1529 let tmp = tempfile::tempdir().unwrap();
1530 let prom_registry = Registry::new();
1531 let config = FaucetConfig::default();
1532 let faucet = SimpleFaucet::new(
1533 context,
1534 &prom_registry,
1535 &tmp.path().join("faucet.wal"),
1536 config,
1537 )
1538 .await
1539 .unwrap();
1540
1541 let original_available = faucet.metrics.total_available_coins.get();
1542 let original_discarded = faucet.metrics.total_discarded_coins.get();
1543
1544 let recipient = IotaAddress::random_for_testing_only();
1545 let faucet_address = faucet.active_address;
1546 let uuid = Uuid::new_v4();
1547
1548 let GasCoinResponse::ValidGasCoin(coin_id) =
1549 faucet.prepare_gas_coin(100, uuid, false).await
1550 else {
1551 panic!("prepare_gas_coin did not give a valid coin.")
1552 };
1553
1554 let tx_data = faucet
1555 .build_pay_iota_txn(coin_id, faucet_address, recipient, &[100], 200_000_000)
1556 .await
1557 .map_err(FaucetError::internal)
1558 .unwrap();
1559
1560 let mut wal = faucet.wal.lock().await;
1561
1562 assert!(wal.log.is_empty());
1564 wal.reserve(Uuid::new_v4(), coin_id, recipient, tx_data)
1565 .map_err(FaucetError::internal)
1566 .ok();
1567 drop(wal);
1568
1569 faucet.retry_wal_coins().await.ok();
1571 let mut wal = faucet.wal.lock().await;
1572 assert!(!wal.log.is_empty());
1573
1574 wal.set_in_flight(coin_id, false)
1576 .expect("Unable to set in flight status to false.");
1577 drop(wal);
1578
1579 faucet.retry_wal_coins().await.ok();
1580 let wal = faucet.wal.lock().await;
1581 assert!(wal.log.is_empty());
1582
1583 let total_coins = faucet.metrics.total_available_coins.get();
1584 let discarded_coins = faucet.metrics.total_discarded_coins.get();
1585 assert_eq!(total_coins, original_available);
1586 assert_eq!(discarded_coins, original_discarded);
1587 }
1588
1589 #[tokio::test]
1590 async fn test_discard_smaller_amount_gas() {
1591 telemetry_subscribers::init_for_testing();
1592 let test_cluster = TestClusterBuilder::new().build().await;
1593 let address = test_cluster.get_address_0();
1594 let mut context = test_cluster.wallet;
1595 let gas_coins = context
1596 .get_all_gas_objects_owned_by_address(address)
1597 .await
1598 .unwrap();
1599
1600 let config = FaucetConfig::default();
1604 let tiny_value = (config.num_coins as u64 * config.amount) + 1;
1605 let client = context.get_client().await.unwrap();
1606 let tx_kind = client
1607 .transaction_builder()
1608 .split_coin_tx_kind(gas_coins.first().unwrap().0, Some(vec![tiny_value]), None)
1609 .await
1610 .unwrap();
1611 let gas_budget = 50_000_000;
1612 let rgp = context.get_reference_gas_price().await.unwrap();
1613 let tx_data = client
1614 .transaction_builder()
1615 .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1616 .await
1617 .unwrap();
1618
1619 let effects = execute_tx(&mut context, tx_data).await.unwrap();
1620
1621 let tiny_coin_id = effects.created()[0].reference.object_id;
1622
1623 let gas_coins = context.gas_objects(address).await.unwrap();
1625
1626 let tiny_amount = gas_coins
1627 .iter()
1628 .find(|gas| gas.1.object_id == tiny_coin_id)
1629 .unwrap()
1630 .0;
1631 assert_eq!(tiny_amount, tiny_value);
1632
1633 let gas_coins: HashSet<ObjectID> =
1634 HashSet::from_iter(gas_coins.into_iter().map(|gas| gas.1.object_id));
1635
1636 let tmp = tempfile::tempdir().unwrap();
1637 let prom_registry = Registry::new();
1638 let faucet = SimpleFaucet::new(
1639 context,
1640 &prom_registry,
1641 &tmp.path().join("faucet.wal"),
1642 config,
1643 )
1644 .await
1645 .unwrap();
1646 faucet.shutdown_batch_send_task();
1647
1648 let faucet: &mut SimpleFaucet = &mut Arc::try_unwrap(faucet).unwrap();
1649
1650 let number_of_coins = gas_coins.len();
1652 let amounts = &vec![tiny_value + 1; number_of_coins];
1653 futures::future::join_all((0..10).map(|_| {
1656 faucet.send(
1657 Uuid::new_v4(),
1658 IotaAddress::random_for_testing_only(),
1659 amounts,
1660 )
1661 }))
1662 .await;
1663 info!(
1664 ?number_of_coins,
1665 "Sent to random addresses: {} {}",
1666 amounts[0],
1667 amounts.len(),
1668 );
1669
1670 tokio::task::yield_now().await;
1672 let discarded = faucet.metrics.total_discarded_coins.get();
1673
1674 info!("discarded: {:?}", discarded);
1675 let candidates = faucet.drain_gas_queue(gas_coins.len() - 1).await;
1676
1677 assert_eq!(discarded, 1);
1678 assert!(!candidates.contains(&tiny_coin_id));
1679 }
1680
1681 #[tokio::test]
1682 async fn test_insufficient_balance_will_retry_success() {
1683 let test_cluster = TestClusterBuilder::new().build().await;
1684 let address = test_cluster.get_address_0();
1685 let mut context = test_cluster.wallet;
1686 let gas_coins = context
1687 .get_all_gas_objects_owned_by_address(address)
1688 .await
1689 .unwrap();
1690 let config = FaucetConfig::default();
1691
1692 let reasonable_value = (config.num_coins as u64 * config.amount) * 10;
1695 let client = context.get_client().await.unwrap();
1696 let tx_kind = client
1697 .transaction_builder()
1698 .split_coin_tx_kind(
1699 gas_coins.first().unwrap().0,
1700 Some(vec![reasonable_value]),
1701 None,
1702 )
1703 .await
1704 .unwrap();
1705 let gas_budget = 50_000_000;
1706 let rgp = context.get_reference_gas_price().await.unwrap();
1707 let tx_data = client
1708 .transaction_builder()
1709 .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1710 .await
1711 .unwrap();
1712 execute_tx(&mut context, tx_data).await.unwrap();
1713
1714 let destination_address = IotaAddress::random_for_testing_only();
1715 for gas in gas_coins.iter().take(gas_coins.len() - 1) {
1717 let tx_data = client
1718 .transaction_builder()
1719 .transfer_iota(address, gas.0, gas_budget, destination_address, None)
1720 .await
1721 .unwrap();
1722 execute_tx(&mut context, tx_data).await.unwrap();
1723 }
1724
1725 let gas_coins = context
1728 .get_all_gas_objects_owned_by_address(address)
1729 .await
1730 .unwrap();
1731 assert!(!gas_coins.is_empty());
1732
1733 let tmp = tempfile::tempdir().unwrap();
1734 let prom_registry = Registry::new();
1735 let config = FaucetConfig::default();
1736 let faucet = SimpleFaucet::new(
1737 context,
1738 &prom_registry,
1739 &tmp.path().join("faucet.wal"),
1740 config,
1741 )
1742 .await
1743 .unwrap();
1744
1745 futures::future::join_all((0..2).map(|_| {
1747 faucet.send(
1748 Uuid::new_v4(),
1749 IotaAddress::random_for_testing_only(),
1750 &[30000000000],
1751 )
1752 }))
1753 .await;
1754
1755 let discarded = faucet.metrics.total_discarded_coins.get();
1757 assert_eq!(discarded, 1);
1758
1759 let wal = faucet.wal.lock().await;
1761 assert!(wal.log.is_empty());
1762 }
1763
1764 #[tokio::test]
1765 async fn test_faucet_no_loop_forever() {
1766 let test_cluster = TestClusterBuilder::new().build().await;
1767 let address = test_cluster.get_address_0();
1768 let mut context = test_cluster.wallet;
1769 let gas_coins = context
1770 .get_all_gas_objects_owned_by_address(address)
1771 .await
1772 .unwrap();
1773 let config = FaucetConfig::default();
1774
1775 let tiny_value = (config.num_coins as u64 * config.amount) + 1;
1776 let client = context.get_client().await.unwrap();
1777 let tx_kind = client
1778 .transaction_builder()
1779 .split_coin_tx_kind(gas_coins.first().unwrap().0, Some(vec![tiny_value]), None)
1780 .await
1781 .unwrap();
1782
1783 let gas_budget = 50_000_000;
1784 let rgp = context.get_reference_gas_price().await.unwrap();
1785
1786 let tx_data = client
1787 .transaction_builder()
1788 .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1789 .await
1790 .unwrap();
1791
1792 execute_tx(&mut context, tx_data).await.unwrap();
1793
1794 let destination_address = IotaAddress::random_for_testing_only();
1795
1796 for gas in gas_coins {
1798 let tx_data = client
1799 .transaction_builder()
1800 .transfer_iota(address, gas.0, gas_budget, destination_address, None)
1801 .await
1802 .unwrap();
1803 execute_tx(&mut context, tx_data).await.unwrap();
1804 }
1805
1806 let gas_coins = context
1809 .get_all_gas_objects_owned_by_address(destination_address)
1810 .await
1811 .unwrap();
1812 assert!(!gas_coins.is_empty());
1813
1814 let tmp = tempfile::tempdir().unwrap();
1815 let prom_registry = Registry::new();
1816 let faucet = SimpleFaucet::new(
1817 context,
1818 &prom_registry,
1819 &tmp.path().join("faucet.wal"),
1820 config,
1821 )
1822 .await
1823 .unwrap();
1824
1825 let destination_address = IotaAddress::random_for_testing_only();
1826 let res = faucet
1828 .send(Uuid::new_v4(), destination_address, &[30000000000])
1829 .await;
1830
1831 assert!(matches!(res, Err(FaucetError::NoGasCoinAvailable)));
1833 }
1834
1835 #[tokio::test]
1836 async fn test_faucet_restart_clears_wal() {
1837 let test_cluster = TestClusterBuilder::new().build().await;
1838 let context = test_cluster.wallet;
1839 let tmp = tempfile::tempdir().unwrap();
1840 let prom_registry = Registry::new();
1841 let config = FaucetConfig::default();
1842
1843 let faucet = SimpleFaucet::new(
1844 context,
1845 &prom_registry,
1846 &tmp.path().join("faucet.wal"),
1847 config,
1848 )
1849 .await
1850 .unwrap();
1851
1852 let recipient = IotaAddress::random_for_testing_only();
1853 let faucet_address = faucet.active_address;
1854 let uuid = Uuid::new_v4();
1855
1856 let GasCoinResponse::ValidGasCoin(coin_id) =
1857 faucet.prepare_gas_coin(100, uuid, false).await
1858 else {
1859 panic!("prepare_gas_coin did not give a valid coin.")
1860 };
1861
1862 let tx_data = faucet
1863 .build_pay_iota_txn(coin_id, faucet_address, recipient, &[100], 200_000_000)
1864 .await
1865 .map_err(FaucetError::internal)
1866 .unwrap();
1867
1868 let mut wal = faucet.wal.lock().await;
1869
1870 assert!(wal.log.is_empty());
1872 wal.reserve(Uuid::new_v4(), coin_id, recipient, tx_data)
1873 .map_err(FaucetError::internal)
1874 .ok();
1875 drop(wal);
1876
1877 let mut wal = faucet.wal.lock().await;
1879 assert!(!wal.log.is_empty());
1880
1881 wal.set_in_flight(coin_id, false)
1883 .expect("Unable to set in flight status to false.");
1884 drop(wal);
1885 faucet.shutdown_batch_send_task();
1886
1887 let faucet_unwrapped = Arc::try_unwrap(faucet).unwrap();
1888
1889 let kept_context = faucet_unwrapped.teardown();
1890
1891 let prom_registry_new = Registry::new();
1893
1894 let faucet_restarted = SimpleFaucet::new(
1895 kept_context,
1896 &prom_registry_new,
1897 &tmp.path().join("faucet.wal"),
1898 FaucetConfig::default(),
1899 )
1900 .await
1901 .unwrap();
1902
1903 let restarted_wal = faucet_restarted.wal.lock().await;
1904 assert!(restarted_wal.log.is_empty())
1905 }
1906
1907 #[tokio::test]
1908 async fn test_amounts_transferred_on_batch() {
1909 let test_cluster = TestClusterBuilder::new().build().await;
1910 let config: FaucetConfig = FaucetConfig {
1911 batch_enabled: true,
1912 ..Default::default()
1913 };
1914 let address = test_cluster.get_address_0();
1915 let mut context = test_cluster.wallet;
1916 let gas_coins = context
1917 .get_all_gas_objects_owned_by_address(address)
1918 .await
1919 .unwrap();
1920 let client = context.get_client().await.unwrap();
1921 let tx_kind = client
1922 .transaction_builder()
1923 .split_coin_tx_kind(gas_coins.first().unwrap().0, None, Some(10))
1924 .await
1925 .unwrap();
1926 let gas_budget = 50_000_000;
1927 let rgp = context.get_reference_gas_price().await.unwrap();
1928 let tx_data = client
1929 .transaction_builder()
1930 .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1931 .await
1932 .unwrap();
1933 execute_tx(&mut context, tx_data).await.unwrap();
1934
1935 let prom_registry = Registry::new();
1936 let tmp = tempfile::tempdir().unwrap();
1937 let amount_to_send = config.amount;
1938
1939 let faucet = SimpleFaucet::new(
1940 context,
1941 &prom_registry,
1942 &tmp.path().join("faucet.wal"),
1943 config,
1944 )
1945 .await
1946 .unwrap();
1947
1948 let target_addresses: Vec<IotaAddress> = (0..2)
1950 .map(|_| IotaAddress::random_for_testing_only())
1951 .collect();
1952
1953 let coins_sent = 2;
1955 let amounts = &vec![amount_to_send; coins_sent];
1956
1957 let response = futures::future::join_all(
1959 target_addresses
1960 .iter()
1961 .map(|address| faucet.batch_send(Uuid::new_v4(), *address, amounts)),
1962 )
1963 .await
1964 .into_iter()
1965 .map(|res| res.unwrap())
1966 .collect::<Vec<BatchFaucetReceipt>>();
1967
1968 let mut status_results;
1969 loop {
1970 status_results =
1972 futures::future::join_all(response.clone().iter().map(|task| {
1973 faucet.get_batch_send_status(Uuid::parse_str(&task.task).unwrap())
1974 }))
1975 .await
1976 .into_iter()
1977 .map(|res| res.unwrap())
1978 .collect::<Vec<BatchSendStatus>>();
1979
1980 if status_results[0].status == BatchSendStatusType::SUCCEEDED {
1983 break;
1984 }
1985 info!(
1986 "Trying to get status again... current is: {:?}",
1987 status_results[0].status
1988 );
1989 }
1990
1991 for status in status_results {
1992 assert_eq!(status.status, BatchSendStatusType::SUCCEEDED);
1993 let amounts = status.transferred_gas_objects.unwrap().sent;
1994 assert_eq!(amounts.len(), coins_sent);
1995 for amt in amounts {
1996 assert_eq!(amt.amount, amount_to_send);
1997 }
1998 }
1999 }
2000
2001 async fn test_send_interface_has_success_status(faucet: &impl Faucet) {
2002 let recipient = IotaAddress::random_for_testing_only();
2003 let amounts = vec![1, 2, 3];
2004 let uuid_test = Uuid::new_v4();
2005
2006 faucet.send(uuid_test, recipient, &amounts).await.unwrap();
2007
2008 let status = faucet.get_batch_send_status(uuid_test).await.unwrap();
2009 let mut actual_amounts: Vec<u64> = status
2010 .transferred_gas_objects
2011 .unwrap()
2012 .sent
2013 .iter()
2014 .map(|c| c.amount)
2015 .collect();
2016 actual_amounts.sort_unstable();
2017
2018 assert_eq!(actual_amounts, amounts);
2019 assert_eq!(status.status, BatchSendStatusType::SUCCEEDED);
2020 }
2021
2022 async fn test_basic_interface(faucet: &impl Faucet) {
2023 let recipient = IotaAddress::random_for_testing_only();
2024 let amounts = vec![1, 2, 3];
2025
2026 let FaucetReceipt { sent } = faucet
2027 .send(Uuid::new_v4(), recipient, &amounts)
2028 .await
2029 .unwrap();
2030 let mut actual_amounts: Vec<u64> = sent.iter().map(|c| c.amount).collect();
2031 actual_amounts.sort_unstable();
2032 assert_eq!(actual_amounts, amounts);
2033 }
2034}