consensus_core/transaction.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{collections::BTreeMap, sync::Arc};

use iota_common::debug_fatal;
use iota_metrics::monitored_mpsc::{Receiver, Sender, channel};
use parking_lot::Mutex;
use tap::tap::TapFallible;
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{error, warn};

use crate::{
    Round,
    block::{BlockRef, Transaction},
    context::Context,
};

/// The maximum number of transactions pending in the queue to be pulled for
/// block proposal.
const MAX_PENDING_TRANSACTIONS: usize = 2_000;

/// The guard acts as an acknowledgment mechanism for the inclusion of the
/// transactions in a block. When its last transaction is included in a block,
/// `included_in_block_ack` will be signalled. If the guard is dropped without
/// getting acknowledged, it means the transactions have not been included in
/// a block and consensus is shutting down.
pub(crate) struct TransactionsGuard {
    // Holds a list of transactions to be included in the block.
    // A TransactionsGuard may be partially consumed by `TransactionConsumer`, in which case, this
    // holds the remaining transactions.
    transactions: Vec<Transaction>,

    included_in_block_ack: oneshot::Sender<(BlockRef, oneshot::Receiver<BlockStatus>)>,
}

/// The TransactionConsumer is responsible for fetching the next transactions
/// to be included in block proposals. The transactions are submitted to a
/// channel which is shared between the TransactionConsumer and the
/// TransactionClient, and are pulled every time the `next` method is called.
pub(crate) struct TransactionConsumer {
    context: Arc<Context>,
    tx_receiver: Receiver<TransactionsGuard>,
    max_transactions_in_block_bytes: u64,
    max_num_transactions_in_block: u64,
    pending_transactions: Option<TransactionsGuard>,
    block_status_subscribers: Arc<Mutex<BTreeMap<BlockRef, Vec<oneshot::Sender<BlockStatus>>>>>,
}

#[derive(Debug, Clone, Eq, PartialEq)]
#[allow(unused)]
pub enum BlockStatus {
    /// The block has been sequenced as part of a committed sub-dag. That means
    /// that any transaction that has been included in the block has been
    /// committed as well.
    Sequenced(BlockRef),
    /// The block has been garbage collected and will never be committed. Any
    /// transactions that have been included in the block should be considered
    /// impossible to commit as part of this block and might need to be
    /// retried.
    GarbageCollected(BlockRef),
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LimitReached {
    // The maximum number of transactions have been included
    MaxNumOfTransactions,
    // The maximum number of bytes have been included
    MaxBytes,
    // All available transactions have been included
    AllTransactionsIncluded,
}

impl TransactionConsumer {
    pub(crate) fn new(tx_receiver: Receiver<TransactionsGuard>, context: Arc<Context>) -> Self {
        Self {
            tx_receiver,
            max_transactions_in_block_bytes: context
                .protocol_config
                .max_transactions_in_block_bytes(),
            max_num_transactions_in_block: context.protocol_config.max_num_transactions_in_block(),
            context,
            pending_transactions: None,
            block_status_subscribers: Arc::new(Mutex::new(BTreeMap::new())),
        }
    }

    // Attempts to fetch the next transactions that have been submitted for
    // sequencing. Respects the `max_transactions_in_block_bytes` and
    // `max_num_transactions_in_block` parameters specified via the protocol
    // config. Returns the transactions to be included in the block, a callback
    // to acknowledge the inclusion of those transactions, and a `LimitReached`
    // enum to indicate which limit type has been reached.
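    //
    // A minimal usage sketch of how a proposer might drive this method
    // (`propose_block` is a hypothetical helper returning the new block's
    // `BlockRef`; it is not part of this module):
    //
    // ```ignore
    // let (transactions, ack, _limit) = consumer.next();
    // let block_ref: BlockRef = propose_block(transactions);
    // // Acknowledge inclusion so submitters awaiting `TransactionClient::submit`
    // // are unblocked with the block reference and a `BlockStatus` receiver.
    // ack(block_ref);
    // ```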
    pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce(BlockRef)>, LimitReached) {
        let mut transactions = Vec::new();
        let mut acks = Vec::new();
        let mut total_bytes = 0;
        let mut limit_reached = LimitReached::AllTransactionsIncluded;

        // Handles one batch of incoming transactions from a TransactionsGuard.
        // The closure returns `None` if all of the transactions can be included in
        // the block. Otherwise none of the transactions are included in the block
        // and the TransactionsGuard is returned back.
        let mut handle_txs = |t: TransactionsGuard| -> Option<TransactionsGuard> {
            let transactions_bytes =
                t.transactions.iter().map(|t| t.data().len()).sum::<usize>() as u64;
            let transactions_num = t.transactions.len() as u64;

            if total_bytes + transactions_bytes > self.max_transactions_in_block_bytes {
                limit_reached = LimitReached::MaxBytes;
                return Some(t);
            }
            if transactions.len() as u64 + transactions_num > self.max_num_transactions_in_block {
                limit_reached = LimitReached::MaxNumOfTransactions;
                return Some(t);
            }

            total_bytes += transactions_bytes;

            // The transactions can be consumed, register their ack.
            acks.push(t.included_in_block_ack);
            transactions.extend(t.transactions);
            None
        };

        if let Some(t) = self.pending_transactions.take() {
            if let Some(pending_transactions) = handle_txs(t) {
                debug_fatal!(
                    "Previously pending transaction(s) should fit into an empty block! Dropping: {:?}",
                    pending_transactions.transactions
                );
            }
        }

        // Pull transactions until we have reached a limit.
        // We may have already reached a limit in the step above, in which case
        // we stop immediately.
        while self.pending_transactions.is_none() {
            if let Ok(t) = self.tx_receiver.try_recv() {
                self.pending_transactions = handle_txs(t);
            } else {
                break;
            }
        }

        let block_status_subscribers = self.block_status_subscribers.clone();
        let gc_enabled = self.context.protocol_config.gc_depth() > 0;

        (
            transactions,
            Box::new(move |block_ref: BlockRef| {
                let mut block_status_subscribers = block_status_subscribers.lock();

                for ack in acks {
                    let (status_tx, status_rx) = oneshot::channel();

                    if gc_enabled {
                        block_status_subscribers
                            .entry(block_ref)
                            .or_default()
                            .push(status_tx);
                    } else {
                        // When gc is not enabled, report the block as sequenced directly when
                        // the tx is acknowledged for inclusion. As blocks can never get garbage
                        // collected, there is no point in doing otherwise, and it is also safer
                        // for edge cases.
                        status_tx.send(BlockStatus::Sequenced(block_ref)).ok();
                    }

                    let _ = ack.send((block_ref, status_rx));
                }
            }),
            limit_reached,
        )
    }

    /// Notifies all the transaction submitters who are waiting to receive an
    /// update on the status of their block. The `committed_blocks` are the
    /// blocks that have been committed and the `gc_round` is the round up to
    /// which blocks have been garbage collected. We first notify for all the
    /// committed blocks, and then for all the blocks that have been garbage
    /// collected.
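    ///
    /// A minimal calling sketch, assuming the caller already has the refs of
    /// its own committed blocks and the current gc round (`committed_refs` is
    /// a placeholder name, not part of this module):
    ///
    /// ```ignore
    /// // After processing a committed sub-dag:
    /// let committed_refs: Vec<BlockRef> = vec![/* own committed block refs */];
    /// let gc_round: Round = 5;
    /// consumer.notify_own_blocks_status(committed_refs, gc_round);
    /// // Subscribers of the committed refs receive BlockStatus::Sequenced, while
    /// // subscribers with round <= gc_round receive BlockStatus::GarbageCollected.
    /// ```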
    pub(crate) fn notify_own_blocks_status(
        &self,
        committed_blocks: Vec<BlockRef>,
        gc_round: Round,
    ) {
        // Notify for all the committed blocks first
        let mut block_status_subscribers = self.block_status_subscribers.lock();
        for block_ref in committed_blocks {
            if let Some(subscribers) = block_status_subscribers.remove(&block_ref) {
                subscribers.into_iter().for_each(|s| {
                    let _ = s.send(BlockStatus::Sequenced(block_ref));
                });
            }
        }

        // Now notify every subscriber with round <= gc_round that their block has
        // been garbage collected, and clean up the entries
        while let Some((block_ref, subscribers)) = block_status_subscribers.pop_first() {
            if block_ref.round <= gc_round {
                subscribers.into_iter().for_each(|s| {
                    let _ = s.send(BlockStatus::GarbageCollected(block_ref));
                });
            } else {
                block_status_subscribers.insert(block_ref, subscribers);
                break;
            }
        }
    }

    #[cfg(test)]
    pub(crate) fn subscribe_for_block_status_testing(
        &self,
        block_ref: BlockRef,
    ) -> oneshot::Receiver<BlockStatus> {
        let (tx, rx) = oneshot::channel();
        let mut block_status_subscribers = self.block_status_subscribers.lock();
        block_status_subscribers
            .entry(block_ref)
            .or_default()
            .push(tx);
        rx
    }

    #[cfg(test)]
    fn is_empty(&mut self) -> bool {
        if self.pending_transactions.is_some() {
            return false;
        }
        if let Ok(t) = self.tx_receiver.try_recv() {
            self.pending_transactions = Some(t);
            return false;
        }
        true
    }
}

/// `TransactionClient` is the entry point for submitting transactions to
/// consensus, to be included in future proposed blocks.
#[derive(Clone)]
pub struct TransactionClient {
    sender: Sender<TransactionsGuard>,
    max_transaction_size: u64,
    max_transactions_in_block_bytes: u64,
    max_transactions_in_block_count: u64,
}

#[derive(Debug, Error)]
pub enum ClientError {
    #[error("Failed to submit transaction, consensus is shutting down: {0}")]
    ConsensusShuttingDown(String),

    #[error("Transaction size ({0}B) is over limit ({1}B)")]
    OversizedTransaction(u64, u64),

    #[error("Transaction bundle size ({0}B) is over limit ({1}B)")]
    OversizedTransactionBundleBytes(u64, u64),

    #[error("Transaction bundle count ({0}) is over limit ({1})")]
    OversizedTransactionBundleCount(u64, u64),
}

impl TransactionClient {
    pub(crate) fn new(context: Arc<Context>) -> (Self, Receiver<TransactionsGuard>) {
        let (sender, receiver) = channel("consensus_input", MAX_PENDING_TRANSACTIONS);

        (
            Self {
                sender,
                max_transaction_size: context.protocol_config.max_transaction_size_bytes(),
                max_transactions_in_block_bytes: context
                    .protocol_config
                    .max_transactions_in_block_bytes(),
                max_transactions_in_block_count: context
                    .protocol_config
                    .max_num_transactions_in_block(),
            },
            receiver,
        )
    }

    /// Submits a list of transactions to be sequenced. The call returns once
    /// all the transactions have been successfully included in upcoming
    /// proposed blocks.
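    ///
    /// A minimal usage sketch, assuming a `TransactionClient` has already been
    /// obtained from a running consensus authority (the payload bytes are
    /// arbitrary placeholders):
    ///
    /// ```ignore
    /// let payload: Vec<u8> = vec![1, 2, 3];
    /// let (block_ref, status_rx) = client.submit(vec![payload]).await?;
    /// // The transactions are now included in the block `block_ref`. Optionally
    /// // wait for the block itself to be committed or garbage collected.
    /// match status_rx.await {
    ///     Ok(BlockStatus::Sequenced(_)) => { /* the block was committed */ }
    ///     Ok(BlockStatus::GarbageCollected(_)) => { /* never committed; consider retrying */ }
    ///     Err(_) => { /* consensus is shutting down */ }
    /// }
    /// ```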
    pub async fn submit(
        &self,
        transactions: Vec<Vec<u8>>,
    ) -> Result<(BlockRef, oneshot::Receiver<BlockStatus>), ClientError> {
        // TODO: Support returning the block refs for transactions that span multiple
        // blocks
        let included_in_block = self.submit_no_wait(transactions).await?;
        included_in_block
            .await
            .tap_err(|e| warn!("Transaction acknowledge failed with {:?}", e))
            .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))
    }

    /// Submits a list of transactions to be sequenced.
    /// If any transaction's length exceeds `max_transaction_size`, no
    /// transaction will be submitted. That shouldn't be the common case, as
    /// sizes should be aligned between consensus and client. The method
    /// returns a receiver to await until the transactions have been included
    /// in the next block to get proposed; awaiting it serves as the inclusion
    /// acknowledgement. If the receiver errors, then consensus is shutting
    /// down and the transactions have not been included in any block. If
    /// multiple transactions are submitted, the method will attempt to bundle
    /// them together into a single block. If the total size of the
    /// transactions exceeds `max_transactions_in_block_bytes`, no transaction
    /// will be submitted and an error will be returned instead. Similarly, if
    /// the number of transactions exceeds `max_transactions_in_block_count`,
    /// an error will be returned.
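    ///
    /// A sketch of the bundle-size check from the caller's perspective (the
    /// limits used here are illustrative, not the actual protocol config
    /// values, and each transaction is assumed to be within
    /// `max_transaction_size`):
    ///
    /// ```ignore
    /// // A bundle whose total size exceeds `max_transactions_in_block_bytes`
    /// // is rejected as a whole; none of its transactions are submitted.
    /// let bundle = vec![vec![0u8; 150], vec![0u8; 150]]; // 300B vs. e.g. a 200B limit
    /// match client.submit_no_wait(bundle).await {
    ///     Err(ClientError::OversizedTransactionBundleBytes(size, limit)) => {
    ///         // size = 300, limit = 200 under these illustrative limits
    ///     }
    ///     _ => unreachable!("the bundle should have been rejected"),
    /// }
    /// ```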
    pub(crate) async fn submit_no_wait(
        &self,
        transactions: Vec<Vec<u8>>,
    ) -> Result<oneshot::Receiver<(BlockRef, oneshot::Receiver<BlockStatus>)>, ClientError> {
        let (included_in_block_ack_send, included_in_block_ack_receive) = oneshot::channel();

        let mut bundle_size = 0;

        if transactions.len() as u64 > self.max_transactions_in_block_count {
            return Err(ClientError::OversizedTransactionBundleCount(
                transactions.len() as u64,
                self.max_transactions_in_block_count,
            ));
        }

        for transaction in &transactions {
            if transaction.len() as u64 > self.max_transaction_size {
                return Err(ClientError::OversizedTransaction(
                    transaction.len() as u64,
                    self.max_transaction_size,
                ));
            }
            bundle_size += transaction.len() as u64;

            if bundle_size > self.max_transactions_in_block_bytes {
                return Err(ClientError::OversizedTransactionBundleBytes(
                    bundle_size,
                    self.max_transactions_in_block_bytes,
                ));
            }
        }

        let t = TransactionsGuard {
            transactions: transactions.into_iter().map(Transaction::new).collect(),
            included_in_block_ack: included_in_block_ack_send,
        };
        self.sender
            .send(t)
            .await
            .tap_err(|e| error!("Submit transactions failed with {:?}", e))
            .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))?;
        Ok(included_in_block_ack_receive)
    }
}

/// A `TransactionVerifier` implementation is supplied by IOTA to validate
/// transactions in a block, before the block is accepted.
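///
/// A minimal sketch of a custom verifier (the `MaxSizeVerifier` name and its
/// size bound are illustrative, not part of this crate):
///
/// ```ignore
/// struct MaxSizeVerifier {
///     max_transaction_size: usize,
/// }
///
/// impl TransactionVerifier for MaxSizeVerifier {
///     fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError> {
///         // Reject the whole batch if any transaction exceeds the size bound.
///         for tx in batch {
///             if tx.len() > self.max_transaction_size {
///                 return Err(ValidationError::InvalidTransaction(format!(
///                     "transaction of {} bytes exceeds limit of {} bytes",
///                     tx.len(),
///                     self.max_transaction_size
///                 )));
///             }
///         }
///         Ok(())
///     }
/// }
/// ```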
pub trait TransactionVerifier: Send + Sync + 'static {
    /// Determines if this batch can be voted on.
    fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError>;
}

#[derive(Debug, Error)]
pub enum ValidationError {
    #[error("Invalid transaction: {0}")]
    InvalidTransaction(String),
}

/// `NoopTransactionVerifier` accepts all transactions.
#[cfg(any(test, msim))]
pub struct NoopTransactionVerifier;

#[cfg(any(test, msim))]
impl TransactionVerifier for NoopTransactionVerifier {
    fn verify_batch(&self, _batch: &[&[u8]]) -> Result<(), ValidationError> {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use consensus_config::AuthorityIndex;
    use futures::{StreamExt, stream::FuturesUnordered};
    use iota_protocol_config::ProtocolConfig;
    use tokio::time::timeout;

    use crate::{
        block::{BlockDigest, BlockRef},
        block_verifier::SignedBlockVerifier,
        context::Context,
        transaction::{
            BlockStatus, LimitReached, NoopTransactionVerifier, TransactionClient,
            TransactionConsumer,
        },
    };

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn basic_submit_and_consume() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000); // 2KB
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // submit the transactions asynchronously and keep the waiters
        let mut included_in_block_waiters = FuturesUnordered::new();
        for i in 0..3 {
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
            included_in_block_waiters.push(w);
        }

        // now pull the transactions from the consumer
        let (transactions, ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 3);

        for (i, t) in transactions.iter().enumerate() {
            let t: String = bcs::from_bytes(t.data()).unwrap();
            assert_eq!(format!("transaction {i}").to_string(), t);
        }

        assert!(
            timeout(Duration::from_secs(1), included_in_block_waiters.next())
                .await
                .is_err(),
            "We should expect to timeout as none of the transactions have been acknowledged yet"
        );

        // Now acknowledge the inclusion of transactions
        ack_transactions(BlockRef::MIN);

        // Now make sure that all the waiters have returned
        while let Some(result) = included_in_block_waiters.next().await {
            assert!(result.is_ok());
        }

        // try to pull transactions again; the result should be empty
        assert!(consumer.is_empty());
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn block_status_update_gc_enabled() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000); // 2KB
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config.set_consensus_gc_depth_for_testing(10);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // submit the transactions and include 2 of them in each new block
        let mut included_in_block_waiters = FuturesUnordered::new();
        for i in 1..=10 {
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
            included_in_block_waiters.push(w);

            // Every 2 transactions simulate the creation of a new block and acknowledge the
            // inclusion of the transactions
            if i % 2 == 0 {
                let (transactions, ack_transactions, _limit_reached) = consumer.next();
                assert_eq!(transactions.len(), 2);
                ack_transactions(BlockRef::new(
                    i,
                    AuthorityIndex::new_for_test(0),
                    BlockDigest::MIN,
                ));
            }
        }

        // Now iterate over all the waiters. Everyone should have been acknowledged.
        let mut block_status_waiters = Vec::new();
        while let Some(result) = included_in_block_waiters.next().await {
            let (block_ref, block_status_waiter) =
                result.expect("Block inclusion waiter shouldn't fail");
            block_status_waiters.push((block_ref, block_status_waiter));
        }

        // Now acknowledge the commit of the blocks 6, 8, 10 and set gc_round = 5, which
        // should trigger the garbage collection of blocks 1..=5
        let gc_round = 5;
        consumer.notify_own_blocks_status(
            vec![
                BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
                BlockRef::new(8, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
                BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            ],
            gc_round,
        );

        // Now iterate over all the block status waiters. Everyone should have been
        // notified.
        for (block_ref, waiter) in block_status_waiters {
            let block_status = waiter.await.expect("Block status waiter shouldn't fail");

            if block_ref.round <= gc_round {
                assert!(matches!(block_status, BlockStatus::GarbageCollected(_)))
            } else {
                assert!(matches!(block_status, BlockStatus::Sequenced(_)));
            }
        }

        // Ensure internal structure is clear
        assert!(consumer.block_status_subscribers.lock().is_empty());
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn block_status_update_gc_disabled() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000); // 2KB
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config.set_consensus_gc_depth_for_testing(0);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // submit the transactions and include 2 of them in each new block
        let mut included_in_block_waiters = FuturesUnordered::new();
        for i in 1..=10 {
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
            included_in_block_waiters.push(w);

            // Every 2 transactions simulate the creation of a new block and acknowledge the
            // inclusion of the transactions
            if i % 2 == 0 {
                let (transactions, ack_transactions, _limit_reached) = consumer.next();
                assert_eq!(transactions.len(), 2);
                ack_transactions(BlockRef::new(
                    i,
                    AuthorityIndex::new_for_test(0),
                    BlockDigest::MIN,
                ));
            }
        }

        // Now iterate over all the waiters. Everyone should have been acknowledged.
        let mut block_status_waiters = Vec::new();
        while let Some(result) = included_in_block_waiters.next().await {
            let (block_ref, block_status_waiter) =
                result.expect("Block inclusion waiter shouldn't fail");
            block_status_waiters.push((block_ref, block_status_waiter));
        }

        // Now iterate over all the block status waiters. Everyone should have been
        // notified and everyone should be considered sequenced.
        for (_block_ref, waiter) in block_status_waiters {
            let block_status = waiter.await.expect("Block status waiter shouldn't fail");
            assert!(matches!(block_status, BlockStatus::Sequenced(_)));
        }

        // Ensure internal structure is clear
        assert!(consumer.block_status_subscribers.lock().is_empty());
    }

    #[tokio::test]
    async fn submit_over_max_fetch_size_and_consume() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(100);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(100);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // submit some transactions
        for i in 0..10 {
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let _w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
        }

        // now pull the transactions from the consumer
        let mut all_transactions = Vec::new();
        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 7);

        // ensure their total size does not exceed `max_transactions_in_block_bytes`
        let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
        assert!(
            total_size <= context.protocol_config.max_transactions_in_block_bytes(),
            "Should have fetched transactions up to {}",
            context.protocol_config.max_transactions_in_block_bytes()
        );
        all_transactions.extend(transactions);

        // try to pull transactions again; the next batch should be provided
        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 3);

        // ensure their total size does not exceed `max_transactions_in_block_bytes`
        let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
        assert!(
            total_size <= context.protocol_config.max_transactions_in_block_bytes(),
            "Should have fetched transactions up to {}",
            context.protocol_config.max_transactions_in_block_bytes()
        );
        all_transactions.extend(transactions);

        // try to pull transactions again; the result should be empty
        assert!(consumer.is_empty());

        for (i, t) in all_transactions.iter().enumerate() {
            let t: String = bcs::from_bytes(t.data()).unwrap();
            assert_eq!(format!("transaction {i}").to_string(), t);
        }
    }

    #[tokio::test]
    async fn submit_large_batch_and_ack() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(15);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(200);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut all_receivers = Vec::new();
        // submit a few transactions individually.
        for i in 0..10 {
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
            all_receivers.push(w);
        }

        // construct an acceptable batch and submit, it should be accepted
        {
            let transactions: Vec<_> = (10..15)
                .map(|i| {
                    bcs::to_bytes(&format!("transaction {i}"))
                        .expect("Serialization should not fail.")
                })
                .collect();
            let w = client
                .submit_no_wait(transactions)
                .await
                .expect("Should submit transaction successfully");
            all_receivers.push(w);
        }

        // submit another individual transaction.
        {
            let i = 15;
            let transaction =
                bcs::to_bytes(&format!("transaction {i}")).expect("Serialization should not fail.");
            let w = client
                .submit_no_wait(vec![transaction])
                .await
                .expect("Should submit transaction successfully");
            all_receivers.push(w);
        }

        // construct an over-size-limit batch and submit, it should not be accepted
        {
            let transactions: Vec<_> = (16..32)
                .map(|i| {
                    bcs::to_bytes(&format!("transaction {i}"))
                        .expect("Serialization should not fail.")
                })
                .collect();
            let result = client.submit_no_wait(transactions).await.unwrap_err();
            assert_eq!(
                result.to_string(),
                "Transaction bundle size (210B) is over limit (200B)"
            );
        }

        // now pull the transactions from the consumer.
        // we expect all transactions to be fetched in order, without missing any and
        // without exceeding the size limit.
        let mut all_acks: Vec<Box<dyn FnOnce(BlockRef)>> = Vec::new();
        let mut batch_index = 0;
        while !consumer.is_empty() {
            let (transactions, ack_transactions, _limit_reached) = consumer.next();

            assert!(
                transactions.len() as u64
                    <= context.protocol_config.max_num_transactions_in_block(),
                "Should have fetched transactions up to {}",
                context.protocol_config.max_num_transactions_in_block()
            );

            let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
            assert!(
                total_size <= context.protocol_config.max_transactions_in_block_bytes(),
                "Should have fetched transactions up to {}",
                context.protocol_config.max_transactions_in_block_bytes()
            );

            // the first batch should contain all transactions from 0..10. The soft bundle
            // is too big to also fit, so it is parked.
            if batch_index == 0 {
                assert_eq!(transactions.len(), 10);
                for (i, transaction) in transactions.iter().enumerate() {
                    let t: String = bcs::from_bytes(transaction.data()).unwrap();
                    assert_eq!(format!("transaction {}", i).to_string(), t);
                }
            // the second batch will contain the soft bundle and the additional last
            // transaction.
            } else if batch_index == 1 {
                assert_eq!(transactions.len(), 6);
                for (i, transaction) in transactions.iter().enumerate() {
                    let t: String = bcs::from_bytes(transaction.data()).unwrap();
                    assert_eq!(format!("transaction {}", i + 10).to_string(), t);
                }
            } else {
                panic!("Unexpected batch index");
            }

            batch_index += 1;

            all_acks.push(ack_transactions);
        }

        // now acknowledge the inclusion of all transactions.
        for ack in all_acks {
            ack(BlockRef::MIN);
        }

        // expect all receivers to be resolved.
        for w in all_receivers {
            let r = w.await;
            assert!(r.is_ok());
        }
    }

    #[tokio::test]
    async fn test_submit_over_max_block_size_and_validate_block_size() {
        // submit transactions individually so we make sure that we have reached the
        // block size limit of 10 transactions
        {
            let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
                config.set_consensus_max_transaction_size_bytes_for_testing(100);
                config.set_consensus_max_num_transactions_in_block_for_testing(10);
                config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
                config
            });

            let context = Arc::new(Context::new_for_test(4).0);
            let (client, tx_receiver) = TransactionClient::new(context.clone());
            let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
            let mut all_receivers = Vec::new();

            // create enough transactions
            let max_num_transactions_in_block =
                context.protocol_config.max_num_transactions_in_block();
            for i in 0..2 * max_num_transactions_in_block {
                let transaction = bcs::to_bytes(&format!("transaction {i}"))
                    .expect("Serialization should not fail.");
                let w = client
                    .submit_no_wait(vec![transaction])
                    .await
                    .expect("Should submit transaction successfully");
                all_receivers.push(w);
            }

            // Fetch the next transactions to be included in a block
            let (transactions, _ack_transactions, limit) = consumer.next();
            assert_eq!(limit, LimitReached::MaxNumOfTransactions);
            assert_eq!(transactions.len() as u64, max_num_transactions_in_block);

            // Now create a block and verify that transactions are within the size limits
            let block_verifier =
                SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));

            let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
            assert!(
                block_verifier.check_transactions(&batch).is_ok(),
                "Number of transactions limit verification failed"
            );
        }

        // submit transactions individually so we make sure that we have reached the
        // block size limit of 300 bytes
        {
            let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
                config.set_consensus_max_transaction_size_bytes_for_testing(100);
                config.set_consensus_max_num_transactions_in_block_for_testing(1_000);
                config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
                config
            });

            let context = Arc::new(Context::new_for_test(4).0);
            let (client, tx_receiver) = TransactionClient::new(context.clone());
            let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
            let mut all_receivers = Vec::new();

            let max_transactions_in_block_bytes =
                context.protocol_config.max_transactions_in_block_bytes();
            let mut total_size = 0;
            loop {
                let transaction = bcs::to_bytes(&"transaction".to_string())
                    .expect("Serialization should not fail.");
                total_size += transaction.len() as u64;
                let w = client
                    .submit_no_wait(vec![transaction])
                    .await
                    .expect("Should submit transaction successfully");
                all_receivers.push(w);

                // create enough transactions to reach the block size limit
                if total_size >= 2 * max_transactions_in_block_bytes {
                    break;
                }
            }

            // Fetch the next transactions to be included in a block
            let (transactions, _ack_transactions, limit) = consumer.next();
            let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
            let size = batch.iter().map(|t| t.len() as u64).sum::<u64>();

            assert_eq!(limit, LimitReached::MaxBytes);
            assert!(
                batch.len()
                    < context
                        .protocol_config
                        .consensus_max_num_transactions_in_block() as usize,
                "Should have submitted less than the max number of transactions in a block"
            );
            assert!(size <= max_transactions_in_block_bytes);

            // Now create a block and verify that transactions are within the size limits
            let block_verifier =
                SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));

            assert!(
                block_verifier.check_transactions(&batch).is_ok(),
                "Total size of transactions limit verification failed"
            );
        }
    }
}