iota_faucet/faucet/
simple_faucet.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5#[cfg(test)]
6use std::collections::HashSet;
7use std::{
8    collections::HashMap,
9    fmt,
10    path::Path,
11    sync::{Arc, Weak},
12};
13
14use async_recursion::async_recursion;
15use async_trait::async_trait;
16use iota_json_rpc_types::{
17    IotaObjectDataOptions, IotaTransactionBlockEffectsAPI, IotaTransactionBlockResponse,
18    IotaTransactionBlockResponseOptions, OwnedObjectRef,
19};
20use iota_keys::keystore::AccountKeystore;
21use iota_metrics::spawn_monitored_task;
22use iota_sdk::wallet_context::WalletContext;
23use iota_types::{
24    base_types::{IotaAddress, ObjectID, TransactionDigest},
25    gas_coin::GasCoin,
26    object::Owner,
27    programmable_transaction_builder::ProgrammableTransactionBuilder,
28    quorum_driver_types::ExecuteTransactionRequestType,
29    transaction::{Transaction, TransactionData},
30};
31use prometheus::Registry;
32use shared_crypto::intent::Intent;
33use tap::tap::TapFallible;
34use tokio::{
35    sync::{
36        Mutex,
37        mpsc::{self, Receiver, Sender},
38        oneshot,
39    },
40    time::{Duration, timeout},
41};
42use tracing::{error, info, warn};
43use ttl_cache::TtlCache;
44use typed_store::Map;
45use uuid::Uuid;
46
47use super::write_ahead_log::WriteAheadLog;
48use crate::{
49    BatchFaucetReceipt, BatchSendStatus, BatchSendStatusType, CoinInfo, Faucet, FaucetConfig,
50    FaucetError, FaucetReceipt, faucet::write_ahead_log, metrics::FaucetMetrics,
51};
52
53pub struct SimpleFaucet {
54    wallet: WalletContext,
55    active_address: IotaAddress,
56    producer: Mutex<Sender<ObjectID>>,
57    consumer: Mutex<Receiver<ObjectID>>,
58    batch_producer: Mutex<Sender<ObjectID>>,
59    batch_consumer: Mutex<Receiver<ObjectID>>,
60    pub metrics: FaucetMetrics,
61    pub wal: Mutex<WriteAheadLog>,
62    request_producer: Sender<(Uuid, IotaAddress, Vec<u64>)>,
63    batch_request_size: u64,
64    task_id_cache: Mutex<TtlCache<Uuid, BatchSendStatus>>,
65    ttl_expiration: u64,
66    coin_amount: u64,
67    /// Shuts down the batch transfer task. Used only in testing.
68    #[cfg_attr(not(test), expect(unused))]
69    batch_transfer_shutdown: parking_lot::Mutex<Option<oneshot::Sender<()>>>,
70}
71
72/// We do not just derive(Debug) because WalletContext and the WriteAheadLog do
73/// not implement Debug / are also hard to implement Debug.
74impl fmt::Debug for SimpleFaucet {
75    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
76        f.debug_struct("SimpleFaucet")
77            .field("faucet_wallet", &self.active_address)
78            .field("producer", &self.producer)
79            .field("consumer", &self.consumer)
80            .field("batch_request_size", &self.batch_request_size)
81            .field("ttl_expiration", &self.ttl_expiration)
82            .field("coin_amount", &self.coin_amount)
83            .finish()
84    }
85}
86
87enum GasCoinResponse {
88    GasCoinWithInsufficientBalance(ObjectID),
89    InvalidGasCoin(ObjectID),
90    NoGasCoinAvailable,
91    UnknownGasCoin(ObjectID),
92    ValidGasCoin(ObjectID),
93}
94
95// TODO: replace this with dryrun at the SDK level
96const DEFAULT_GAS_COMPUTATION_BUCKET: u64 = 10_000_000;
97const LOCK_TIMEOUT: Duration = Duration::from_secs(10);
98const RECV_TIMEOUT: Duration = Duration::from_secs(5);
99const BATCH_TIMEOUT: Duration = Duration::from_secs(10);
100
101impl SimpleFaucet {
102    pub async fn new(
103        wallet: WalletContext,
104        prometheus_registry: &Registry,
105        wal_path: &Path,
106        config: FaucetConfig,
107    ) -> Result<Arc<Self>, FaucetError> {
108        let active_address = wallet
109            .active_address()
110            .map_err(|err| FaucetError::Wallet(err.to_string()))?;
111        info!("SimpleFaucet::new with active address: {active_address}");
112
113        let coins = wallet
114            .gas_objects(active_address)
115            .await
116            .map_err(|e| FaucetError::Wallet(e.to_string()))?
117            .iter()
118            // Ok to unwrap() since `get_gas_objects` guarantees gas
119            .map(|q| GasCoin::try_from(&q.1).unwrap())
120            .filter(|coin| coin.0.balance.value() >= (config.amount * config.num_coins as u64))
121            .collect::<Vec<GasCoin>>();
122
123        if coins.is_empty() {
124            return Err(FaucetError::NoGasCoinAvailable);
125        }
126
127        let metrics = FaucetMetrics::new(prometheus_registry);
128
129        let wal = WriteAheadLog::open(wal_path);
130        let mut pending = vec![];
131
132        let (producer, consumer) = mpsc::channel(coins.len());
133        let (batch_producer, batch_consumer) = mpsc::channel(coins.len());
134
135        let (sender, mut receiver) = mpsc::channel::<(Uuid, IotaAddress, Vec<u64>)>(
136            config.max_request_queue_length as usize,
137        );
138
139        // Split the coins eventually into two pools: one for the gas pool and one for
140        // the batch pool. The batch pool will only be populated if the batch feature is
141        // enabled.
142        let split_point = if config.batch_enabled {
143            if coins.len() > 1 {
144                1 // At least one coin goes to the gas pool the rest to the batch pool
145            } else {
146                0 // Only one coin available, all coins go to the batch pool. This is safe as we have already checked above that `coins` is not empty.
147            }
148        } else {
149            coins.len() // All coins go to the gas pool if batch is disabled
150        };
151
152        for (coins_processed, coin) in coins.iter().enumerate() {
153            let coin_id = *coin.id();
154            if let Some(write_ahead_log::Entry {
155                uuid,
156                recipient,
157                tx,
158                retry_count: _,
159                in_flight: _,
160            }) = wal.reclaim(coin_id).map_err(FaucetError::internal)?
161            {
162                let uuid = Uuid::from_bytes(uuid);
163                info!(?uuid, ?recipient, ?coin_id, "Retrying txn from WAL.");
164                pending.push((uuid, recipient, coin_id, tx));
165            } else if coins_processed < split_point {
166                producer
167                    .send(coin_id)
168                    .await
169                    .tap_ok(|_| {
170                        info!(?coin_id, "Adding coin to gas pool");
171                        metrics.total_available_coins.inc();
172                    })
173                    .tap_err(|e| error!(?coin_id, "Failed to add coin to gas pools: {e:?}"))
174                    .unwrap();
175            } else {
176                batch_producer
177                    .send(coin_id)
178                    .await
179                    .tap_ok(|_| {
180                        info!(?coin_id, "Adding coin to batch gas pool");
181                        metrics.total_available_coins.inc();
182                    })
183                    .tap_err(|e| error!(?coin_id, "Failed to add coin to batch gas pools: {e:?}"))
184                    .unwrap();
185            }
186        }
187        let (batch_transfer_shutdown, mut rx_batch_transfer_shutdown) = oneshot::channel();
188
189        let faucet = Self {
190            wallet,
191            active_address,
192            producer: Mutex::new(producer),
193            consumer: Mutex::new(consumer),
194            batch_producer: Mutex::new(batch_producer),
195            batch_consumer: Mutex::new(batch_consumer),
196            metrics,
197            wal: Mutex::new(wal),
198            request_producer: sender,
199            batch_request_size: config.batch_request_size,
200            // Max faucet requests times 10 minutes worth of requests to hold onto at max.
201            // Note that the cache holds onto a Uuid for [ttl_expiration] in from every update in
202            // status with both INPROGRESS and SUCCEEDED
203            task_id_cache: TtlCache::new(config.max_request_per_second as usize * 60 * 10).into(),
204            ttl_expiration: config.ttl_expiration,
205            coin_amount: config.amount,
206            batch_transfer_shutdown: parking_lot::Mutex::new(Some(batch_transfer_shutdown)),
207        };
208
209        let arc_faucet = Arc::new(faucet);
210        let batch_clone = Arc::downgrade(&arc_faucet);
211        spawn_monitored_task!(async move {
212            info!("Starting task to handle batch faucet requests.");
213            loop {
214                match batch_transfer_gases(
215                    &batch_clone,
216                    &mut receiver,
217                    &mut rx_batch_transfer_shutdown,
218                )
219                .await
220                {
221                    Ok(response) => {
222                        if response == TransactionDigest::ZERO {
223                            info!("Batch transfer incomplete due to faucet shutting down.");
224                        } else {
225                            info!(
226                                "Batch transfer completed with transaction digest: {:?}",
227                                response
228                            );
229                        }
230                    }
231                    Err(err) => {
232                        error!("{:?}", err);
233                    }
234                }
235            }
236        });
237        // Retrying all the pending transactions from the WAL, before continuing.
238        // Ignore return values -- if the executions failed, the pending coins
239        // will simply remain in the WAL, and not recycled.
240        futures::future::join_all(pending.into_iter().map(|(uuid, recipient, coin_id, tx)| {
241            arc_faucet.sign_and_execute_txn(uuid, recipient, coin_id, tx, false)
242        }))
243        .await;
244
245        Ok(arc_faucet)
246    }
247
248    /// Take the consumer lock and pull a Coin ID from the queue, without
249    /// checking whether it is valid or not.
250    async fn pop_gas_coin(&self, uuid: Uuid) -> Option<ObjectID> {
251        // If the gas candidate queue is exhausted, the request will be suspended
252        // indefinitely until a producer puts in more candidate gas objects. At
253        // the same time, other requests will be blocked by the lock acquisition
254        // as well.
255        let Ok(mut consumer) = tokio::time::timeout(LOCK_TIMEOUT, self.consumer.lock()).await
256        else {
257            error!(?uuid, "Timeout when getting consumer lock");
258            return None;
259        };
260
261        info!(?uuid, "Got consumer lock, pulling coins.");
262        let Ok(coin) = tokio::time::timeout(RECV_TIMEOUT, consumer.recv()).await else {
263            error!(?uuid, "Timeout when getting gas coin from the queue");
264            return None;
265        };
266
267        let Some(coin) = coin else {
268            unreachable!("channel is closed");
269        };
270
271        self.metrics.total_available_coins.dec();
272        Some(coin)
273    }
274
275    /// Take the consumer lock and pull a Coin ID from the queue, without
276    /// checking whether it is valid or not.
277    async fn pop_gas_coin_for_batch(&self, uuid: Uuid) -> Option<ObjectID> {
278        // If the gas candidate queue is exhausted, the request will be suspended
279        // indefinitely until a producer puts in more candidate gas objects. At
280        // the same time, other requests will be blocked by the lock acquisition
281        // as well.
282        let Ok(mut batch_consumer) =
283            tokio::time::timeout(LOCK_TIMEOUT, self.batch_consumer.lock()).await
284        else {
285            error!(?uuid, "Timeout when getting batch consumer lock");
286            return None;
287        };
288
289        info!(?uuid, "Got consumer lock, pulling coins.");
290        let Ok(coin) = tokio::time::timeout(RECV_TIMEOUT, batch_consumer.recv()).await else {
291            error!(?uuid, "Timeout when getting gas coin from the queue");
292            return None;
293        };
294
295        let Some(coin) = coin else {
296            unreachable!("channel is closed");
297        };
298
299        self.metrics.total_available_coins.dec();
300        Some(coin)
301    }
302
303    /// Pulls a coin from the queue and makes sure it is fit for use (belongs to
304    /// the faucet, has sufficient balance).
305    async fn prepare_gas_coin(
306        &self,
307        total_amount: u64,
308        uuid: Uuid,
309        for_batch: bool,
310    ) -> GasCoinResponse {
311        let coin_id = if for_batch {
312            self.pop_gas_coin_for_batch(uuid).await
313        } else {
314            self.pop_gas_coin(uuid).await
315        };
316
317        let Some(coin_id) = coin_id else {
318            warn!("Failed getting gas coin, try later!");
319            return GasCoinResponse::NoGasCoinAvailable;
320        };
321
322        match self.get_gas_coin_and_check_faucet_owner(coin_id).await {
323            Ok(Some(gas_coin)) if gas_coin.value() >= total_amount => {
324                info!(?uuid, ?coin_id, "balance: {}", gas_coin.value());
325                GasCoinResponse::ValidGasCoin(coin_id)
326            }
327
328            Ok(Some(_)) => {
329                info!(?uuid, ?coin_id, "insufficient balance",);
330                GasCoinResponse::GasCoinWithInsufficientBalance(coin_id)
331            }
332
333            Ok(None) => {
334                info!(?uuid, ?coin_id, "No gas coin returned.",);
335                GasCoinResponse::InvalidGasCoin(coin_id)
336            }
337
338            Err(e) => {
339                error!(?uuid, ?coin_id, "Fullnode read error: {e:?}");
340                GasCoinResponse::UnknownGasCoin(coin_id)
341            }
342        }
343    }
344
345    /// Check if the gas coin is still valid. A valid gas coin
346    /// 1. Exists presently
347    /// 2. is a gas coin
348    ///
349    /// If the coin is valid, return Ok(Some(GasCoin))
350    /// If the coin invalid, return Ok(None)
351    /// If the fullnode returns an unexpected error, returns Err(e)
352    async fn get_coin(
353        &self,
354        coin_id: ObjectID,
355    ) -> anyhow::Result<Option<(Option<Owner>, GasCoin)>> {
356        let client = self.wallet.get_client().await?;
357        let gas_obj = client
358            .read_api()
359            .get_object_with_options(
360                coin_id,
361                IotaObjectDataOptions::new()
362                    .with_type()
363                    .with_owner()
364                    .with_content(),
365            )
366            .await?;
367        let o = gas_obj.data;
368        if let Some(o) = o {
369            Ok(GasCoin::try_from(&o).ok().map(|coin| (o.owner, coin)))
370        } else {
371            Ok(None)
372        }
373    }
374
375    /// Similar to get_coin but checks that the owner is the active
376    /// faucet address. If the coin exists, but does not have the correct owner,
377    /// returns None
378    async fn get_gas_coin_and_check_faucet_owner(
379        &self,
380        coin_id: ObjectID,
381    ) -> anyhow::Result<Option<GasCoin>> {
382        let gas_obj = self.get_coin(coin_id).await?;
383        info!(?coin_id, "Reading gas coin object: {:?}", gas_obj);
384        Ok(gas_obj.and_then(|(owner_opt, coin)| match owner_opt {
385            Some(Owner::AddressOwner(owner_addr)) if owner_addr == self.active_address => {
386                Some(coin)
387            }
388            _ => None,
389        }))
390    }
391
392    /// Clear the WAL list in the faucet
393    pub async fn retry_wal_coins(&self) -> Result<(), FaucetError> {
394        let mut wal = self.wal.lock().await;
395        let mut pending = vec![];
396
397        for item in wal.log.safe_iter() {
398            // Safe unwrap as we are the only ones that ever add to the WAL.
399            let (coin_id, entry) = item.unwrap();
400            let uuid = Uuid::from_bytes(entry.uuid);
401            if !entry.in_flight {
402                pending.push((uuid, entry.recipient, coin_id, entry.tx));
403            }
404        }
405
406        for (_, _, coin_id, _) in &pending {
407            wal.increment_retry_count(*coin_id)
408                .map_err(FaucetError::internal)?;
409            wal.set_in_flight(*coin_id, true)
410                .map_err(FaucetError::internal)?;
411        }
412
413        info!("Retrying WAL of length: {:?}", pending.len());
414        // Drops the lock early because sign_and_execute_txn requires the lock.
415        drop(wal);
416
417        futures::future::join_all(pending.into_iter().map(|(uuid, recipient, coin_id, tx)| {
418            self.sign_and_execute_txn(uuid, recipient, coin_id, tx, false)
419        }))
420        .await;
421
422        Ok(())
423    }
424
425    /// Sign an already created transaction (in `tx_data`) and keep trying to
426    /// execute it until fullnode returns a definite response or a timeout
427    /// is hit.
428    async fn sign_and_execute_txn(
429        &self,
430        uuid: Uuid,
431        recipient: IotaAddress,
432        coin_id: ObjectID,
433        tx_data: TransactionData,
434        for_batch: bool,
435    ) -> Result<IotaTransactionBlockResponse, FaucetError> {
436        let signature = self
437            .wallet
438            .config()
439            .keystore()
440            .sign_secure(&self.active_address, &tx_data, Intent::iota_transaction())
441            .map_err(FaucetError::internal)?;
442        let tx = Transaction::from_data(tx_data, vec![signature]);
443        let tx_digest = *tx.digest();
444        info!(
445            ?tx_digest,
446            ?recipient,
447            ?coin_id,
448            ?uuid,
449            "PayIota transaction in faucet."
450        );
451
452        match timeout(
453            Duration::from_secs(300),
454            self.execute_pay_iota_txn_with_retries(&tx, coin_id, recipient, uuid),
455        )
456        .await
457        {
458            Err(elapsed) => {
459                warn!(
460                    ?recipient,
461                    ?coin_id,
462                    ?uuid,
463                    "Failed to execute PayIota transactions in faucet after {elapsed}. Coin will \
464                     not be reused."
465                );
466
467                // We set the inflight status to false so that the async thread that
468                // retries this transactions will attempt to try again.
469                // We should only set this inflight if we see that it's not a client error
470                if let Err(err) = self.wal.lock().await.set_in_flight(coin_id, false) {
471                    error!(
472                        ?recipient,
473                        ?coin_id,
474                        ?uuid,
475                        "Failed to set coin in flight status in WAL: {:?}",
476                        err
477                    );
478                }
479
480                Err(FaucetError::Transfer(
481                    "could not complete transfer within timeout".into(),
482                ))
483            }
484
485            Ok(result) => {
486                // Note: we do not recycle gas unless the transaction was successful - the
487                // faucet may run out of available coins due to errors, but this
488                // allows a human to intervene and attempt to fix things. If we
489                // re-use coins that had errors, we may lock them permanently.
490
491                // It's important to remove the coin from the WAL before recycling it, to avoid
492                // a race with the next request served with this coin.  If this
493                // operation fails, log it and continue so we don't lose access
494                // to the coin -- the worst that can happen is that the WAL
495                // contains a stale entry.
496                if self.wal.lock().await.commit(coin_id).is_err() {
497                    error!(?coin_id, "Failed to remove coin from WAL");
498                }
499                if for_batch {
500                    self.recycle_gas_coin_for_batch(coin_id, uuid).await;
501                } else {
502                    self.recycle_gas_coin(coin_id, uuid).await;
503                }
504                Ok(result)
505            }
506        }
507    }
508
509    #[async_recursion]
510    async fn transfer_gases(
511        &self,
512        amounts: &[u64],
513        recipient: IotaAddress,
514        uuid: Uuid,
515    ) -> Result<(TransactionDigest, Vec<ObjectID>), FaucetError> {
516        let number_of_coins = amounts.len();
517        let total_amount: u64 = amounts.iter().sum();
518        let gas_cost = self.get_gas_cost().await?;
519
520        let gas_coin_response = self
521            .prepare_gas_coin(total_amount + gas_cost, uuid, false)
522            .await;
523        match gas_coin_response {
524            GasCoinResponse::ValidGasCoin(coin_id) => {
525                let tx_data = self
526                    .build_pay_iota_txn(coin_id, self.active_address, recipient, amounts, gas_cost)
527                    .await
528                    .map_err(FaucetError::internal)?;
529
530                {
531                    // Register the intention to send this transaction before we send it, so that if
532                    // faucet fails or we give up before we get a definite response, we have a
533                    // chance to retry later.
534                    let mut wal = self.wal.lock().await;
535                    wal.reserve(uuid, coin_id, recipient, tx_data.clone())
536                        .map_err(FaucetError::internal)?;
537                }
538                let response = self
539                    .sign_and_execute_txn(uuid, recipient, coin_id, tx_data, false)
540                    .await?;
541                self.metrics.total_coin_requests_succeeded.inc();
542                self.check_and_map_transfer_gas_result(response, number_of_coins, recipient)
543                    .await
544            }
545
546            GasCoinResponse::UnknownGasCoin(coin_id) => {
547                self.recycle_gas_coin(coin_id, uuid).await;
548                Err(FaucetError::FullnodeReading(format!(
549                    "unknown gas coin {coin_id:?}"
550                )))
551            }
552
553            GasCoinResponse::GasCoinWithInsufficientBalance(coin_id) => {
554                warn!(?uuid, ?coin_id, "Insufficient balance, removing from pool");
555                self.metrics.total_discarded_coins.inc();
556                self.transfer_gases(amounts, recipient, uuid).await
557            }
558
559            GasCoinResponse::InvalidGasCoin(coin_id) => {
560                // The coin does not exist, or does not belong to the current active address.
561                warn!(?uuid, ?coin_id, "Invalid, removing from pool");
562                self.metrics.total_discarded_coins.inc();
563                self.transfer_gases(amounts, recipient, uuid).await
564            }
565
566            GasCoinResponse::NoGasCoinAvailable => Err(FaucetError::NoGasCoinAvailable),
567        }
568    }
569
570    async fn recycle_gas_coin(&self, coin_id: ObjectID, uuid: Uuid) {
571        // Once transactions are done, in despite of success or failure,
572        // we put back the coins. The producer should never wait indefinitely,
573        // in that the channel is initialized with big enough capacity.
574        let producer = self.producer.lock().await;
575        info!(?uuid, ?coin_id, "Got producer lock and recycling coin");
576        producer
577            .try_send(coin_id)
578            .expect("unexpected - queue is large enough to hold all coins");
579        self.metrics.total_available_coins.inc();
580        info!(?uuid, ?coin_id, "Recycled coin");
581    }
582
583    async fn recycle_gas_coin_for_batch(&self, coin_id: ObjectID, uuid: Uuid) {
584        // Once transactions are done, in despite of success or failure,
585        // we put back the coins. The producer should never wait indefinitely,
586        // in that the channel is initialized with big enough capacity.
587        let batch_producer = self.batch_producer.lock().await;
588        info!(?uuid, ?coin_id, "Got producer lock and recycling coin");
589        batch_producer
590            .try_send(coin_id)
591            .expect("unexpected - queue is large enough to hold all coins");
592        self.metrics.total_available_coins.inc();
593        info!(?uuid, ?coin_id, "Recycled coin");
594    }
595
596    async fn execute_pay_iota_txn_with_retries(
597        &self,
598        tx: &Transaction,
599        coin_id: ObjectID,
600        recipient: IotaAddress,
601        uuid: Uuid,
602    ) -> IotaTransactionBlockResponse {
603        let mut retry_delay = Duration::from_millis(500);
604
605        loop {
606            let res = self
607                .execute_pay_iota_txn(tx, coin_id, recipient, uuid)
608                .await;
609
610            if let Ok(res) = res {
611                return res;
612            }
613
614            info!(
615                ?recipient,
616                ?coin_id,
617                ?uuid,
618                ?retry_delay,
619                "PayIota transaction in faucet failed, previous error: {:?}",
620                &res,
621            );
622
623            tokio::time::sleep(retry_delay).await;
624            retry_delay *= 2;
625        }
626    }
627
628    async fn execute_pay_iota_txn(
629        &self,
630        tx: &Transaction,
631        coin_id: ObjectID,
632        recipient: IotaAddress,
633        uuid: Uuid,
634    ) -> Result<IotaTransactionBlockResponse, anyhow::Error> {
635        self.metrics.current_executions_in_flight.inc();
636        let _metrics_guard = scopeguard::guard(self.metrics.clone(), |metrics| {
637            metrics.current_executions_in_flight.dec();
638        });
639
640        let tx_digest = tx.digest();
641        let client = self.wallet.get_client().await?;
642        Ok(client
643            .quorum_driver_api()
644            .execute_transaction_block(
645                tx.clone(),
646                IotaTransactionBlockResponseOptions::new().with_effects(),
647                Some(ExecuteTransactionRequestType::WaitForLocalExecution),
648            )
649            .await
650            .tap_err(|e| {
651                error!(
652                    ?tx_digest,
653                    ?recipient,
654                    ?coin_id,
655                    ?uuid,
656                    "Transfer Transaction failed: {:?}",
657                    e
658                )
659            })?)
660    }
661
662    async fn get_gas_cost(&self) -> Result<u64, FaucetError> {
663        let gas_price = self.get_gas_price().await?;
664        Ok(gas_price * DEFAULT_GAS_COMPUTATION_BUCKET)
665    }
666
667    async fn get_gas_price(&self) -> Result<u64, FaucetError> {
668        let client = self
669            .wallet
670            .get_client()
671            .await
672            .map_err(|e| FaucetError::Wallet(format!("Unable to get client: {e:?}")))?;
673        client
674            .read_api()
675            .get_reference_gas_price()
676            .await
677            .map_err(|e| FaucetError::FullnodeReading(format!("Error fetch gas price {e:?}")))
678    }
679
680    async fn build_pay_iota_txn(
681        &self,
682        coin_id: ObjectID,
683        signer: IotaAddress,
684        recipient: IotaAddress,
685        amounts: &[u64],
686        budget: u64,
687    ) -> Result<TransactionData, anyhow::Error> {
688        let recipients = vec![recipient; amounts.len()];
689        let client = self.wallet.get_client().await?;
690        client
691            .transaction_builder()
692            .pay_iota(signer, vec![coin_id], recipients, amounts.to_vec(), budget)
693            .await
694            .map_err(|e| {
695                anyhow::anyhow!(
696                    "Failed to build PayIota transaction for coin {:?}, with err {:?}",
697                    coin_id,
698                    e
699                )
700            })
701    }
702
703    async fn check_and_map_transfer_gas_result(
704        &self,
705        res: IotaTransactionBlockResponse,
706        number_of_coins: usize,
707        recipient: IotaAddress,
708    ) -> Result<(TransactionDigest, Vec<ObjectID>), FaucetError> {
709        let created = res
710            .effects
711            .ok_or_else(|| {
712                FaucetError::ParseTransactionResponse(format!(
713                    "effects field missing for txn {}",
714                    res.digest
715                ))
716            })?
717            .created()
718            .to_vec();
719        if created.len() != number_of_coins {
720            return Err(FaucetError::CoinAmountTransferredIncorrect(format!(
721                "PayIota Transaction should create exact {:?} new coins, but got {:?}",
722                number_of_coins, created
723            )));
724        }
725        assert!(
726            created
727                .iter()
728                .all(|created_coin_owner_ref| created_coin_owner_ref.owner == recipient)
729        );
730        let coin_ids: Vec<ObjectID> = created
731            .iter()
732            .map(|created_coin_owner_ref| created_coin_owner_ref.reference.object_id)
733            .collect();
734        Ok((res.digest, coin_ids))
735    }
736
737    async fn build_batch_pay_iota_txn(
738        &self,
739        coin_id: ObjectID,
740        batch_requests: Vec<(Uuid, IotaAddress, Vec<u64>)>,
741        signer: IotaAddress,
742        budget: u64,
743    ) -> Result<TransactionData, anyhow::Error> {
744        let gas_payment = self.wallet.get_object_ref(coin_id).await?;
745        let gas_price = self.wallet.get_reference_gas_price().await?;
746        // TODO (Jian): change to make this more efficient by changing impl to one
747        // Splitcoin, and many TransferObjects
748        let pt = {
749            let mut builder = ProgrammableTransactionBuilder::new();
750            for (_uuid, recipient, amounts) in batch_requests {
751                let recipients = vec![recipient; amounts.len()];
752                builder.pay_iota(recipients, amounts)?;
753            }
754            builder.finish()
755        };
756
757        Ok(TransactionData::new_programmable(
758            signer,
759            vec![gas_payment],
760            pt,
761            budget,
762            gas_price,
763        ))
764    }
765
766    async fn check_and_map_batch_transfer_gas_result(
767        &self,
768        res: IotaTransactionBlockResponse,
769        requests: Vec<(Uuid, IotaAddress, Vec<u64>)>,
770    ) -> Result<(), FaucetError> {
771        // Grab the list of created coins and turn it into a map of destination
772        // IotaAddress to Vec<Coins>
773        let created = res
774            .effects
775            .ok_or_else(|| {
776                FaucetError::ParseTransactionResponse(format!(
777                    "effects field missing for txn {}",
778                    res.digest
779                ))
780            })?
781            .created()
782            .to_vec();
783
784        let mut address_coins_map: HashMap<IotaAddress, Vec<OwnedObjectRef>> = HashMap::new();
785        created.iter().for_each(|created_coin_owner_ref| {
786            let owner = created_coin_owner_ref.owner;
787            let coin_obj_ref = created_coin_owner_ref.clone();
788
789            // Insert the coins into the map based on the destination address
790            address_coins_map
791                .entry(owner.get_owner_address().unwrap())
792                .or_default()
793                .push(coin_obj_ref);
794        });
795
796        // Assert that the number of times a iota_address occurs is the number of times
797        // the coins come up in the vector.
798        let mut request_count: HashMap<IotaAddress, u64> = HashMap::new();
799        // Acquire lock and update all of the request Uuids
800        let mut task_map = self.task_id_cache.lock().await;
801        for (uuid, addy, amounts) in requests {
802            let number_of_coins = amounts.len();
803            // Get or insert iota_address into request count
804            let index = *request_count.entry(addy).or_insert(0);
805
806            // The address coin map should contain the coins transferred in the given
807            // request.
808            let coins_created_for_address = address_coins_map.entry(addy).or_default();
809
810            if number_of_coins as u64 + index > coins_created_for_address.len() as u64 {
811                return Err(FaucetError::CoinAmountTransferredIncorrect(format!(
812                    "PayIota Transaction should create exact {:?} new coins, but got {:?}",
813                    number_of_coins as u64 + index,
814                    coins_created_for_address.len()
815                )));
816            }
817            let coins_slice =
818                &mut coins_created_for_address[index as usize..(index as usize + number_of_coins)];
819
820            request_count.insert(addy, number_of_coins as u64 + index);
821
822            // We can safely zip up `coins_slice` and `amounts` without truncating either
823            // slice, since both are checked to have the same length, according
824            // to the above code.
825            let transferred_gases = coins_slice
826                .iter()
827                .zip(amounts)
828                .map(|(coin, amount)| CoinInfo {
829                    id: coin.object_id(),
830                    transfer_tx_digest: res.digest,
831                    amount,
832                })
833                .collect();
834
835            task_map.insert(
836                uuid,
837                BatchSendStatus {
838                    status: BatchSendStatusType::SUCCEEDED,
839                    transferred_gas_objects: Some(FaucetReceipt {
840                        sent: transferred_gases,
841                    }),
842                },
843                Duration::from_secs(self.ttl_expiration),
844            );
845        }
846
847        // We use a separate map to figure out which index should correlate to the
848        Ok(())
849    }
850
851    #[cfg(test)]
852    pub(crate) fn shutdown_batch_send_task(&self) {
853        self.batch_transfer_shutdown
854            .lock()
855            .take()
856            .unwrap()
857            .send(())
858            .unwrap();
859    }
860
861    #[cfg(test)]
862    pub fn wallet_mut(&mut self) -> &mut WalletContext {
863        &mut self.wallet
864    }
865
866    #[cfg(test)]
867    pub fn teardown(self) -> WalletContext {
868        self.wallet
869    }
870
871    #[cfg(test)]
872    async fn drain_gas_queue(&mut self, expected_gas_count: usize) -> HashSet<ObjectID> {
873        use tokio::sync::mpsc::error::TryRecvError;
874        let mut consumer = self.consumer.lock().await;
875        let mut candidates = HashSet::new();
876        let mut i = 0;
877        loop {
878            let coin_id = consumer
879                .try_recv()
880                .unwrap_or_else(|e| panic!("Expect the {}th candidate but got {}", i, e));
881            candidates.insert(coin_id);
882            i += 1;
883            if i == expected_gas_count {
884                assert_eq!(consumer.try_recv().unwrap_err(), TryRecvError::Empty);
885                break;
886            }
887        }
888        candidates
889    }
890}
891
892#[async_trait]
893impl Faucet for SimpleFaucet {
894    async fn send(
895        &self,
896        id: Uuid,
897        recipient: IotaAddress,
898        amounts: &[u64],
899    ) -> Result<FaucetReceipt, FaucetError> {
900        info!(?recipient, uuid = ?id, ?amounts, "Getting faucet requests");
901
902        let (digest, coin_ids) = self.transfer_gases(amounts, recipient, id).await?;
903
904        info!(uuid = ?id, ?recipient, ?digest, "PayIota txn succeeded");
905        let mut sent = Vec::with_capacity(coin_ids.len());
906        let coin_results =
907            futures::future::join_all(coin_ids.iter().map(|coin_id| self.get_coin(*coin_id))).await;
908        for (coin_id, res) in coin_ids.into_iter().zip(coin_results) {
909            let amount = if let Ok(Some((_, coin))) = res {
910                coin.value()
911            } else {
912                info!(
913                    ?recipient,
914                    ?coin_id,
915                    uuid = ?id,
916                    "Could not find coin after successful transaction, error: {:?}",
917                    &res,
918                );
919                0
920            };
921            sent.push(CoinInfo {
922                transfer_tx_digest: digest,
923                amount,
924                id: coin_id,
925            });
926        }
927
928        // Store into status map that the txn was successful for backwards compatibility
929        let faucet_receipt = FaucetReceipt { sent };
930        let mut task_map = self.task_id_cache.lock().await;
931        task_map.insert(
932            id,
933            BatchSendStatus {
934                status: BatchSendStatusType::SUCCEEDED,
935                transferred_gas_objects: Some(faucet_receipt.clone()),
936            },
937            Duration::from_secs(self.ttl_expiration),
938        );
939
940        Ok(faucet_receipt)
941    }
942
943    async fn batch_send(
944        &self,
945        id: Uuid,
946        recipient: IotaAddress,
947        amounts: &[u64],
948    ) -> Result<BatchFaucetReceipt, FaucetError> {
949        info!(?recipient, uuid = ?id, "Getting faucet request");
950        if self
951            .request_producer
952            .try_send((id, recipient, amounts.to_vec()))
953            .is_err()
954        {
955            return Err(FaucetError::BatchSendQueueFull);
956        }
957
958        let mut task_map = self.task_id_cache.lock().await;
959        task_map.insert(
960            id,
961            BatchSendStatus {
962                status: BatchSendStatusType::INPROGRESS,
963                transferred_gas_objects: None,
964            },
965            Duration::from_secs(self.ttl_expiration),
966        );
967        Ok(BatchFaucetReceipt {
968            task: id.to_string(),
969        })
970    }
971
972    async fn get_batch_send_status(&self, task_id: Uuid) -> Result<BatchSendStatus, FaucetError> {
973        let task_map = self.task_id_cache.lock().await;
974        match task_map.get(&task_id) {
975            Some(status) => Ok(status.clone()),
976            None => Err(FaucetError::Internal("task id not found".to_string())),
977        }
978    }
979}
980
981pub async fn batch_gather(
982    request_consumer: &mut Receiver<(Uuid, IotaAddress, Vec<u64>)>,
983    requests: &mut Vec<(Uuid, IotaAddress, Vec<u64>)>,
984    batch_request_size: u64,
985) -> Result<(), FaucetError> {
986    // Gather the rest of the batch after the first item has been taken.
987    for _ in 1..batch_request_size {
988        let Some(req) = request_consumer.recv().await else {
989            error!("Request consumer queue closed");
990            return Err(FaucetError::ChannelClosed);
991        };
992
993        requests.push(req);
994    }
995
996    Ok(())
997}
998
999// Function to process the batch send of the mcsp queue
1000pub async fn batch_transfer_gases(
1001    weak_faucet: &Weak<SimpleFaucet>,
1002    request_consumer: &mut Receiver<(Uuid, IotaAddress, Vec<u64>)>,
1003    rx_batch_transfer_shutdown: &mut oneshot::Receiver<()>,
1004) -> Result<TransactionDigest, FaucetError> {
1005    let mut requests = Vec::new();
1006
1007    tokio::select! {
1008        first_req = request_consumer.recv() => {
1009            if let Some((uuid, address, amounts)) = first_req {
1010                requests.push((uuid, address, amounts));
1011            } else {
1012                // Should only happen after the Faucet has shut down
1013                info!("No more faucet requests will be received. Exiting batch faucet task ...");
1014                return Ok(TransactionDigest::ZERO);
1015            };
1016        }
1017        _ = rx_batch_transfer_shutdown => {
1018            info!("Shutdown signal received. Exiting faucet ...");
1019            return Ok(TransactionDigest::ZERO);
1020        }
1021    };
1022
1023    let Some(faucet) = weak_faucet.upgrade() else {
1024        info!("Faucet has shut down already. Exiting ...");
1025        return Ok(TransactionDigest::ZERO);
1026    };
1027
1028    if timeout(
1029        BATCH_TIMEOUT,
1030        batch_gather(request_consumer, &mut requests, faucet.batch_request_size),
1031    )
1032    .await
1033    .is_err()
1034    {
1035        info!("Batch timeout elapsed while waiting.");
1036    };
1037
1038    let total_requests = requests.len();
1039    let gas_cost = faucet.get_gas_cost().await?;
1040    // The UUID here is for the batched request
1041    let uuid = Uuid::new_v4();
1042    info!(
1043        ?uuid,
1044        "Batch transfer attempted of size: {:?}", total_requests
1045    );
1046    let total_iota_needed: u64 = requests.iter().flat_map(|(_, _, amounts)| amounts).sum();
1047
1048    // This loop is utilized to grab a coin that is large enough for the request
1049    loop {
1050        let gas_coin_response = faucet
1051            .prepare_gas_coin(total_iota_needed + gas_cost, uuid, true)
1052            .await;
1053
1054        match gas_coin_response {
1055            GasCoinResponse::ValidGasCoin(coin_id) => {
1056                let tx_data = faucet
1057                    .build_batch_pay_iota_txn(
1058                        coin_id,
1059                        requests.clone(),
1060                        faucet.active_address,
1061                        gas_cost,
1062                    )
1063                    .await
1064                    .map_err(FaucetError::internal)?;
1065
1066                // Because we are batching transactions to faucet, we will just not use a real
1067                // recipient for iota address, and instead just fill it with the
1068                // ZERO address.
1069                let recipient = IotaAddress::ZERO;
1070                {
1071                    // Register the intention to send this transaction before we send it, so that if
1072                    // faucet fails or we give up before we get a definite response, we have a
1073                    // chance to retry later.
1074                    let mut wal = faucet.wal.lock().await;
1075                    wal.reserve(uuid, coin_id, recipient, tx_data.clone())
1076                        .map_err(FaucetError::internal)?;
1077                }
1078                let response = faucet
1079                    .sign_and_execute_txn(uuid, recipient, coin_id, tx_data, true)
1080                    .await?;
1081
1082                faucet
1083                    .metrics
1084                    .total_coin_requests_succeeded
1085                    .add(total_requests as i64);
1086
1087                faucet
1088                    .check_and_map_batch_transfer_gas_result(response.clone(), requests)
1089                    .await?;
1090
1091                return Ok(response.digest);
1092            }
1093
1094            GasCoinResponse::UnknownGasCoin(coin_id) => {
1095                // Continue the loop to retry preparing the gas coin
1096                warn!(?uuid, ?coin_id, "unknown gas coin.");
1097                faucet.metrics.total_discarded_coins.inc();
1098                continue;
1099            }
1100
1101            GasCoinResponse::GasCoinWithInsufficientBalance(coin_id) => {
1102                warn!(?uuid, ?coin_id, "Insufficient balance, removing from pool");
1103                faucet.metrics.total_discarded_coins.inc();
1104                // Continue the loop to retry preparing the gas coin
1105                continue;
1106            }
1107
1108            GasCoinResponse::InvalidGasCoin(coin_id) => {
1109                // The coin does not exist, or does not belong to the current active address.
1110                warn!(?uuid, ?coin_id, "Invalid, removing from pool");
1111                faucet.metrics.total_discarded_coins.inc();
1112                // Continue the loop to retry preparing the gas coin
1113                continue;
1114            }
1115
1116            GasCoinResponse::NoGasCoinAvailable => return Err(FaucetError::NoGasCoinAvailable),
1117        }
1118    }
1119}
1120
1121#[cfg(test)]
1122mod tests {
1123    use anyhow::*;
1124    use iota_json_rpc_types::{IotaExecutionStatus, IotaTransactionBlockEffects};
1125    use iota_sdk::wallet_context::WalletContext;
1126    use iota_types::transaction::{SenderSignedData, TransactionDataAPI};
1127    use shared_crypto::intent::Intent;
1128    use test_cluster::TestClusterBuilder;
1129
1130    use super::*;
1131
1132    async fn execute_tx(
1133        ctx: &mut WalletContext,
1134        tx_data: TransactionData,
1135    ) -> Result<IotaTransactionBlockEffects, anyhow::Error> {
1136        let signature = ctx.config().keystore().sign_secure(
1137            &tx_data.sender(),
1138            &tx_data,
1139            Intent::iota_transaction(),
1140        )?;
1141        let sender_signed_data = SenderSignedData::new_from_sender_signature(tx_data, signature);
1142        let transaction = Transaction::new(sender_signed_data);
1143        let response = ctx.execute_transaction_may_fail(transaction).await?;
1144        let result_effects = response.clone().effects;
1145
1146        if let Some(effects) = result_effects {
1147            if matches!(effects.status(), IotaExecutionStatus::Failure { .. }) {
1148                Err(anyhow!(
1149                    "Error executing transaction: {:#?}",
1150                    effects.status()
1151                ))
1152            } else {
1153                Ok(effects)
1154            }
1155        } else {
1156            Err(anyhow!(
1157                "Effects from IotaTransactionBlockResult should not be empty"
1158            ))
1159        }
1160    }
1161
1162    #[tokio::test]
1163    async fn simple_faucet_basic_interface_should_work() {
1164        telemetry_subscribers::init_for_testing();
1165        let test_cluster = TestClusterBuilder::new().build().await;
1166        let tmp = tempfile::tempdir().unwrap();
1167        let prom_registry = Registry::new();
1168        let config = FaucetConfig::default();
1169
1170        let address = test_cluster.get_address_0();
1171        let mut context = test_cluster.wallet;
1172        let gas_coins = context
1173            .get_all_gas_objects_owned_by_address(address)
1174            .await
1175            .unwrap();
1176        let client = context.get_client().await.unwrap();
1177        let tx_kind = client
1178            .transaction_builder()
1179            .split_coin_tx_kind(gas_coins.first().unwrap().0, None, Some(10))
1180            .await
1181            .unwrap();
1182        let gas_budget = 50_000_000;
1183        let rgp = context.get_reference_gas_price().await.unwrap();
1184        let tx_data = client
1185            .transaction_builder()
1186            .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1187            .await
1188            .unwrap();
1189
1190        execute_tx(&mut context, tx_data).await.unwrap();
1191
1192        let faucet = SimpleFaucet::new(
1193            context,
1194            &prom_registry,
1195            &tmp.path().join("faucet.wal"),
1196            config,
1197        )
1198        .await
1199        .unwrap();
1200        // faucet.shutdown_batch_send_task();
1201
1202        let faucet = Arc::try_unwrap(faucet).unwrap();
1203
1204        let available = faucet.metrics.total_available_coins.get();
1205        let discarded = faucet.metrics.total_discarded_coins.get();
1206
1207        test_basic_interface(&faucet).await;
1208        test_send_interface_has_success_status(&faucet).await;
1209
1210        assert_eq!(available, faucet.metrics.total_available_coins.get());
1211        assert_eq!(discarded, faucet.metrics.total_discarded_coins.get());
1212    }
1213
1214    #[tokio::test]
1215    async fn test_init_gas_queue() {
1216        let test_cluster = TestClusterBuilder::new().build().await;
1217        let address = test_cluster.get_address_0();
1218        let context = test_cluster.wallet;
1219        let gas_coins = context
1220            .get_all_gas_objects_owned_by_address(address)
1221            .await
1222            .unwrap();
1223        let gas_coins = HashSet::from_iter(gas_coins.into_iter().map(|gas| gas.0));
1224
1225        let tmp = tempfile::tempdir().unwrap();
1226        let prom_registry = Registry::new();
1227        let config = FaucetConfig::default();
1228        let faucet = SimpleFaucet::new(
1229            context,
1230            &prom_registry,
1231            &tmp.path().join("faucet.wal"),
1232            config,
1233        )
1234        .await
1235        .unwrap();
1236        faucet.shutdown_batch_send_task();
1237        let available = faucet.metrics.total_available_coins.get();
1238        let faucet_unwrapped = &mut Arc::try_unwrap(faucet).unwrap();
1239
1240        let candidates = faucet_unwrapped.drain_gas_queue(gas_coins.len()).await;
1241
1242        assert_eq!(available as usize, candidates.len());
1243        assert_eq!(
1244            candidates, gas_coins,
1245            "gases: {:?}, candidates: {:?}",
1246            gas_coins, candidates
1247        );
1248    }
1249
1250    #[tokio::test]
1251    async fn test_transfer_state() {
1252        let test_cluster = TestClusterBuilder::new().build().await;
1253        let address = test_cluster.get_address_0();
1254        let context = test_cluster.wallet;
1255        let gas_coins = context
1256            .get_all_gas_objects_owned_by_address(address)
1257            .await
1258            .unwrap();
1259        let gas_coins = HashSet::from_iter(gas_coins.into_iter().map(|gas| gas.0));
1260
1261        let tmp = tempfile::tempdir().unwrap();
1262        let prom_registry = Registry::new();
1263        let config = FaucetConfig::default();
1264        let faucet = SimpleFaucet::new(
1265            context,
1266            &prom_registry,
1267            &tmp.path().join("faucet.wal"),
1268            config,
1269        )
1270        .await
1271        .unwrap();
1272
1273        let number_of_coins = gas_coins.len();
1274        let amounts = &vec![1; number_of_coins];
1275        let _ = futures::future::join_all((0..number_of_coins).map(|_| {
1276            faucet.send(
1277                Uuid::new_v4(),
1278                IotaAddress::random_for_testing_only(),
1279                amounts,
1280            )
1281        }))
1282        .await
1283        .into_iter()
1284        .map(|res| res.unwrap())
1285        .collect::<Vec<_>>();
1286
1287        // After all transfer requests settle, we still have the original candidates gas
1288        // in queue.
1289        let available = faucet.metrics.total_available_coins.get();
1290        faucet.shutdown_batch_send_task();
1291
1292        let faucet_unwrapped: &mut SimpleFaucet = &mut Arc::try_unwrap(faucet).unwrap();
1293        let candidates = faucet_unwrapped.drain_gas_queue(gas_coins.len()).await;
1294        assert_eq!(available as usize, candidates.len());
1295        assert_eq!(
1296            candidates, gas_coins,
1297            "gases: {:?}, candidates: {:?}",
1298            gas_coins, candidates
1299        );
1300    }
1301
1302    #[tokio::test]
1303    async fn test_batch_transfer_interface() {
1304        let test_cluster = TestClusterBuilder::new().build().await;
1305        let config: FaucetConfig = FaucetConfig {
1306            batch_enabled: true,
1307            ..Default::default()
1308        };
1309        let coin_amount = config.amount;
1310        let prom_registry = Registry::new();
1311        let tmp = tempfile::tempdir().unwrap();
1312        let address = test_cluster.get_address_0();
1313        let mut context = test_cluster.wallet;
1314        let gas_coins = context
1315            .get_all_gas_objects_owned_by_address(address)
1316            .await
1317            .unwrap();
1318        let client = context.get_client().await.unwrap();
1319        let tx_kind = client
1320            .transaction_builder()
1321            .split_coin_tx_kind(gas_coins.first().unwrap().0, None, Some(10))
1322            .await
1323            .unwrap();
1324        let gas_budget = 50_000_000;
1325        let rgp = context.get_reference_gas_price().await.unwrap();
1326        let tx_data = client
1327            .transaction_builder()
1328            .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1329            .await
1330            .unwrap();
1331
1332        execute_tx(&mut context, tx_data).await.unwrap();
1333
1334        let faucet = SimpleFaucet::new(
1335            context,
1336            &prom_registry,
1337            &tmp.path().join("faucet.wal"),
1338            config,
1339        )
1340        .await
1341        .unwrap();
1342
1343        let amounts = &[coin_amount];
1344
1345        // Create a vector containing five randomly generated addresses
1346        let target_addresses: Vec<IotaAddress> = (0..5)
1347            .map(|_| IotaAddress::random_for_testing_only())
1348            .collect();
1349
1350        let response = futures::future::join_all(
1351            target_addresses
1352                .iter()
1353                .map(|address| faucet.batch_send(Uuid::new_v4(), *address, amounts)),
1354        )
1355        .await
1356        .into_iter()
1357        .map(|res| res.unwrap())
1358        .collect::<Vec<BatchFaucetReceipt>>();
1359
1360        // Assert that all of these return in progress
1361        let status_results = futures::future::join_all(
1362            response
1363                .clone()
1364                .iter()
1365                .map(|task| faucet.get_batch_send_status(Uuid::parse_str(&task.task).unwrap())),
1366        )
1367        .await
1368        .into_iter()
1369        .map(|res| res.unwrap())
1370        .collect::<Vec<BatchSendStatus>>();
1371
1372        for status in status_results {
1373            assert_eq!(status.status, BatchSendStatusType::INPROGRESS);
1374        }
1375
1376        let mut status_results;
1377        loop {
1378            // Assert that all of these are SUCCEEDED
1379            status_results =
1380                futures::future::join_all(response.clone().iter().map(|task| {
1381                    faucet.get_batch_send_status(Uuid::parse_str(&task.task).unwrap())
1382                }))
1383                .await
1384                .into_iter()
1385                .map(|res| res.unwrap())
1386                .collect::<Vec<BatchSendStatus>>();
1387
1388            // All requests are submitted and picked up by the same batch, so one success in
1389            // the test will guarantee all success.
1390            if status_results[0].status == BatchSendStatusType::SUCCEEDED {
1391                break;
1392            }
1393            info!(
1394                "Trying to get status again... current is: {:?}",
1395                status_results[0].status
1396            );
1397        }
1398        for status in status_results {
1399            assert_eq!(status.status, BatchSendStatusType::SUCCEEDED);
1400        }
1401    }
1402
1403    #[tokio::test]
1404    async fn test_ttl_cache_expires_after_duration() {
1405        let test_cluster = TestClusterBuilder::new().build().await;
1406        let context = test_cluster.wallet;
1407        // We set it to a fast expiration for the purposes of testing and so these
1408        // requests don't have time to pass through the batch process.
1409        let config = FaucetConfig {
1410            ttl_expiration: 1,
1411            ..Default::default()
1412        };
1413        let prom_registry = Registry::new();
1414        let tmp = tempfile::tempdir().unwrap();
1415        let faucet = SimpleFaucet::new(
1416            context,
1417            &prom_registry,
1418            &tmp.path().join("faucet.wal"),
1419            config,
1420        )
1421        .await
1422        .unwrap();
1423
1424        let amounts = &[1; 1];
1425        // Create a vector containing five randomly generated addresses
1426        let target_addresses: Vec<IotaAddress> = (0..5)
1427            .map(|_| IotaAddress::random_for_testing_only())
1428            .collect();
1429
1430        let response = futures::future::join_all(
1431            target_addresses
1432                .iter()
1433                .map(|address| faucet.batch_send(Uuid::new_v4(), *address, amounts)),
1434        )
1435        .await
1436        .into_iter()
1437        .map(|res| res.unwrap())
1438        .collect::<Vec<BatchFaucetReceipt>>();
1439
1440        // Check that TTL cache expires
1441        tokio::time::sleep(Duration::from_secs(10)).await;
1442        let status_results = futures::future::join_all(
1443            response
1444                .clone()
1445                .iter()
1446                .map(|task| faucet.get_batch_send_status(Uuid::parse_str(&task.task).unwrap())),
1447        )
1448        .await;
1449
1450        let all_errors = status_results.iter().all(Result::is_err);
1451        assert!(all_errors);
1452    }
1453
1454    #[tokio::test]
1455    async fn test_discard_invalid_gas() {
1456        let test_cluster = TestClusterBuilder::new().build().await;
1457        let address = test_cluster.get_address_0();
1458        let context = test_cluster.wallet;
1459        let mut gas_coins = context
1460            .get_all_gas_objects_owned_by_address(address)
1461            .await
1462            .unwrap();
1463
1464        let bad_gas = gas_coins.swap_remove(0);
1465        let gas_coins = HashSet::from_iter(gas_coins.into_iter().map(|gas| gas.0));
1466
1467        let tmp = tempfile::tempdir().unwrap();
1468        let prom_registry = Registry::new();
1469        let config = FaucetConfig::default();
1470
1471        let client = context.get_client().await.unwrap();
1472        let faucet = SimpleFaucet::new(
1473            context,
1474            &prom_registry,
1475            &tmp.path().join("faucet.wal"),
1476            config,
1477        )
1478        .await
1479        .unwrap();
1480        faucet.shutdown_batch_send_task();
1481        let faucet: &mut SimpleFaucet = &mut Arc::try_unwrap(faucet).unwrap();
1482
1483        // Now we transfer one gas out
1484        let gas_budget = 50_000_000;
1485        let tx_data = client
1486            .transaction_builder()
1487            .pay_all_iota(
1488                address,
1489                vec![bad_gas.0],
1490                IotaAddress::random_for_testing_only(),
1491                gas_budget,
1492            )
1493            .await
1494            .unwrap();
1495        execute_tx(faucet.wallet_mut(), tx_data).await.unwrap();
1496
1497        let number_of_coins = gas_coins.len();
1498        let amounts = &vec![1; number_of_coins];
1499        // We traverse the list twice, which must trigger the transferred gas to be
1500        // kicked out
1501        futures::future::join_all((0..2).map(|_| {
1502            faucet.send(
1503                Uuid::new_v4(),
1504                IotaAddress::random_for_testing_only(),
1505                amounts,
1506            )
1507        }))
1508        .await;
1509
1510        // Verify that the bad gas is no longer in the queue.
1511        // Note `gases` does not contain the bad gas.
1512        let available = faucet.metrics.total_available_coins.get();
1513        let discarded = faucet.metrics.total_discarded_coins.get();
1514        let candidates = faucet.drain_gas_queue(gas_coins.len()).await;
1515        assert_eq!(available as usize, candidates.len());
1516        assert_eq!(discarded, 1);
1517        assert_eq!(
1518            candidates, gas_coins,
1519            "gases: {:?}, candidates: {:?}",
1520            gas_coins, candidates
1521        );
1522    }
1523
1524    #[tokio::test]
1525    async fn test_clear_wal() {
1526        telemetry_subscribers::init_for_testing();
1527        let test_cluster = TestClusterBuilder::new().build().await;
1528        let context = test_cluster.wallet;
1529        let tmp = tempfile::tempdir().unwrap();
1530        let prom_registry = Registry::new();
1531        let config = FaucetConfig::default();
1532        let faucet = SimpleFaucet::new(
1533            context,
1534            &prom_registry,
1535            &tmp.path().join("faucet.wal"),
1536            config,
1537        )
1538        .await
1539        .unwrap();
1540
1541        let original_available = faucet.metrics.total_available_coins.get();
1542        let original_discarded = faucet.metrics.total_discarded_coins.get();
1543
1544        let recipient = IotaAddress::random_for_testing_only();
1545        let faucet_address = faucet.active_address;
1546        let uuid = Uuid::new_v4();
1547
1548        let GasCoinResponse::ValidGasCoin(coin_id) =
1549            faucet.prepare_gas_coin(100, uuid, false).await
1550        else {
1551            panic!("prepare_gas_coin did not give a valid coin.")
1552        };
1553
1554        let tx_data = faucet
1555            .build_pay_iota_txn(coin_id, faucet_address, recipient, &[100], 200_000_000)
1556            .await
1557            .map_err(FaucetError::internal)
1558            .unwrap();
1559
1560        let mut wal = faucet.wal.lock().await;
1561
1562        // Check no WAL
1563        assert!(wal.log.is_empty());
1564        wal.reserve(Uuid::new_v4(), coin_id, recipient, tx_data)
1565            .map_err(FaucetError::internal)
1566            .ok();
1567        drop(wal);
1568
1569        // Check WAL is not empty but will not clear because txn is in_flight
1570        faucet.retry_wal_coins().await.ok();
1571        let mut wal = faucet.wal.lock().await;
1572        assert!(!wal.log.is_empty());
1573
1574        // Set in flight to false so WAL will clear
1575        wal.set_in_flight(coin_id, false)
1576            .expect("Unable to set in flight status to false.");
1577        drop(wal);
1578
1579        faucet.retry_wal_coins().await.ok();
1580        let wal = faucet.wal.lock().await;
1581        assert!(wal.log.is_empty());
1582
1583        let total_coins = faucet.metrics.total_available_coins.get();
1584        let discarded_coins = faucet.metrics.total_discarded_coins.get();
1585        assert_eq!(total_coins, original_available);
1586        assert_eq!(discarded_coins, original_discarded);
1587    }
1588
1589    #[tokio::test]
1590    async fn test_discard_smaller_amount_gas() {
1591        telemetry_subscribers::init_for_testing();
1592        let test_cluster = TestClusterBuilder::new().build().await;
1593        let address = test_cluster.get_address_0();
1594        let mut context = test_cluster.wallet;
1595        let gas_coins = context
1596            .get_all_gas_objects_owned_by_address(address)
1597            .await
1598            .unwrap();
1599
1600        // split out a coin that has a very small balance such that
1601        // this coin will be not used later on. This is the new default amount for
1602        // faucet due to gas changes
1603        let config = FaucetConfig::default();
1604        let tiny_value = (config.num_coins as u64 * config.amount) + 1;
1605        let client = context.get_client().await.unwrap();
1606        let tx_kind = client
1607            .transaction_builder()
1608            .split_coin_tx_kind(gas_coins.first().unwrap().0, Some(vec![tiny_value]), None)
1609            .await
1610            .unwrap();
1611        let gas_budget = 50_000_000;
1612        let rgp = context.get_reference_gas_price().await.unwrap();
1613        let tx_data = client
1614            .transaction_builder()
1615            .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1616            .await
1617            .unwrap();
1618
1619        let effects = execute_tx(&mut context, tx_data).await.unwrap();
1620
1621        let tiny_coin_id = effects.created()[0].reference.object_id;
1622
1623        // Get the latest list of gas
1624        let gas_coins = context.gas_objects(address).await.unwrap();
1625
1626        let tiny_amount = gas_coins
1627            .iter()
1628            .find(|gas| gas.1.object_id == tiny_coin_id)
1629            .unwrap()
1630            .0;
1631        assert_eq!(tiny_amount, tiny_value);
1632
1633        let gas_coins: HashSet<ObjectID> =
1634            HashSet::from_iter(gas_coins.into_iter().map(|gas| gas.1.object_id));
1635
1636        let tmp = tempfile::tempdir().unwrap();
1637        let prom_registry = Registry::new();
1638        let faucet = SimpleFaucet::new(
1639            context,
1640            &prom_registry,
1641            &tmp.path().join("faucet.wal"),
1642            config,
1643        )
1644        .await
1645        .unwrap();
1646        faucet.shutdown_batch_send_task();
1647
1648        let faucet: &mut SimpleFaucet = &mut Arc::try_unwrap(faucet).unwrap();
1649
1650        // Ask for a value higher than tiny coin + DEFAULT_GAS_COMPUTATION_BUCKET
1651        let number_of_coins = gas_coins.len();
1652        let amounts = &vec![tiny_value + 1; number_of_coins];
1653        // We traverse the list ten times, which must trigger the tiny gas to be
1654        // examined and then discarded
1655        futures::future::join_all((0..10).map(|_| {
1656            faucet.send(
1657                Uuid::new_v4(),
1658                IotaAddress::random_for_testing_only(),
1659                amounts,
1660            )
1661        }))
1662        .await;
1663        info!(
1664            ?number_of_coins,
1665            "Sent to random addresses: {} {}",
1666            amounts[0],
1667            amounts.len(),
1668        );
1669
1670        // Verify that the tiny gas is not in the queue.
1671        tokio::task::yield_now().await;
1672        let discarded = faucet.metrics.total_discarded_coins.get();
1673
1674        info!("discarded: {:?}", discarded);
1675        let candidates = faucet.drain_gas_queue(gas_coins.len() - 1).await;
1676
1677        assert_eq!(discarded, 1);
1678        assert!(!candidates.contains(&tiny_coin_id));
1679    }
1680
1681    #[tokio::test]
1682    async fn test_insufficient_balance_will_retry_success() {
1683        let test_cluster = TestClusterBuilder::new().build().await;
1684        let address = test_cluster.get_address_0();
1685        let mut context = test_cluster.wallet;
1686        let gas_coins = context
1687            .get_all_gas_objects_owned_by_address(address)
1688            .await
1689            .unwrap();
1690        let config = FaucetConfig::default();
1691
1692        // The coin that is split off stays because we don't try to refresh the coin
1693        // vector
1694        let reasonable_value = (config.num_coins as u64 * config.amount) * 10;
1695        let client = context.get_client().await.unwrap();
1696        let tx_kind = client
1697            .transaction_builder()
1698            .split_coin_tx_kind(
1699                gas_coins.first().unwrap().0,
1700                Some(vec![reasonable_value]),
1701                None,
1702            )
1703            .await
1704            .unwrap();
1705        let gas_budget = 50_000_000;
1706        let rgp = context.get_reference_gas_price().await.unwrap();
1707        let tx_data = client
1708            .transaction_builder()
1709            .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1710            .await
1711            .unwrap();
1712        execute_tx(&mut context, tx_data).await.unwrap();
1713
1714        let destination_address = IotaAddress::random_for_testing_only();
1715        // Transfer all valid gases away except for 1
1716        for gas in gas_coins.iter().take(gas_coins.len() - 1) {
1717            let tx_data = client
1718                .transaction_builder()
1719                .transfer_iota(address, gas.0, gas_budget, destination_address, None)
1720                .await
1721                .unwrap();
1722            execute_tx(&mut context, tx_data).await.unwrap();
1723        }
1724
1725        // Assert that the coins were transferred away successfully to destination
1726        // address
1727        let gas_coins = context
1728            .get_all_gas_objects_owned_by_address(address)
1729            .await
1730            .unwrap();
1731        assert!(!gas_coins.is_empty());
1732
1733        let tmp = tempfile::tempdir().unwrap();
1734        let prom_registry = Registry::new();
1735        let config = FaucetConfig::default();
1736        let faucet = SimpleFaucet::new(
1737            context,
1738            &prom_registry,
1739            &tmp.path().join("faucet.wal"),
1740            config,
1741        )
1742        .await
1743        .unwrap();
1744
1745        // We traverse the list twice, which must trigger the split gas to be kicked out
1746        futures::future::join_all((0..2).map(|_| {
1747            faucet.send(
1748                Uuid::new_v4(),
1749                IotaAddress::random_for_testing_only(),
1750                &[30000000000],
1751            )
1752        }))
1753        .await;
1754
1755        // Check that the gas was discarded for being too small
1756        let discarded = faucet.metrics.total_discarded_coins.get();
1757        assert_eq!(discarded, 1);
1758
1759        // Check that the WAL is empty so we don't retry bad requests
1760        let wal = faucet.wal.lock().await;
1761        assert!(wal.log.is_empty());
1762    }
1763
1764    #[tokio::test]
1765    async fn test_faucet_no_loop_forever() {
1766        let test_cluster = TestClusterBuilder::new().build().await;
1767        let address = test_cluster.get_address_0();
1768        let mut context = test_cluster.wallet;
1769        let gas_coins = context
1770            .get_all_gas_objects_owned_by_address(address)
1771            .await
1772            .unwrap();
1773        let config = FaucetConfig::default();
1774
1775        let tiny_value = (config.num_coins as u64 * config.amount) + 1;
1776        let client = context.get_client().await.unwrap();
1777        let tx_kind = client
1778            .transaction_builder()
1779            .split_coin_tx_kind(gas_coins.first().unwrap().0, Some(vec![tiny_value]), None)
1780            .await
1781            .unwrap();
1782
1783        let gas_budget = 50_000_000;
1784        let rgp = context.get_reference_gas_price().await.unwrap();
1785
1786        let tx_data = client
1787            .transaction_builder()
1788            .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1789            .await
1790            .unwrap();
1791
1792        execute_tx(&mut context, tx_data).await.unwrap();
1793
1794        let destination_address = IotaAddress::random_for_testing_only();
1795
1796        // Transfer all valid gases away
1797        for gas in gas_coins {
1798            let tx_data = client
1799                .transaction_builder()
1800                .transfer_iota(address, gas.0, gas_budget, destination_address, None)
1801                .await
1802                .unwrap();
1803            execute_tx(&mut context, tx_data).await.unwrap();
1804        }
1805
1806        // Assert that the coins were transferred away successfully to destination
1807        // address
1808        let gas_coins = context
1809            .get_all_gas_objects_owned_by_address(destination_address)
1810            .await
1811            .unwrap();
1812        assert!(!gas_coins.is_empty());
1813
1814        let tmp = tempfile::tempdir().unwrap();
1815        let prom_registry = Registry::new();
1816        let faucet = SimpleFaucet::new(
1817            context,
1818            &prom_registry,
1819            &tmp.path().join("faucet.wal"),
1820            config,
1821        )
1822        .await
1823        .unwrap();
1824
1825        let destination_address = IotaAddress::random_for_testing_only();
1826        // Assert that faucet will discard and also terminate
1827        let res = faucet
1828            .send(Uuid::new_v4(), destination_address, &[30000000000])
1829            .await;
1830
1831        // Assert that the result is an Error
1832        assert!(matches!(res, Err(FaucetError::NoGasCoinAvailable)));
1833    }
1834
1835    #[tokio::test]
1836    async fn test_faucet_restart_clears_wal() {
1837        let test_cluster = TestClusterBuilder::new().build().await;
1838        let context = test_cluster.wallet;
1839        let tmp = tempfile::tempdir().unwrap();
1840        let prom_registry = Registry::new();
1841        let config = FaucetConfig::default();
1842
1843        let faucet = SimpleFaucet::new(
1844            context,
1845            &prom_registry,
1846            &tmp.path().join("faucet.wal"),
1847            config,
1848        )
1849        .await
1850        .unwrap();
1851
1852        let recipient = IotaAddress::random_for_testing_only();
1853        let faucet_address = faucet.active_address;
1854        let uuid = Uuid::new_v4();
1855
1856        let GasCoinResponse::ValidGasCoin(coin_id) =
1857            faucet.prepare_gas_coin(100, uuid, false).await
1858        else {
1859            panic!("prepare_gas_coin did not give a valid coin.")
1860        };
1861
1862        let tx_data = faucet
1863            .build_pay_iota_txn(coin_id, faucet_address, recipient, &[100], 200_000_000)
1864            .await
1865            .map_err(FaucetError::internal)
1866            .unwrap();
1867
1868        let mut wal = faucet.wal.lock().await;
1869
1870        // Check no WAL
1871        assert!(wal.log.is_empty());
1872        wal.reserve(Uuid::new_v4(), coin_id, recipient, tx_data)
1873            .map_err(FaucetError::internal)
1874            .ok();
1875        drop(wal);
1876
1877        // Check WAL is not empty but will not clear because txn is in_flight
1878        let mut wal = faucet.wal.lock().await;
1879        assert!(!wal.log.is_empty());
1880
1881        // Set in flight to false so WAL will clear
1882        wal.set_in_flight(coin_id, false)
1883            .expect("Unable to set in flight status to false.");
1884        drop(wal);
1885        faucet.shutdown_batch_send_task();
1886
1887        let faucet_unwrapped = Arc::try_unwrap(faucet).unwrap();
1888
1889        let kept_context = faucet_unwrapped.teardown();
1890
1891        // Simulate a faucet restart and check that it clears the WAL
1892        let prom_registry_new = Registry::new();
1893
1894        let faucet_restarted = SimpleFaucet::new(
1895            kept_context,
1896            &prom_registry_new,
1897            &tmp.path().join("faucet.wal"),
1898            FaucetConfig::default(),
1899        )
1900        .await
1901        .unwrap();
1902
1903        let restarted_wal = faucet_restarted.wal.lock().await;
1904        assert!(restarted_wal.log.is_empty())
1905    }
1906
1907    #[tokio::test]
1908    async fn test_amounts_transferred_on_batch() {
1909        let test_cluster = TestClusterBuilder::new().build().await;
1910        let config: FaucetConfig = FaucetConfig {
1911            batch_enabled: true,
1912            ..Default::default()
1913        };
1914        let address = test_cluster.get_address_0();
1915        let mut context = test_cluster.wallet;
1916        let gas_coins = context
1917            .get_all_gas_objects_owned_by_address(address)
1918            .await
1919            .unwrap();
1920        let client = context.get_client().await.unwrap();
1921        let tx_kind = client
1922            .transaction_builder()
1923            .split_coin_tx_kind(gas_coins.first().unwrap().0, None, Some(10))
1924            .await
1925            .unwrap();
1926        let gas_budget = 50_000_000;
1927        let rgp = context.get_reference_gas_price().await.unwrap();
1928        let tx_data = client
1929            .transaction_builder()
1930            .tx_data(address, tx_kind, gas_budget, rgp, vec![], None)
1931            .await
1932            .unwrap();
1933        execute_tx(&mut context, tx_data).await.unwrap();
1934
1935        let prom_registry = Registry::new();
1936        let tmp = tempfile::tempdir().unwrap();
1937        let amount_to_send = config.amount;
1938
1939        let faucet = SimpleFaucet::new(
1940            context,
1941            &prom_registry,
1942            &tmp.path().join("faucet.wal"),
1943            config,
1944        )
1945        .await
1946        .unwrap();
1947
1948        // Create a vector containing two randomly generated addresses
1949        let target_addresses: Vec<IotaAddress> = (0..2)
1950            .map(|_| IotaAddress::random_for_testing_only())
1951            .collect();
1952
1953        // Send 2 coins of 1 iota each. We
1954        let coins_sent = 2;
1955        let amounts = &vec![amount_to_send; coins_sent];
1956
1957        // Send a request
1958        let response = futures::future::join_all(
1959            target_addresses
1960                .iter()
1961                .map(|address| faucet.batch_send(Uuid::new_v4(), *address, amounts)),
1962        )
1963        .await
1964        .into_iter()
1965        .map(|res| res.unwrap())
1966        .collect::<Vec<BatchFaucetReceipt>>();
1967
1968        let mut status_results;
1969        loop {
1970            // Assert that all of these are SUCCEEDED
1971            status_results =
1972                futures::future::join_all(response.clone().iter().map(|task| {
1973                    faucet.get_batch_send_status(Uuid::parse_str(&task.task).unwrap())
1974                }))
1975                .await
1976                .into_iter()
1977                .map(|res| res.unwrap())
1978                .collect::<Vec<BatchSendStatus>>();
1979
1980            // All requests are submitted and picked up by the same batch, so one success in
1981            // the test will guarantee all success.
1982            if status_results[0].status == BatchSendStatusType::SUCCEEDED {
1983                break;
1984            }
1985            info!(
1986                "Trying to get status again... current is: {:?}",
1987                status_results[0].status
1988            );
1989        }
1990
1991        for status in status_results {
1992            assert_eq!(status.status, BatchSendStatusType::SUCCEEDED);
1993            let amounts = status.transferred_gas_objects.unwrap().sent;
1994            assert_eq!(amounts.len(), coins_sent);
1995            for amt in amounts {
1996                assert_eq!(amt.amount, amount_to_send);
1997            }
1998        }
1999    }
2000
2001    async fn test_send_interface_has_success_status(faucet: &impl Faucet) {
2002        let recipient = IotaAddress::random_for_testing_only();
2003        let amounts = vec![1, 2, 3];
2004        let uuid_test = Uuid::new_v4();
2005
2006        faucet.send(uuid_test, recipient, &amounts).await.unwrap();
2007
2008        let status = faucet.get_batch_send_status(uuid_test).await.unwrap();
2009        let mut actual_amounts: Vec<u64> = status
2010            .transferred_gas_objects
2011            .unwrap()
2012            .sent
2013            .iter()
2014            .map(|c| c.amount)
2015            .collect();
2016        actual_amounts.sort_unstable();
2017
2018        assert_eq!(actual_amounts, amounts);
2019        assert_eq!(status.status, BatchSendStatusType::SUCCEEDED);
2020    }
2021
2022    async fn test_basic_interface(faucet: &impl Faucet) {
2023        let recipient = IotaAddress::random_for_testing_only();
2024        let amounts = vec![1, 2, 3];
2025
2026        let FaucetReceipt { sent } = faucet
2027            .send(Uuid::new_v4(), recipient, &amounts)
2028            .await
2029            .unwrap();
2030        let mut actual_amounts: Vec<u64> = sent.iter().map(|c| c.amount).collect();
2031        actual_amounts.sort_unstable();
2032        assert_eq!(actual_amounts, amounts);
2033    }
2034}