1use std::{cmp::Ordering, collections::HashMap};
6
7use iota_protocol_config::PerObjectCongestionControlMode;
8use iota_types::{
9 base_types::{CommitRound, ObjectID, TransactionDigest},
10 executable_transaction::VerifiedExecutableTransaction,
11 transaction::{SharedInputObject, TransactionDataAPI},
12};
13
14use super::transaction_deferral::DeferralKey;
15
/// Unit in which execution time is measured throughout this module. Its
/// meaning depends on the congestion control mode: a gas budget or a
/// transaction count (see `get_estimated_execution_duration`).
pub(crate) type ExecutionTime = u64;
/// Largest representable execution time; used as the upper bound of the
/// schedulable range of every object.
pub const MAX_EXECUTION_TIME: ExecutionTime = ExecutionTime::MAX;
19
/// Outcome of trying to sequence a transaction (returned by
/// `SharedObjectCongestionTracker::try_schedule`).
pub enum SequencingResult {
    /// The transaction can be scheduled; carries the start time assigned to
    /// it.
    Schedule(ExecutionTime),

    /// The transaction has to be deferred; carries the deferral key and the
    /// IDs of the shared objects reported as congested.
    Defer(DeferralKey, Vec<ObjectID>),
}
32
33#[derive(PartialEq, Eq, Clone, Debug, Copy)]
43struct ExecutionSlot {
44 start_time: ExecutionTime,
45 end_time: ExecutionTime,
46}
47
48impl ExecutionSlot {
49 fn new(start_time: ExecutionTime, end_time: ExecutionTime) -> Self {
52 debug_assert!(
53 start_time < end_time,
54 "invalid execution slot: start time must be less than end time"
55 );
56 Self {
57 start_time,
58 end_time,
59 }
60 }
61
62 fn duration(&self) -> ExecutionTime {
68 debug_assert!(
69 self.start_time < self.end_time,
70 "invalid execution slot: start time must be less than end time"
71 );
72
73 self.end_time - self.start_time
74 }
75
76 fn intersection(&self, other: &Self) -> Option<Self> {
79 let start_time = self.start_time.max(other.start_time);
80 let end_time = self.end_time.min(other.end_time);
81 if start_time < end_time {
82 Some(Self::new(start_time, end_time))
83 } else {
84 None
85 }
86 }
87
88 fn max_duration_slot() -> Self {
90 Self::new(0, MAX_EXECUTION_TIME)
91 }
92
93 fn contains(&self, other: &Self) -> Ordering {
101 if self.end_time < other.end_time {
102 Ordering::Less
103 } else if self.start_time > other.start_time {
104 Ordering::Greater
105 } else {
106 Ordering::Equal
107 }
108 }
109}
110
111#[derive(PartialEq, Eq, Clone, Debug)]
116struct ObjectExecutionSlots(Vec<ExecutionSlot>);
117
118impl ObjectExecutionSlots {
119 fn new() -> Self {
120 Self(vec![ExecutionSlot::max_duration_slot()])
121 }
122
123 fn max_object_free_slot_start_time(&self, tx_duration: ExecutionTime) -> Option<ExecutionTime> {
127 if let Some(last_free_slot) = self.0.last() {
128 if MAX_EXECUTION_TIME - last_free_slot.start_time >= tx_duration {
129 return Some(last_free_slot.start_time);
131 }
132 }
133 None
134 }
135 fn max_object_occupied_slot_end_time(&self) -> ExecutionTime {
138 self.max_object_free_slot_start_time(0)
142 .unwrap_or(MAX_EXECUTION_TIME)
143 }
144
145 fn remove(&mut self, slot: ExecutionSlot) {
146 let mut index = self
148 .0
149 .binary_search_by(|s| s.contains(&slot))
150 .expect("can't remove a slot that is not available");
151 let free_slot = self.0.remove(index);
176 if slot.start_time > free_slot.start_time {
179 self.0.insert(
180 index,
181 ExecutionSlot::new(free_slot.start_time, slot.start_time),
182 );
183 index += 1;
184 }
185 if slot.end_time < free_slot.end_time {
188 self.0
189 .insert(index, ExecutionSlot::new(slot.end_time, free_slot.end_time));
190 }
191 }
192}
193
/// Keeps track of the free execution slots of shared objects and decides
/// whether a transaction touching them can be scheduled or has to be
/// deferred.
#[derive(PartialEq, Eq, Clone, Debug)]
pub(crate) struct SharedObjectCongestionTracker {
    /// Free execution slots of every shared object registered so far.
    object_execution_slots: HashMap<ObjectID, ObjectExecutionSlots>,
    /// Determines how the execution duration of a transaction is estimated.
    mode: PerObjectCongestionControlMode,
    /// When `true`, a transaction is assigned the earliest free slot common
    /// to all its shared inputs; otherwise it starts at the latest start of
    /// the inputs' last free slots.
    assign_min_free_execution_slot: bool,
}
218
219impl SharedObjectCongestionTracker {
220 pub fn new(mode: PerObjectCongestionControlMode, assign_min_free_execution_slot: bool) -> Self {
221 Self {
222 object_execution_slots: HashMap::new(),
223 mode,
224 assign_min_free_execution_slot,
225 }
226 }
227 pub fn initialize_object_execution_slots(
230 &mut self,
231 shared_input_objects: &[SharedInputObject],
232 ) {
233 for obj in shared_input_objects {
234 self.object_execution_slots
235 .entry(obj.id)
236 .or_insert(ObjectExecutionSlots::new());
237 }
238 }
239
240 pub fn compute_tx_start_time(
251 &self,
252 shared_input_objects: &[SharedInputObject],
253 tx_duration: ExecutionTime,
254 ) -> Option<ExecutionTime> {
255 if self.assign_min_free_execution_slot {
256 let initial_free_slot = ExecutionSlot::max_duration_slot();
261 self.compute_min_free_execution_slot(
262 shared_input_objects,
263 tx_duration,
264 initial_free_slot,
265 )
266 } else {
267 let object_start_times: Vec<_> = shared_input_objects
271 .iter()
272 .map(|obj| {
273 self.object_execution_slots
274 .get(&obj.id)
275 .expect("object should have been inserted at the start of this function.")
276 })
277 .map(|slots| slots.max_object_free_slot_start_time(tx_duration))
278 .collect();
279
280 if object_start_times
281 .iter()
282 .all(|start_time| start_time.is_some())
283 {
284 Some(
285 object_start_times
286 .iter()
287 .map(|start_time| start_time.unwrap())
288 .max()
289 .unwrap(),
290 )
291 } else {
292 None
294 }
295 }
296 }
297
298 fn compute_min_free_execution_slot(
304 &self,
305 shared_input_objects: &[SharedInputObject],
306 tx_duration: ExecutionTime,
307 lookup_interval: ExecutionSlot,
308 ) -> Option<ExecutionTime> {
309 let (obj, remaining_objects) = shared_input_objects
312 .split_first()
313 .expect("shared_input_objects must not be empty.");
314
315 for intersection_slot in self
316 .object_execution_slots
317 .get(&obj.id)
318 .expect("object should have been inserted before.")
319 .0
320 .iter()
321 .filter_map(|slot| slot.intersection(&lookup_interval))
322 {
323 if intersection_slot.duration() < tx_duration {
326 continue;
327 }
328 if remaining_objects.is_empty() {
331 return Some(intersection_slot.start_time);
332 }
333 if let Some(lowest_overlap) = self.compute_min_free_execution_slot(
339 remaining_objects,
340 tx_duration,
341 intersection_slot,
342 ) {
343 return Some(lowest_overlap);
344 } else {
345 continue;
346 }
347 }
348 None
351 }
352
353 pub fn get_estimated_execution_duration(
359 &self,
360 cert: &VerifiedExecutableTransaction,
361 ) -> ExecutionTime {
362 match self.mode {
363 PerObjectCongestionControlMode::None => 0,
364 PerObjectCongestionControlMode::TotalGasBudget => cert.gas_budget(),
365 PerObjectCongestionControlMode::TotalTxCount => 1,
366 }
367 }
368
369 pub fn try_schedule(
374 &self,
375 cert: &VerifiedExecutableTransaction,
376 max_execution_duration_per_commit: u64,
377 previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
378 commit_round: CommitRound,
379 ) -> SequencingResult {
380 let tx_duration = self.get_estimated_execution_duration(cert);
381 if tx_duration == 0 {
382 return SequencingResult::Schedule(0);
384 }
385
386 let shared_input_objects = cert
387 .data()
388 .inner()
389 .intent_message()
390 .value
391 .shared_input_objects();
392 if shared_input_objects.is_empty() {
393 return SequencingResult::Schedule(0);
395 }
396 if let Some(start_time) = self.compute_tx_start_time(&shared_input_objects, tx_duration) {
398 if start_time + tx_duration <= max_execution_duration_per_commit {
401 return SequencingResult::Schedule(start_time);
403 }
404 }
405
406 let congested_objects: Vec<ObjectID> = if self.assign_min_free_execution_slot {
409 shared_input_objects.iter().map(|obj| obj.id).collect()
412 } else {
413 shared_input_objects
416 .iter()
417 .filter(|obj| {
418 let (end_time, overflow) = self
419 .object_execution_slots
420 .get(&obj.id)
421 .expect("object should have been inserted before.")
422 .max_object_occupied_slot_end_time()
423 .overflowing_add(tx_duration);
424 overflow || end_time > max_execution_duration_per_commit
425 })
426 .map(|obj| obj.id)
427 .collect()
428 };
429 assert!(!congested_objects.is_empty());
430
431 let deferral_key =
432 if let Some(previous_key) = previously_deferred_tx_digests.get(cert.digest()) {
433 DeferralKey::new_for_consensus_round(
436 commit_round + 1,
437 previous_key.deferred_from_round(),
438 )
439 } else {
440 DeferralKey::new_for_consensus_round(commit_round + 1, commit_round)
443 };
444 SequencingResult::Defer(deferral_key, congested_objects)
445 }
446
447 pub fn bump_object_execution_slots(
454 &mut self,
455 cert: &VerifiedExecutableTransaction,
456 start_time: ExecutionTime,
457 ) {
458 let tx_duration = self.get_estimated_execution_duration(cert);
459 if tx_duration == 0 {
460 return;
461 }
462 let end_time = start_time.saturating_add(tx_duration);
463 let occupied_slot = ExecutionSlot::new(start_time, end_time);
464 for obj in cert.shared_input_objects().filter(|obj| obj.mutable) {
465 self.object_execution_slots
466 .get_mut(&obj.id)
467 .expect("object execution slot should have been initialized before.")
468 .remove(occupied_slot);
469 }
470 }
471
472 pub fn max_occupied_slot_end_time(&self) -> ExecutionTime {
474 self.object_execution_slots
475 .values()
476 .map(|slots| slots.max_object_occupied_slot_end_time())
477 .max()
478 .unwrap_or(0)
479 }
480}
481
482#[cfg(test)]
483mod execution_slot_tests {
484 use std::cmp::Ordering;
485
486 use super::ExecutionSlot;
487
    /// A valid slot reports `end_time - start_time` as its duration.
    #[test]
    fn test_execution_slot_new_and_duration() {
        let slot = ExecutionSlot::new(1, 3);
        // 3 - 1
        assert_eq!(slot.duration(), 2);
    }
494
495 #[test]
496 #[should_panic]
497 fn test_execution_slot_new_zero_duration() {
498 ExecutionSlot::new(1, 1);
500 }
501
502 #[test]
503 #[should_panic]
504 fn test_execution_slot_new_negative_duration() {
505 ExecutionSlot::new(3, 1);
507 }
508
    /// Checks `ExecutionSlot::intersection` for identical, disjoint,
    /// touching, partially overlapping and nested slots.
    #[test]
    fn test_execution_slot_intersection() {
        // Identical slots intersect in the slot itself.
        let slot_1 = ExecutionSlot::new(1, 3);
        let slot_2 = ExecutionSlot::new(1, 3);
        if let Some(intersection) = slot_1.intersection(&slot_2) {
            assert_eq!(intersection, ExecutionSlot::new(1, 3));
            assert_eq!(intersection.duration(), 2);
        } else {
            panic!("Expected intersection to be Some");
        }

        // Disjoint slots with a gap in between do not intersect.
        let slot_1 = ExecutionSlot::new(1, 3);
        let slot_2 = ExecutionSlot::new(4, 5);
        let intersection = slot_1.intersection(&slot_2);
        assert!(intersection.is_none());

        // Slots that only touch (end of the first == start of the second)
        // do not intersect.
        let slot_1 = ExecutionSlot::new(1, 3);
        let slot_2 = ExecutionSlot::new(3, 5);
        let intersection = slot_1.intersection(&slot_2);
        assert!(intersection.is_none());

        // The same touching slots with the operands swapped.
        let slot_1 = ExecutionSlot::new(3, 5);
        let slot_2 = ExecutionSlot::new(1, 3);
        let intersection = slot_1.intersection(&slot_2);
        assert!(intersection.is_none());

        // Same inputs as the touching case two checks above.
        let slot_1 = ExecutionSlot::new(1, 3);
        let slot_2 = ExecutionSlot::new(3, 5);
        let intersection = slot_1.intersection(&slot_2);
        assert!(intersection.is_none());

        // Partially overlapping slots intersect in the shared part.
        let slot_1 = ExecutionSlot::new(1, 5);
        let slot_2 = ExecutionSlot::new(3, 9);
        if let Some(intersection) = slot_1.intersection(&slot_2) {
            assert_eq!(intersection, ExecutionSlot::new(3, 5));
            assert_eq!(intersection.duration(), 2);
        } else {
            panic!("Expected intersection to be Some");
        }

        // A slot nested in another one intersects in the nested slot.
        let slot_1 = ExecutionSlot::new(4, 9);
        let slot_2 = ExecutionSlot::new(1, 9);
        if let Some(intersection) = slot_1.intersection(&slot_2) {
            assert_eq!(intersection, ExecutionSlot::new(4, 9));
            assert_eq!(intersection.duration(), 5);
        } else {
            panic!("Expected intersection to be Some");
        }

        // Another pair of disjoint slots.
        let slot_1 = ExecutionSlot::new(1, 3);
        let slot_2 = ExecutionSlot::new(5, 9);
        assert!(slot_1.intersection(&slot_2).is_none());
    }
573
    /// Checks the ordering returned by `ExecutionSlot::contains` for slots
    /// that are contained, start earlier, or end later.
    #[test]
    fn test_execution_slot_contains() {
        // Strictly inside: contained.
        let slot_1 = ExecutionSlot::new(1, 5);
        let slot_2 = ExecutionSlot::new(2, 3);
        assert_eq!(slot_1.contains(&slot_2), Ordering::Equal);

        // Starts before `slot_1` and ends inside it.
        let slot_1 = ExecutionSlot::new(1, 5);
        let slot_2 = ExecutionSlot::new(0, 3);
        assert_eq!(slot_1.contains(&slot_2), Ordering::Greater);

        // Lies entirely before `slot_1`.
        let slot_1 = ExecutionSlot::new(2, 5);
        let slot_2 = ExecutionSlot::new(0, 1);
        assert_eq!(slot_1.contains(&slot_2), Ordering::Greater);

        // Starts inside `slot_1` and ends after it.
        let slot_1 = ExecutionSlot::new(1, 5);
        let slot_2 = ExecutionSlot::new(3, 6);
        assert_eq!(slot_1.contains(&slot_2), Ordering::Less);

        // Lies entirely after `slot_1`.
        let slot_1 = ExecutionSlot::new(1, 5);
        let slot_2 = ExecutionSlot::new(6, 7);
        assert_eq!(slot_1.contains(&slot_2), Ordering::Less);

        // Identical slots: contained.
        let slot_1 = ExecutionSlot::new(1, 5);
        let slot_2 = ExecutionSlot::new(1, 5);
        assert_eq!(slot_1.contains(&slot_2), Ordering::Equal);
    }
606}
607
608#[cfg(test)]
609pub mod shared_object_test_utils {
610 use iota_protocol_config::PerObjectCongestionControlMode;
611 use iota_test_transaction_builder::TestTransactionBuilder;
612 use iota_types::{
613 base_types::{ObjectID, SequenceNumber, random_object_ref},
614 crypto::{AccountKeyPair, get_key_pair},
615 executable_transaction::VerifiedExecutableTransaction,
616 transaction::{CallArg, ObjectArg, VerifiedTransaction},
617 };
618
619 use super::*;
620
    /// Builds a signed test transaction with the given `gas_budget` whose
    /// single move call takes `objects` as shared-object arguments.
    ///
    /// Each entry of `objects` is `(object id, mutable)`. The sender key
    /// pair, the gas object and the called package ID are random.
    pub fn build_transaction(
        objects: &[(ObjectID, bool)],
        gas_budget: u64,
    ) -> VerifiedExecutableTransaction {
        let (sender, keypair): (_, AccountKeyPair) = get_key_pair();
        let gas_object = random_object_ref();
        VerifiedExecutableTransaction::new_system(
            VerifiedTransaction::new_unchecked(
                // NOTE(review): 1000 is presumably the gas price - confirm
                // against `TestTransactionBuilder::new`.
                TestTransactionBuilder::new(sender, gas_object, 1000)
                    .with_gas_budget(gas_budget)
                    .move_call(
                        ObjectID::random(),
                        "unimportant_module",
                        "unimportant_function",
                        // One shared-object argument per requested object.
                        objects
                            .iter()
                            .map(|(id, mutable)| {
                                CallArg::Object(ObjectArg::SharedObject {
                                    id: *id,
                                    initial_shared_version: SequenceNumber::new(),
                                    mutable: *mutable,
                                })
                            })
                            .collect(),
                    )
                    .build_and_sign(&keypair),
            ),
            // NOTE(review): presumably the epoch - confirm against
            // `VerifiedExecutableTransaction::new_system`.
            0,
        )
    }
655
    /// Registers `shared_input_objects` in the tracker (objects that are
    /// already known are left untouched) and then computes the start time
    /// of a transaction of `tx_duration` touching them.
    pub(crate) fn initialize_tracker_and_compute_tx_start_time(
        shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
        shared_input_objects: &[SharedInputObject],
        tx_duration: ExecutionTime,
    ) -> Option<ExecutionTime> {
        shared_object_congestion_tracker.initialize_object_execution_slots(shared_input_objects);
        shared_object_congestion_tracker.compute_tx_start_time(shared_input_objects, tx_duration)
    }
664
665 pub(crate) fn initialize_tracker_and_try_schedule(
666 shared_object_congestion_tracker: &mut SharedObjectCongestionTracker,
667 cert: &VerifiedExecutableTransaction,
668 max_execution_duration_per_commit: u64,
669 previously_deferred_tx_digests: &HashMap<TransactionDigest, DeferralKey>,
670 commit_round: CommitRound,
671 ) -> SequencingResult {
672 shared_object_congestion_tracker.initialize_object_execution_slots(
673 &cert
674 .data()
675 .inner()
676 .intent_message()
677 .value
678 .shared_input_objects(),
679 );
680 shared_object_congestion_tracker.try_schedule(
681 cert,
682 max_execution_duration_per_commit,
683 previously_deferred_tx_digests,
684 commit_round,
685 )
686 }
687
688 pub(crate) fn new_congestion_tracker_with_initial_value_for_test(
689 init_values: &[(ObjectID, ExecutionTime)],
690 mode: PerObjectCongestionControlMode,
691 assign_min_free_execution_slot: bool,
692 ) -> SharedObjectCongestionTracker {
693 let mut shared_object_congestion_tracker =
694 SharedObjectCongestionTracker::new(mode, assign_min_free_execution_slot);
695 for (object_id, duration) in init_values {
697 match mode {
698 PerObjectCongestionControlMode::None => {}
699 PerObjectCongestionControlMode::TotalGasBudget => {
700 let transaction = build_transaction(&[(*object_id, true)], *duration);
701 let start_time = initialize_tracker_and_compute_tx_start_time(&mut shared_object_congestion_tracker, &transaction.data().inner().intent_message().value.shared_input_objects(), *duration).expect("initial value should be fit within the available range of slots in the tracker");
702 shared_object_congestion_tracker
703 .bump_object_execution_slots(&transaction, start_time);
704 }
705 PerObjectCongestionControlMode::TotalTxCount => {
706 for _ in 0..*duration {
707 let transaction = build_transaction(&[(*object_id, true)], 1);
708 let start_time = initialize_tracker_and_compute_tx_start_time(&mut shared_object_congestion_tracker, &transaction.data().inner().intent_message().value.shared_input_objects(), 1).expect("initial value should be fit within the available range of slots in the tracker");
709 shared_object_congestion_tracker
710 .bump_object_execution_slots(&transaction, start_time);
711 }
712 }
713 }
714 }
715 shared_object_congestion_tracker
716 }
717
718 pub fn construct_shared_input_objects(objects: &[(ObjectID, bool)]) -> Vec<SharedInputObject> {
719 objects
720 .iter()
721 .map(|(id, mutable)| SharedInputObject {
722 id: *id,
723 initial_shared_version: SequenceNumber::new(),
724 mutable: *mutable,
725 })
726 .collect()
727 }
728}
729
730#[cfg(test)]
731mod object_cost_tests {
732 use rstest::rstest;
733
734 use super::{shared_object_test_utils::*, *};
735
    /// Checks the start time computed for various combinations of shared
    /// objects, for both slot assignment strategies.
    #[rstest]
    fn test_compute_tx_start_at_time(#[values(true, false)] assign_min_free_execution_slot: bool) {
        let object_id_0 = ObjectID::random();
        let object_id_1 = ObjectID::random();
        let object_id_2 = ObjectID::random();
        let object_id_3 = ObjectID::random();

        // Initial state: object 0 is occupied in [0, 5), object 1 in [0, 9).
        let mut shared_object_congestion_tracker =
            new_congestion_tracker_with_initial_value_for_test(
                &[(object_id_0, 5), (object_id_1, 9)],
                PerObjectCongestionControlMode::TotalGasBudget,
                assign_min_free_execution_slot,
            );

        // A transaction on objects 0, 1 and 2 can only start once object 1
        // is free, i.e. at 9, with both strategies.
        let objects = &[
            (object_id_0, true),
            (object_id_1, true),
            (object_id_2, true),
        ];
        let shared_input_objects = construct_shared_input_objects(objects);
        assert_eq!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &shared_input_objects,
                10
            ),
            Some(9)
        );
        // Occupy [9, 10) on all three objects (the gas budget, and hence the
        // duration, of this transaction is 1). Afterwards object 0 is free
        // in [5, 9) and from 10 on, object 1 from 10 on, and object 2 in
        // [0, 9) and from 10 on.
        let tx = build_transaction(objects, 1);
        shared_object_congestion_tracker.bump_object_execution_slots(&tx, 9);

        // Object 0, duration 4: fits into the gap [5, 9) when the earliest
        // free slot is assigned, otherwise it goes after the last occupied
        // slot.
        let shared_input_objects = construct_shared_input_objects(&[(object_id_0, false)]);
        assert_eq!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &shared_input_objects,
                4
            ),
            if assign_min_free_execution_slot {
                Some(5)
            } else {
                Some(10)
            }
        );
        // Object 0, duration 5: too long for the gap [5, 9).
        assert_eq!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &shared_input_objects,
                5
            ),
            Some(10)
        );

        // Object 1 is occupied without a gap up to 10.
        let shared_input_objects = construct_shared_input_objects(&[(object_id_1, true)]);
        assert_eq!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &shared_input_objects,
                5
            ),
            Some(10)
        );

        // Objects 0 and 1, read-only.
        let shared_input_objects =
            construct_shared_input_objects(&[(object_id_0, false), (object_id_1, false)]);
        assert_eq!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &shared_input_objects,
                5
            ),
            Some(10)
        );

        // Objects 0 and 1, mutable: same result as for read-only access.
        let shared_input_objects =
            construct_shared_input_objects(&[(object_id_0, true), (object_id_1, true)]);
        assert_eq!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &shared_input_objects,
                5
            ),
            Some(10)
        );

        // Object 2 is free in [0, 9): the earliest free slot starts at 0,
        // the slot after the last occupied one at 10.
        let shared_input_objects = construct_shared_input_objects(&[(object_id_2, true)]);
        assert_eq!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &shared_input_objects,
                5
            ),
            if assign_min_free_execution_slot {
                Some(0)
            } else {
                Some(10)
            }
        );

        // Object 3 has never been used, so it is free from 0 on.
        let shared_input_objects = construct_shared_input_objects(&[(object_id_3, true)]);
        assert_eq!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &shared_input_objects,
                5
            ),
            Some(0)
        );

        // Objects 0 and 2, duration 3: both are free in [5, 9).
        let shared_input_objects =
            construct_shared_input_objects(&[(object_id_0, false), (object_id_2, false)]);
        assert_eq!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &shared_input_objects,
                3
            ),
            if assign_min_free_execution_slot {
                Some(5)
            } else {
                Some(10)
            }
        );
    }
