1use std::{collections::BTreeMap, sync::Arc};
6
7use iota_common::debug_fatal;
8use iota_metrics::monitored_mpsc::{Receiver, Sender, channel};
9use parking_lot::Mutex;
10use tap::tap::TapFallible;
11use thiserror::Error;
12use tokio::sync::oneshot;
13use tracing::{error, warn};
14
15use crate::{
16 Round,
17 block::{BlockRef, Transaction},
18 context::Context,
19};
20
/// Size argument handed to the `consensus_input` channel that connects `TransactionClient`
/// (sender) to `TransactionConsumer` (receiver).
///
/// Each queued item is one submitted bundle (`TransactionsGuard`), not a single transaction.
// NOTE(review): presumably this is the channel's buffer capacity — confirm against
// `iota_metrics::monitored_mpsc::channel`.
const MAX_PENDING_TRANSACTIONS: usize = 2_000;
24
25pub(crate) struct TransactionsGuard {
31 transactions: Vec<Transaction>,
35
36 included_in_block_ack: oneshot::Sender<(BlockRef, oneshot::Receiver<BlockStatus>)>,
37}
38
39pub(crate) struct TransactionConsumer {
44 context: Arc<Context>,
45 tx_receiver: Receiver<TransactionsGuard>,
46 max_transactions_in_block_bytes: u64,
47 max_num_transactions_in_block: u64,
48 pending_transactions: Option<TransactionsGuard>,
49 block_status_subscribers: Arc<Mutex<BTreeMap<BlockRef, Vec<oneshot::Sender<BlockStatus>>>>>,
50}
51
52#[derive(Debug, Clone, Eq, PartialEq)]
53#[allow(unused)]
54pub enum BlockStatus {
55 Sequenced(BlockRef),
59 GarbageCollected(BlockRef),
64}
65
/// Reason why `TransactionConsumer::next` stopped adding transactions to the returned batch.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LimitReached {
    /// Adding the next bundle would exceed the maximum number of transactions per block.
    MaxNumOfTransactions,
    /// Adding the next bundle would exceed the maximum total transaction bytes per block.
    MaxBytes,
    /// No limit was hit: every bundle that was available has been included.
    AllTransactionsIncluded,
}
75
76impl TransactionConsumer {
77 pub(crate) fn new(tx_receiver: Receiver<TransactionsGuard>, context: Arc<Context>) -> Self {
78 Self {
79 tx_receiver,
80 max_transactions_in_block_bytes: context
81 .protocol_config
82 .max_transactions_in_block_bytes(),
83 max_num_transactions_in_block: context.protocol_config.max_num_transactions_in_block(),
84 context,
85 pending_transactions: None,
86 block_status_subscribers: Arc::new(Mutex::new(BTreeMap::new())),
87 }
88 }
89
90 pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce(BlockRef)>, LimitReached) {
97 let mut transactions = Vec::new();
98 let mut acks = Vec::new();
99 let mut total_bytes = 0;
100 let mut limit_reached = LimitReached::AllTransactionsIncluded;
101
102 let mut handle_txs = |t: TransactionsGuard| -> Option<TransactionsGuard> {
107 let transactions_bytes =
108 t.transactions.iter().map(|t| t.data().len()).sum::<usize>() as u64;
109 let transactions_num = t.transactions.len() as u64;
110
111 if total_bytes + transactions_bytes > self.max_transactions_in_block_bytes {
112 limit_reached = LimitReached::MaxBytes;
113 return Some(t);
114 }
115 if transactions.len() as u64 + transactions_num > self.max_num_transactions_in_block {
116 limit_reached = LimitReached::MaxNumOfTransactions;
117 return Some(t);
118 }
119
120 total_bytes += transactions_bytes;
121
122 acks.push(t.included_in_block_ack);
124 transactions.extend(t.transactions);
125 None
126 };
127
128 if let Some(t) = self.pending_transactions.take() {
129 if let Some(pending_transactions) = handle_txs(t) {
130 debug_fatal!(
131 "Previously pending transaction(s) should fit into an empty block! Dropping: {:?}",
132 pending_transactions.transactions
133 );
134 }
135 }
136
137 while self.pending_transactions.is_none() {
141 if let Ok(t) = self.tx_receiver.try_recv() {
142 self.pending_transactions = handle_txs(t);
143 } else {
144 break;
145 }
146 }
147
148 let block_status_subscribers = self.block_status_subscribers.clone();
149 let gc_enabled = self.context.protocol_config.gc_depth() > 0;
150
151 (
152 transactions,
153 Box::new(move |block_ref: BlockRef| {
154 let mut block_status_subscribers = block_status_subscribers.lock();
155
156 for ack in acks {
157 let (status_tx, status_rx) = oneshot::channel();
158
159 if gc_enabled {
160 block_status_subscribers
161 .entry(block_ref)
162 .or_default()
163 .push(status_tx);
164 } else {
165 status_tx.send(BlockStatus::Sequenced(block_ref)).ok();
170 }
171
172 let _ = ack.send((block_ref, status_rx));
173 }
174 }),
175 limit_reached,
176 )
177 }
178
179 pub(crate) fn notify_own_blocks_status(
186 &self,
187 committed_blocks: Vec<BlockRef>,
188 gc_round: Round,
189 ) {
190 let mut block_status_subscribers = self.block_status_subscribers.lock();
192 for block_ref in committed_blocks {
193 if let Some(subscribers) = block_status_subscribers.remove(&block_ref) {
194 subscribers.into_iter().for_each(|s| {
195 let _ = s.send(BlockStatus::Sequenced(block_ref));
196 });
197 }
198 }
199
200 while let Some((block_ref, subscribers)) = block_status_subscribers.pop_first() {
203 if block_ref.round <= gc_round {
204 subscribers.into_iter().for_each(|s| {
205 let _ = s.send(BlockStatus::GarbageCollected(block_ref));
206 });
207 } else {
208 block_status_subscribers.insert(block_ref, subscribers);
209 break;
210 }
211 }
212 }
213
214 #[cfg(test)]
215 pub(crate) fn subscribe_for_block_status_testing(
216 &self,
217 block_ref: BlockRef,
218 ) -> oneshot::Receiver<BlockStatus> {
219 let (tx, rx) = oneshot::channel();
220 let mut block_status_subscribers = self.block_status_subscribers.lock();
221 block_status_subscribers
222 .entry(block_ref)
223 .or_default()
224 .push(tx);
225 rx
226 }
227
228 #[cfg(test)]
229 fn is_empty(&mut self) -> bool {
230 if self.pending_transactions.is_some() {
231 return false;
232 }
233 if let Ok(t) = self.tx_receiver.try_recv() {
234 self.pending_transactions = Some(t);
235 return false;
236 }
237 true
238 }
239}
240
241#[derive(Clone)]
242pub struct TransactionClient {
243 sender: Sender<TransactionsGuard>,
244 max_transaction_size: u64,
245 max_transactions_in_block_bytes: u64,
246 max_transactions_in_block_count: u64,
247}
248
249#[derive(Debug, Error)]
250pub enum ClientError {
251 #[error("Failed to submit transaction, consensus is shutting down: {0}")]
252 ConsensusShuttingDown(String),
253
254 #[error("Transaction size ({0}B) is over limit ({1}B)")]
255 OversizedTransaction(u64, u64),
256
257 #[error("Transaction bundle size ({0}B) is over limit ({1}B)")]
258 OversizedTransactionBundleBytes(u64, u64),
259
260 #[error("Transaction bundle count ({0}) is over limit ({1})")]
261 OversizedTransactionBundleCount(u64, u64),
262}
263
264impl TransactionClient {
265 pub(crate) fn new(context: Arc<Context>) -> (Self, Receiver<TransactionsGuard>) {
266 let (sender, receiver) = channel("consensus_input", MAX_PENDING_TRANSACTIONS);
267
268 (
269 Self {
270 sender,
271 max_transaction_size: context.protocol_config.max_transaction_size_bytes(),
272 max_transactions_in_block_bytes: context
273 .protocol_config
274 .max_transactions_in_block_bytes(),
275 max_transactions_in_block_count: context
276 .protocol_config
277 .max_num_transactions_in_block(),
278 },
279 receiver,
280 )
281 }
282
283 pub async fn submit(
287 &self,
288 transactions: Vec<Vec<u8>>,
289 ) -> Result<(BlockRef, oneshot::Receiver<BlockStatus>), ClientError> {
290 let included_in_block = self.submit_no_wait(transactions).await?;
293 included_in_block
294 .await
295 .tap_err(|e| warn!("Transaction acknowledge failed with {:?}", e))
296 .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))
297 }
298
299 pub(crate) async fn submit_no_wait(
314 &self,
315 transactions: Vec<Vec<u8>>,
316 ) -> Result<oneshot::Receiver<(BlockRef, oneshot::Receiver<BlockStatus>)>, ClientError> {
317 let (included_in_block_ack_send, included_in_block_ack_receive) = oneshot::channel();
318
319 let mut bundle_size = 0;
320
321 if transactions.len() as u64 > self.max_transactions_in_block_count {
322 return Err(ClientError::OversizedTransactionBundleCount(
323 transactions.len() as u64,
324 self.max_transactions_in_block_count,
325 ));
326 }
327
328 for transaction in &transactions {
329 if transaction.len() as u64 > self.max_transaction_size {
330 return Err(ClientError::OversizedTransaction(
331 transaction.len() as u64,
332 self.max_transaction_size,
333 ));
334 }
335 bundle_size += transaction.len() as u64;
336
337 if bundle_size > self.max_transactions_in_block_bytes {
338 return Err(ClientError::OversizedTransactionBundleBytes(
339 bundle_size,
340 self.max_transactions_in_block_bytes,
341 ));
342 }
343 }
344
345 let t = TransactionsGuard {
346 transactions: transactions.into_iter().map(Transaction::new).collect(),
347 included_in_block_ack: included_in_block_ack_send,
348 };
349 self.sender
350 .send(t)
351 .await
352 .tap_err(|e| error!("Submit transactions failed with {:?}", e))
353 .map_err(|e| ClientError::ConsensusShuttingDown(e.to_string()))?;
354 Ok(included_in_block_ack_receive)
355 }
356}
357
358pub trait TransactionVerifier: Send + Sync + 'static {
361 fn verify_batch(&self, batch: &[&[u8]]) -> Result<(), ValidationError>;
363}
364
365#[derive(Debug, Error)]
366pub enum ValidationError {
367 #[error("Invalid transaction: {0}")]
368 InvalidTransaction(String),
369}
370
/// Verifier that accepts every batch. Only compiled for tests and `msim` builds.
#[cfg(any(test, msim))]
pub struct NoopTransactionVerifier;

#[cfg(any(test, msim))]
impl TransactionVerifier for NoopTransactionVerifier {
    /// Accepts the batch unconditionally, without inspecting it.
    fn verify_batch(&self, _batch: &[&[u8]]) -> Result<(), ValidationError> {
        Ok(())
    }
}
381
#[cfg(test)]
mod tests {
    use std::{sync::Arc, time::Duration};

    use consensus_config::AuthorityIndex;
    use futures::{StreamExt, stream::FuturesUnordered};
    use iota_protocol_config::ProtocolConfig;
    use tokio::time::timeout;

    use crate::{
        block::{BlockDigest, BlockRef},
        block_verifier::SignedBlockVerifier,
        context::Context,
        transaction::{
            BlockStatus, LimitReached, NoopTransactionVerifier, TransactionClient,
            TransactionConsumer,
        },
    };

    /// BCS-serializes the string `"transaction {index}"`, the payload used by most tests here.
    fn serialized_transaction(index: impl std::fmt::Display) -> Vec<u8> {
        bcs::to_bytes(&format!("transaction {index}")).expect("Serialization should not fail.")
    }

    /// Submitted bundles are returned by `next` in order, and submitters are only acknowledged
    /// once the callback returned by `next` has been invoked.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn basic_submit_and_consume() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // Submit without waiting for inclusion; keep the waiters to check them later.
        let mut included_in_block_waiters = FuturesUnordered::new();
        for i in 0..3 {
            let w = client
                .submit_no_wait(vec![serialized_transaction(i)])
                .await
                .expect("Should submit successfully transaction");
            included_in_block_waiters.push(w);
        }