911
912 #[rstest]
913 fn test_try_schedule_return_correct_congested_objects(
914 #[values(
915 PerObjectCongestionControlMode::TotalGasBudget,
916 PerObjectCongestionControlMode::TotalTxCount
917 )]
918 mode: PerObjectCongestionControlMode,
919 #[values(true, false)] assign_min_free_execution_slot: bool,
920 ) {
921 let shared_obj_0 = ObjectID::random();
924 let shared_obj_1 = ObjectID::random();
925
926 let tx_gas_budget = 5;
927
928 let max_execution_duration_per_commit = match mode {
931 PerObjectCongestionControlMode::None => unreachable!(),
932 PerObjectCongestionControlMode::TotalGasBudget => 12,
933 PerObjectCongestionControlMode::TotalTxCount => 3,
934 };
935
936 let mut shared_object_congestion_tracker = match mode {
937 PerObjectCongestionControlMode::None => unreachable!(),
938 PerObjectCongestionControlMode::TotalGasBudget => {
939 new_congestion_tracker_with_initial_value_for_test(
947 &[(shared_obj_0, 9), (shared_obj_1, 1)],
948 mode,
949 assign_min_free_execution_slot,
950 )
951 }
952 PerObjectCongestionControlMode::TotalTxCount => {
953 new_congestion_tracker_with_initial_value_for_test(
959 &[(shared_obj_0, 2), (shared_obj_1, 1)],
960 mode,
961 assign_min_free_execution_slot,
962 )
963 }
964 };
965 let tx = build_transaction(&[(shared_obj_0, true), (shared_obj_1, true)], 1);
967 shared_object_congestion_tracker.bump_object_execution_slots(
968 &tx,
969 match mode {
970 PerObjectCongestionControlMode::None => unreachable!(),
971 PerObjectCongestionControlMode::TotalGasBudget => 10,
979 PerObjectCongestionControlMode::TotalTxCount => 2,
985 },
986 );
987
988 for mutable in [true, false].iter() {
990 let tx = build_transaction(&[(shared_obj_0, *mutable)], tx_gas_budget);
991 if let SequencingResult::Defer(_, congested_objects) = shared_object_congestion_tracker
992 .try_schedule(&tx, max_execution_duration_per_commit, &HashMap::new(), 0)
993 {
994 assert_eq!(congested_objects.len(), 1);
995 assert_eq!(congested_objects[0], shared_obj_0);
996 } else {
997 panic!("should defer");
998 }
999 }
1000
1001 for mutable in [true, false].iter() {
1004 let tx = build_transaction(&[(shared_obj_1, *mutable)], tx_gas_budget);
1005 let sequencing_result = initialize_tracker_and_try_schedule(
1006 &mut shared_object_congestion_tracker,
1007 &tx,
1008 max_execution_duration_per_commit,
1009 &HashMap::new(),
1010 0,
1011 );
1012 if assign_min_free_execution_slot {
1013 matches!(sequencing_result, SequencingResult::Schedule(1));
1014 } else if let SequencingResult::Defer(_, congested_objects) = sequencing_result {
1015 assert_eq!(congested_objects.len(), 1);
1016 assert_eq!(congested_objects[0], shared_obj_1);
1017 } else {
1018 panic!("should defer");
1019 }
1020 }
1021
1022 for mutable_0 in [true, false].iter() {
1025 for mutable_1 in [true, false].iter() {
1026 let tx = build_transaction(
1027 &[(shared_obj_0, *mutable_0), (shared_obj_1, *mutable_1)],
1028 tx_gas_budget,
1029 );
1030 if let SequencingResult::Defer(_, congested_objects) =
1031 initialize_tracker_and_try_schedule(
1032 &mut shared_object_congestion_tracker,
1033 &tx,
1034 max_execution_duration_per_commit,
1035 &HashMap::new(),
1036 0,
1037 )
1038 {
1039 assert_eq!(congested_objects.len(), 2);
1041 assert_eq!(congested_objects[0], shared_obj_0);
1042 assert_eq!(congested_objects[1], shared_obj_1);
1043 } else {
1044 panic!("should defer");
1045 }
1046 }
1047 }
1048 }
1049
    /// Checks the deferral key returned by `try_schedule` for a transaction
    /// that was not deferred before, and for one that was previously
    /// deferred with a randomness key or a consensus round key.
    #[rstest]
    fn test_try_schedule_return_correct_deferral_key(
        #[values(
            PerObjectCongestionControlMode::TotalGasBudget,
            PerObjectCongestionControlMode::TotalTxCount
        )]
        mode: PerObjectCongestionControlMode,
    ) {
        let shared_obj_0 = ObjectID::random();
        let tx = build_transaction(&[(shared_obj_0, true)], 100);
        // With a limit of 0 every transaction of non-zero duration is
        // deferred.
        let max_execution_duration_per_commit = 0;
        let mut shared_object_congestion_tracker = SharedObjectCongestionTracker::new(mode, false);

        // Only an unrelated transaction has been deferred before.
        let mut previously_deferred_tx_digests = HashMap::new();
        previously_deferred_tx_digests.insert(
            TransactionDigest::random(),
            DeferralKey::ConsensusRound {
                future_round: 10,
                deferred_from_round: 5,
            },
        );

        // `tx` is deferred for the first time: the key targets the next
        // round and records the current round as the origin.
        if let SequencingResult::Defer(
            DeferralKey::ConsensusRound {
                future_round,
                deferred_from_round,
            },
            _,
        ) = initialize_tracker_and_try_schedule(
            &mut shared_object_congestion_tracker,
            &tx,
            max_execution_duration_per_commit,
            &previously_deferred_tx_digests,
            10,
        ) {
            assert_eq!(future_round, 11);
            assert_eq!(deferred_from_round, 10);
        } else {
            panic!("should defer");
        }

        // `tx` was previously deferred with a randomness key.
        previously_deferred_tx_digests.insert(
            *tx.digest(),
            DeferralKey::Randomness {
                deferred_from_round: 4,
            },
        );

        // The new key is a consensus round key that keeps the original
        // `deferred_from_round`.
        if let SequencingResult::Defer(
            DeferralKey::ConsensusRound {
                future_round,
                deferred_from_round,
            },
            _,
        ) = initialize_tracker_and_try_schedule(
            &mut shared_object_congestion_tracker,
            &tx,
            max_execution_duration_per_commit,
            &previously_deferred_tx_digests,
            10,
        ) {
            assert_eq!(future_round, 11);
            assert_eq!(deferred_from_round, 4);
        } else {
            panic!("should defer");
        }

        // `tx` was previously deferred with a consensus round key.
        previously_deferred_tx_digests.insert(
            *tx.digest(),
            DeferralKey::ConsensusRound {
                future_round: 10,
                deferred_from_round: 5,
            },
        );

        // Again the original `deferred_from_round` is kept while the future
        // round moves to the next round.
        if let SequencingResult::Defer(
            DeferralKey::ConsensusRound {
                future_round,
                deferred_from_round,
            },
            _,
        ) = initialize_tracker_and_try_schedule(
            &mut shared_object_congestion_tracker,
            &tx,
            max_execution_duration_per_commit,
            &previously_deferred_tx_digests,
            10,
        ) {
            assert_eq!(future_round, 11);
            assert_eq!(deferred_from_round, 5);
        } else {
            panic!("should defer");
        }
    }
1153
    /// Checks that `bump_object_execution_slots` occupies slots only on the
    /// shared objects a transaction takes mutably.
    #[rstest]
    fn test_bump_object_execution_slots(
        #[values(
            PerObjectCongestionControlMode::TotalGasBudget,
            PerObjectCongestionControlMode::TotalTxCount
        )]
        mode: PerObjectCongestionControlMode,
        #[values(true, false)] assign_min_free_execution_slot: bool,
    ) {
        let object_id_0 = ObjectID::random();
        let object_id_1 = ObjectID::random();
        let object_id_2 = ObjectID::random();

        // Initial state: object 0 is occupied in [0, 5), object 1 in [0, 10).
        let mut shared_object_congestion_tracker =
            new_congestion_tracker_with_initial_value_for_test(
                &[(object_id_0, 5), (object_id_1, 10)],
                mode,
                assign_min_free_execution_slot,
            );
        assert_eq!(
            shared_object_congestion_tracker.max_occupied_slot_end_time(),
            10
        );

        // A transaction that only reads both objects.
        let cert = build_transaction(&[(object_id_0, false), (object_id_1, false)], 10);
        let cert_duration =
            shared_object_congestion_tracker.get_estimated_execution_duration(&cert);
        let start_time = initialize_tracker_and_compute_tx_start_time(
            &mut shared_object_congestion_tracker,
            &cert
                .data()
                .inner()
                .intent_message()
                .value
                .shared_input_objects(),
            cert_duration,
        )
        .expect("start time should be computable");

        // Read-only inputs do not occupy anything: the tracker still equals
        // a freshly initialized one.
        shared_object_congestion_tracker.bump_object_execution_slots(&cert, start_time);
        assert_eq!(
            shared_object_congestion_tracker,
            new_congestion_tracker_with_initial_value_for_test(
                &[(object_id_0, 5), (object_id_1, 10)],
                mode,
                assign_min_free_execution_slot,
            )
        );
        assert_eq!(
            shared_object_congestion_tracker.max_occupied_slot_end_time(),
            10
        );

        // A transaction that writes object 0 and reads object 1.
        let cert = build_transaction(&[(object_id_0, true), (object_id_1, false)], 10);
        let cert_duration =
            shared_object_congestion_tracker.get_estimated_execution_duration(&cert);
        let start_time = initialize_tracker_and_compute_tx_start_time(
            &mut shared_object_congestion_tracker,
            &cert
                .data()
                .inner()
                .intent_message()
                .value
                .shared_input_objects(),
            cert_duration,
        )
        .expect("start time should be computable");
        shared_object_congestion_tracker.bump_object_execution_slots(&cert, start_time);
        // The transaction starts at 10 (once object 1 is free) and lasts 10
        // units in `TotalGasBudget` mode and 1 unit in `TotalTxCount` mode.
        let expected_object_0_duration = match mode {
            PerObjectCongestionControlMode::None => unreachable!(),
            PerObjectCongestionControlMode::TotalGasBudget => 20,
            PerObjectCongestionControlMode::TotalTxCount => 11,
        };
        assert_eq!(
            shared_object_congestion_tracker
                .object_execution_slots
                .get(&object_id_0)
                .unwrap()
                .max_object_occupied_slot_end_time(),
            expected_object_0_duration
        );
        // Object 1 was only read, so its occupied range is unchanged.
        assert_eq!(
            shared_object_congestion_tracker
                .object_execution_slots
                .get(&object_id_1)
                .unwrap()
                .max_object_occupied_slot_end_time(),
            10
        );
        assert_eq!(
            shared_object_congestion_tracker.max_occupied_slot_end_time(),
            expected_object_0_duration
        );

        // A transaction that writes all three objects, including object 2,
        // which has not been used so far.
        let cert = build_transaction(
            &[
                (object_id_0, true),
                (object_id_1, true),
                (object_id_2, true),
            ],
            10,
        );
        // The transaction starts where object 0 becomes free (20 or 11) and
        // ends at the same time on every object.
        let expected_object_duration = match mode {
            PerObjectCongestionControlMode::None => unreachable!(),
            PerObjectCongestionControlMode::TotalGasBudget => 30,
            PerObjectCongestionControlMode::TotalTxCount => 12,
        };
        let cert_duration =
            shared_object_congestion_tracker.get_estimated_execution_duration(&cert);
        let start_time = initialize_tracker_and_compute_tx_start_time(
            &mut shared_object_congestion_tracker,
            &cert
                .data()
                .inner()
                .intent_message()
                .value
                .shared_input_objects(),
            cert_duration,
        )
        .expect("start time should be computable");
        shared_object_congestion_tracker.bump_object_execution_slots(&cert, start_time);
        assert_eq!(
            shared_object_congestion_tracker
                .object_execution_slots
                .get(&object_id_0)
                .unwrap()
                .max_object_occupied_slot_end_time(),
            expected_object_duration
        );
        assert_eq!(
            shared_object_congestion_tracker
                .object_execution_slots
                .get(&object_id_1)
                .unwrap()
                .max_object_occupied_slot_end_time(),
            expected_object_duration
        );
        assert_eq!(
            shared_object_congestion_tracker
                .object_execution_slots
                .get(&object_id_2)
                .unwrap()
                .max_object_occupied_slot_end_time(),
            expected_object_duration
        );
        assert_eq!(
            shared_object_congestion_tracker.max_occupied_slot_end_time(),
            expected_object_duration
        );
    }