        // All three bundles fit into a single pull.
        let (transactions, ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 3);

        for (i, t) in transactions.iter().enumerate() {
            let t: String = bcs::from_bytes(t.data()).unwrap();
            assert_eq!(format!("transaction {i}"), t);
        }

        assert!(
            timeout(Duration::from_secs(1), included_in_block_waiters.next())
                .await
                .is_err(),
            "We should expect to timeout as none of the transactions have been acknowledged yet"
        );

        // Acknowledge inclusion; every waiter must now resolve successfully.
        ack_transactions(BlockRef::MIN);

        while let Some(result) = included_in_block_waiters.next().await {
            assert!(result.is_ok());
        }

        // Nothing is left to consume.
        assert!(consumer.is_empty());
    }

    /// With GC enabled, block statuses are delivered by `notify_own_blocks_status`: committed
    /// blocks are reported as sequenced, blocks at or below the GC round as garbage collected.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn block_status_update_gc_enabled() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config.set_consensus_gc_depth_for_testing(10);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // Submit ten transactions; after every second one, pull them and acknowledge them as
        // included in a block of round `i` (rounds 2, 4, 6, 8 and 10).
        let mut included_in_block_waiters = FuturesUnordered::new();
        for i in 1..=10 {
            let w = client
                .submit_no_wait(vec![serialized_transaction(i)])
                .await
                .expect("Should submit successfully transaction");
            included_in_block_waiters.push(w);

            if i % 2 == 0 {
                let (transactions, ack_transactions, _limit_reached) = consumer.next();
                assert_eq!(transactions.len(), 2);
                ack_transactions(BlockRef::new(
                    i,
                    AuthorityIndex::new_for_test(0),
                    BlockDigest::MIN,
                ));
            }
        }

        // Every submission has been acknowledged; collect the block status waiters.
        let mut block_status_waiters = Vec::new();
        while let Some(result) = included_in_block_waiters.next().await {
            let (block_ref, block_status_waiter) =
                result.expect("Block inclusion waiter shouldn't fail");
            block_status_waiters.push((block_ref, block_status_waiter));
        }

        // Commit the blocks of rounds 6, 8 and 10 and garbage collect everything up to round 5.
        let gc_round = 5;
        consumer.notify_own_blocks_status(
            vec![
                BlockRef::new(6, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
                BlockRef::new(8, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
                BlockRef::new(10, AuthorityIndex::new_for_test(0), BlockDigest::MIN),
            ],
            gc_round,
        );

        for (block_ref, waiter) in block_status_waiters {
            let block_status = waiter.await.expect("Block status waiter shouldn't fail");

            if block_ref.round <= gc_round {
                assert!(matches!(block_status, BlockStatus::GarbageCollected(_)))
            } else {
                assert!(matches!(block_status, BlockStatus::Sequenced(_)));
            }
        }

        // All subscriptions have been resolved and removed.
        assert!(consumer.block_status_subscribers.lock().is_empty());
    }

    /// With GC disabled, every block is reported as sequenced at acknowledgement time and no
    /// subscription is ever stored.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn block_status_update_gc_disabled() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(2_000);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(2_000);
            config.set_consensus_gc_depth_for_testing(0);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // Same submission pattern as in the GC-enabled test.
        let mut included_in_block_waiters = FuturesUnordered::new();
        for i in 1..=10 {
            let w = client
                .submit_no_wait(vec![serialized_transaction(i)])
                .await
                .expect("Should submit successfully transaction");
            included_in_block_waiters.push(w);

            if i % 2 == 0 {
                let (transactions, ack_transactions, _limit_reached) = consumer.next();
                assert_eq!(transactions.len(), 2);
                ack_transactions(BlockRef::new(
                    i,
                    AuthorityIndex::new_for_test(0),
                    BlockDigest::MIN,
                ));
            }
        }

        let mut block_status_waiters = Vec::new();
        while let Some(result) = included_in_block_waiters.next().await {
            let (block_ref, block_status_waiter) =
                result.expect("Block inclusion waiter shouldn't fail");
            block_status_waiters.push((block_ref, block_status_waiter));
        }

        // No call to `notify_own_blocks_status` is needed: statuses are already available.
        for (_block_ref, waiter) in block_status_waiters {
            let block_status = waiter.await.expect("Block status waiter shouldn't fail");
            assert!(matches!(block_status, BlockStatus::Sequenced(_)));
        }

        assert!(consumer.block_status_subscribers.lock().is_empty());
    }

    /// When more bytes are queued than fit into one block, `next` returns them over several
    /// pulls, preserving order and never exceeding the byte limit.
    #[tokio::test]
    async fn submit_over_max_fetch_size_and_consume() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(100);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(100);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());

        // Queue ten single-transaction bundles.
        for i in 0..10 {
            let _w = client
                .submit_no_wait(vec![serialized_transaction(i)])
                .await
                .expect("Should submit successfully transaction");
        }

        // First pull: only seven transactions fit under the 100 byte limit.
        let mut all_transactions = Vec::new();
        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 7);

        let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
        assert!(
            total_size <= context.protocol_config.max_transactions_in_block_bytes(),
            "Should have fetched transactions up to {}",
            context.protocol_config.max_transactions_in_block_bytes()
        );
        all_transactions.extend(transactions);

        // Second pull: the remaining three.
        let (transactions, _ack_transactions, _limit_reached) = consumer.next();
        assert_eq!(transactions.len(), 3);

        let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
        assert!(
            total_size <= context.protocol_config.max_transactions_in_block_bytes(),
            "Should have fetched transactions up to {}",
            context.protocol_config.max_transactions_in_block_bytes()
        );
        all_transactions.extend(transactions);

        assert!(consumer.is_empty());

        // Order of submission is preserved across pulls.
        for (i, t) in all_transactions.iter().enumerate() {
            let t: String = bcs::from_bytes(t.data()).unwrap();
            assert_eq!(format!("transaction {i}"), t);
        }
    }

    /// Multi-transaction bundles are never split across pulls, oversized bundles are rejected
    /// at submission, and every accepted bundle is eventually acknowledged.
    #[tokio::test]
    async fn submit_large_batch_and_ack() {
        let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
            config.set_consensus_max_transaction_size_bytes_for_testing(15);
            config.set_consensus_max_transactions_in_block_bytes_for_testing(200);
            config
        });

        let context = Arc::new(Context::new_for_test(4).0);
        let (client, tx_receiver) = TransactionClient::new(context.clone());
        let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
        let mut all_receivers = Vec::new();

        // Ten single-transaction bundles (transactions 0..10).
        for i in 0..10 {
            let w = client
                .submit_no_wait(vec![serialized_transaction(i)])
                .await
                .expect("Should submit successfully transaction");
            all_receivers.push(w);
        }

        // One bundle of five transactions (10..15). It does not fit into the first pull
        // together with the ten above, so it has to move to the second pull as a whole.
        {
            let transactions: Vec<_> = (10..15).map(serialized_transaction).collect();
            let w = client
                .submit_no_wait(transactions)
                .await
                .expect("Should submit successfully transaction");
            all_receivers.push(w);
        }

        // One more single-transaction bundle (transaction 15).
        {
            let w = client
                .submit_no_wait(vec![serialized_transaction(15)])
                .await
                .expect("Should submit successfully transaction");
            all_receivers.push(w);
        }

        // A bundle whose total size exceeds the per-block byte limit is rejected up front.
        {
            let transactions: Vec<_> = (16..32).map(serialized_transaction).collect();
            let result = client.submit_no_wait(transactions).await.unwrap_err();
            assert_eq!(
                result.to_string(),
                "Transaction bundle size (210B) is over limit (200B)"
            );
        }

        // Drain the consumer, keeping the ack callbacks to fire them afterwards.
        let mut all_acks: Vec<Box<dyn FnOnce(BlockRef)>> = Vec::new();
        let mut batch_index = 0;
        while !consumer.is_empty() {
            let (transactions, ack_transactions, _limit_reached) = consumer.next();

            assert!(
                transactions.len() as u64
                    <= context.protocol_config.max_num_transactions_in_block(),
                "Should have fetched transactions up to {}",
                context.protocol_config.max_num_transactions_in_block()
            );

            let total_size: u64 = transactions.iter().map(|t| t.data().len() as u64).sum();
            assert!(
                total_size <= context.protocol_config.max_transactions_in_block_bytes(),
                "Should have fetched transactions up to {}",
                context.protocol_config.max_transactions_in_block_bytes()
            );

            // Pull 0 holds transactions 0..10, pull 1 holds transactions 10..16.
            let (expected_len, first_index) = match batch_index {
                0 => (10, 0),
                1 => (6, 10),
                _ => panic!("Unexpected batch index"),
            };
            assert_eq!(transactions.len(), expected_len);
            for (i, transaction) in transactions.iter().enumerate() {
                let t: String = bcs::from_bytes(transaction.data()).unwrap();
                assert_eq!(format!("transaction {}", i + first_index), t);
            }

            batch_index += 1;

            all_acks.push(ack_transactions);
        }

        // Acknowledge every pull.
        for ack in all_acks {
            ack(BlockRef::MIN);
        }

        // Every accepted submission must have been acknowledged.
        for w in all_receivers {
            let r = w.await;
            assert!(r.is_ok());
        }
    }

    /// A batch returned by `next` respects both per-block limits and passes the block
    /// verifier's transaction checks.
    #[tokio::test]
    async fn test_submit_over_max_block_size_and_validate_block_size() {
        // Case 1: the transaction count limit is the one that is hit.
        {
            let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
                config.set_consensus_max_transaction_size_bytes_for_testing(100);
                config.set_consensus_max_num_transactions_in_block_for_testing(10);
                config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
                config
            });

            let context = Arc::new(Context::new_for_test(4).0);
            let (client, tx_receiver) = TransactionClient::new(context.clone());
            let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
            let mut all_receivers = Vec::new();

            // Queue twice as many transactions as a block may hold.
            let max_num_transactions_in_block =
                context.protocol_config.max_num_transactions_in_block();
            for i in 0..2 * max_num_transactions_in_block {
                let w = client
                    .submit_no_wait(vec![serialized_transaction(i)])
                    .await
                    .expect("Should submit successfully transaction");
                all_receivers.push(w);
            }

            let (transactions, _ack_transactions, limit) = consumer.next();
            assert_eq!(limit, LimitReached::MaxNumOfTransactions);
            assert_eq!(transactions.len() as u64, max_num_transactions_in_block);

            let block_verifier =
                SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));

            let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
            assert!(
                block_verifier.check_transactions(&batch).is_ok(),
                "Number of transactions limit verification failed"
            );
        }

        // Case 2: the total bytes limit is the one that is hit.
        {
            let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
                config.set_consensus_max_transaction_size_bytes_for_testing(100);
                config.set_consensus_max_num_transactions_in_block_for_testing(1_000);
                config.set_consensus_max_transactions_in_block_bytes_for_testing(300);
                config
            });

            let context = Arc::new(Context::new_for_test(4).0);
            let (client, tx_receiver) = TransactionClient::new(context.clone());
            let mut consumer = TransactionConsumer::new(tx_receiver, context.clone());
            let mut all_receivers = Vec::new();

            // Queue at least twice as many bytes as a block may hold.
            let max_transactions_in_block_bytes =
                context.protocol_config.max_transactions_in_block_bytes();
            let mut total_size = 0;
            loop {
                let transaction = bcs::to_bytes(&"transaction".to_string())
                    .expect("Serialization should not fail.");
                total_size += transaction.len() as u64;
                let w = client
                    .submit_no_wait(vec![transaction])
                    .await
                    .expect("Should submit successfully transaction");
                all_receivers.push(w);

                if total_size >= 2 * max_transactions_in_block_bytes {
                    break;
                }
            }

            let (transactions, _ack_transactions, limit) = consumer.next();
            let batch: Vec<_> = transactions.iter().map(|t| t.data()).collect();
            let size = batch.iter().map(|t| t.len() as u64).sum::<u64>();

            assert_eq!(limit, LimitReached::MaxBytes);
            assert!(
                batch.len()
                    < context
                        .protocol_config
                        .consensus_max_num_transactions_in_block() as usize,
                "Should have submitted less than the max number of transactions in a block"
            );
            assert!(size <= max_transactions_in_block_bytes);

            let block_verifier =
                SignedBlockVerifier::new(context.clone(), Arc::new(NoopTransactionVerifier {}));

            assert!(
                block_verifier.check_transactions(&batch).is_ok(),
                "Total size of transactions limit verification failed"
            );
        }
    }
}