1309
    /// Checks scheduling at the upper end of the timeline, where start time
    /// plus duration would exceed `u64::MAX`.
    #[rstest]
    fn test_slots_overflow(#[values(true, false)] assign_min_free_execution_slot: bool) {
        let object_id_0 = ObjectID::random();
        let object_id_1 = ObjectID::random();
        let object_id_2 = ObjectID::random();
        // The per-commit limit never restricts scheduling in this test.
        let max_execution_duration_per_commit = u64::MAX;

        // Case 1: objects 0 and 1 are occupied in [0, u64::MAX - 1), so
        // exactly one unit is left free on each of them.
        let mut shared_object_congestion_tracker =
            new_congestion_tracker_with_initial_value_for_test(
                &[(object_id_0, u64::MAX - 1), (object_id_1, u64::MAX - 1)],
                PerObjectCongestionControlMode::TotalGasBudget,
                assign_min_free_execution_slot,
            );

        // A transaction of duration 1 on object 0 fits into the last unit.
        let tx = build_transaction(&[(object_id_0, true)], 1);
        if let SequencingResult::Schedule(start_time) = initialize_tracker_and_try_schedule(
            &mut shared_object_congestion_tracker,
            &tx,
            max_execution_duration_per_commit,
            &HashMap::new(),
            0,
        ) {
            shared_object_congestion_tracker.bump_object_execution_slots(&tx, start_time);
            // Object 0 is now occupied up to the end of the timeline.
            assert_eq!(
                shared_object_congestion_tracker
                    .object_execution_slots
                    .get(&object_id_0)
                    .unwrap()
                    .max_object_occupied_slot_end_time(),
                MAX_EXECUTION_TIME
            );
            // Object 1 was not touched.
            assert_eq!(
                shared_object_congestion_tracker
                    .object_execution_slots
                    .get(&object_id_1)
                    .unwrap()
                    .max_object_occupied_slot_end_time(),
                MAX_EXECUTION_TIME - 1
            );
        } else {
            panic!("transaction is not congesting, should not defer");
        }

        // Another transaction of duration 1 on objects 0 and 1 cannot be
        // scheduled because object 0 has no free slot left.
        let tx = build_transaction(&[(object_id_0, true), (object_id_1, true)], 1);
        if let SequencingResult::Defer(_, congested_objects) = initialize_tracker_and_try_schedule(
            &mut shared_object_congestion_tracker,
            &tx,
            max_execution_duration_per_commit,
            &HashMap::new(),
            0,
        ) {
            assert_eq!(congested_objects[0], object_id_0);
            if assign_min_free_execution_slot {
                // All shared inputs are reported in this mode.
                assert_eq!(congested_objects.len(), 2);
                assert_eq!(congested_objects[1], object_id_1);
            } else {
                // Object 1 could still fit the transaction on its own.
                assert_eq!(congested_objects.len(), 1);
            }
        } else {
            panic!("transaction is congesting, should defer");
        }
        let cert_duration = shared_object_congestion_tracker.get_estimated_execution_duration(&tx);
        assert!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &tx.data()
                    .inner()
                    .intent_message()
                    .value
                    .shared_input_objects(),
                cert_duration,
            )
            .is_none()
        );

        // Case 2: objects 0, 1 and 2 are occupied for 0, 1 and 2 units.
        let mut shared_object_congestion_tracker =
            new_congestion_tracker_with_initial_value_for_test(
                &[(object_id_0, 0), (object_id_1, 1), (object_id_2, 2)],
                PerObjectCongestionControlMode::TotalGasBudget,
                assign_min_free_execution_slot,
            );

        // A transaction of duration `MAX_EXECUTION_TIME - 1` fits on objects
        // 0 and 1 but not on object 2.
        let tx = build_transaction(
            &[
                (object_id_0, true),
                (object_id_1, true),
                (object_id_2, true),
            ],
            MAX_EXECUTION_TIME - 1,
        );
        if let SequencingResult::Defer(_, congested_objects) = initialize_tracker_and_try_schedule(
            &mut shared_object_congestion_tracker,
            &tx,
            max_execution_duration_per_commit,
            &HashMap::new(),
            0,
        ) {
            if assign_min_free_execution_slot {
                // All shared inputs are reported in this mode.
                assert_eq!(congested_objects.len(), 3);
                assert_eq!(congested_objects[0], object_id_0);
                assert_eq!(congested_objects[1], object_id_1);
                assert_eq!(congested_objects[2], object_id_2);
            } else {
                // Only the object that cannot fit the transaction.
                assert_eq!(congested_objects.len(), 1);
                assert_eq!(congested_objects[0], object_id_2);
            }
        } else {
            panic!("case 2: object 2 is congested, should defer");
        }

        let cert_duration = shared_object_congestion_tracker.get_estimated_execution_duration(&tx);
        assert!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &tx.data()
                    .inner()
                    .intent_message()
                    .value
                    .shared_input_objects(),
                cert_duration,
            )
            .is_none()
        );

        // Case 3: object 0 is occupied over the whole timeline.
        let mut shared_object_congestion_tracker =
            new_congestion_tracker_with_initial_value_for_test(
                &[(object_id_0, u64::MAX)],
                PerObjectCongestionControlMode::TotalGasBudget,
                assign_min_free_execution_slot,
            );

        // No transaction of non-zero duration can be scheduled on it.
        let tx = build_transaction(&[(object_id_0, true)], u64::MAX);
        if let SequencingResult::Defer(_, congested_objects) = initialize_tracker_and_try_schedule(
            &mut shared_object_congestion_tracker,
            &tx,
            max_execution_duration_per_commit,
            &HashMap::new(),
            0,
        ) {
            assert_eq!(congested_objects.len(), 1);
            assert_eq!(congested_objects[0], object_id_0);
        } else {
            panic!("case 3: object 0 is congested, should defer");
        }

        let cert_duration = shared_object_congestion_tracker.get_estimated_execution_duration(&tx);
        assert!(
            initialize_tracker_and_compute_tx_start_time(
                &mut shared_object_congestion_tracker,
                &tx.data()
                    .inner()
                    .intent_message()
                    .value
                    .shared_input_objects(),
                cert_duration,
            )
            .is_none()
        );
    }
1503}