1use std::{
6 cmp::{Reverse, max},
7 collections::{BTreeSet, BinaryHeap, HashMap, HashSet, hash_map},
8 sync::Arc,
9 time::Duration,
10};
11
12use iota_common::fatal;
13use iota_config::node::AuthorityOverloadConfig;
14use iota_metrics::monitored_scope;
15use iota_types::{
16 base_types::{ObjectID, SequenceNumber, TransactionDigest},
17 committee::EpochId,
18 digests::TransactionEffectsDigest,
19 error::{IotaError, IotaResult},
20 executable_transaction::VerifiedExecutableTransaction,
21 fp_bail, fp_ensure,
22 message_envelope::Message,
23 storage::InputKey,
24 transaction::{SenderSignedData, TransactionDataAPI, VerifiedCertificate},
25};
26use lru::LruCache;
27use parking_lot::RwLock;
28use tap::TapOptional;
29use tokio::{sync::mpsc::UnboundedSender, time::Instant};
30use tracing::{error, info, instrument, trace, warn};
31
32use crate::{
33 authority::{AuthorityMetrics, authority_per_epoch_store::AuthorityPerEpochStore},
34 execution_cache::{ObjectCacheRead, TransactionCacheRead},
35};
36
37#[cfg(test)]
38#[path = "unit_tests/transaction_manager_tests.rs"]
39mod transaction_manager_tests;
40
41const MIN_HASHMAP_CAPACITY: usize = 1000;
43
44pub struct TransactionManager {
54 object_cache_read: Arc<dyn ObjectCacheRead>,
55 transaction_cache_read: Arc<dyn TransactionCacheRead>,
56 tx_ready_certificates: UnboundedSender<PendingCertificate>,
57 metrics: Arc<AuthorityMetrics>,
58 inner: RwLock<RwLock<Inner>>,
63}
64
65#[derive(Clone, Debug)]
66pub struct PendingCertificateStats {
67 #[cfg(test)]
69 pub enqueue_time: Instant,
70 pub ready_time: Option<Instant>,
72}
73
74#[derive(Clone, Debug)]
75pub struct PendingCertificate {
76 pub certificate: VerifiedExecutableTransaction,
78 pub expected_effects_digest: Option<TransactionEffectsDigest>,
81 pub waiting_input_objects: BTreeSet<InputKey>,
84 pub stats: PendingCertificateStats,
86}
87
88struct CacheInner {
89 versioned_cache: LruCache<ObjectID, SequenceNumber>,
90 unversioned_cache: LruCache<ObjectID, ()>,
93
94 max_size: usize,
95 metrics: Arc<AuthorityMetrics>,
96}
97
98impl CacheInner {
99 fn new(max_size: usize, metrics: Arc<AuthorityMetrics>) -> Self {
100 Self {
101 versioned_cache: LruCache::unbounded(),
102 unversioned_cache: LruCache::unbounded(),
103 max_size,
104 metrics,
105 }
106 }
107}
108
109impl CacheInner {
110 fn shrink(&mut self) {
111 while self.versioned_cache.len() > self.max_size {
112 self.versioned_cache.pop_lru();
113 self.metrics
114 .transaction_manager_object_cache_evictions
115 .inc();
116 }
117 while self.unversioned_cache.len() > self.max_size {
118 self.unversioned_cache.pop_lru();
119 self.metrics
120 .transaction_manager_object_cache_evictions
121 .inc();
122 }
123 self.metrics
124 .transaction_manager_object_cache_size
125 .set(self.versioned_cache.len() as i64);
126 self.metrics
127 .transaction_manager_package_cache_size
128 .set(self.unversioned_cache.len() as i64);
129 }
130
131 fn insert(&mut self, object: &InputKey) {
132 if let Some(version) = object.version() {
133 if let Some((previous_id, previous_version)) =
134 self.versioned_cache.push(object.id(), version)
135 {
136 if previous_id == object.id() && previous_version > version {
137 self.versioned_cache.put(object.id(), previous_version);
141 } else {
142 self.metrics
143 .transaction_manager_object_cache_evictions
144 .inc();
145 }
146 }
147 self.metrics
148 .transaction_manager_object_cache_size
149 .set(self.versioned_cache.len() as i64);
150 } else if let Some((previous_id, _)) = self.unversioned_cache.push(object.id(), ()) {
151 if previous_id != object.id() {
155 self.metrics
156 .transaction_manager_package_cache_evictions
157 .inc();
158 }
159 self.metrics
160 .transaction_manager_package_cache_size
161 .set(self.unversioned_cache.len() as i64);
162 }
163 }
164
165 fn is_object_available(&mut self, object: &InputKey) -> Option<bool> {
168 if let Some(version) = object.version() {
169 if let Some(current) = self.versioned_cache.get(&object.id()) {
170 self.metrics.transaction_manager_object_cache_hits.inc();
171 Some(*current >= version)
172 } else {
173 self.metrics.transaction_manager_object_cache_misses.inc();
174 None
175 }
176 } else {
177 self.unversioned_cache
178 .get(&object.id())
179 .tap_some(|_| self.metrics.transaction_manager_package_cache_hits.inc())
180 .tap_none(|| self.metrics.transaction_manager_package_cache_misses.inc())
181 .map(|_| true)
182 }
183 }
184}
185
186struct AvailableObjectsCache {
187 cache: CacheInner,
188 unbounded_cache_enabled: usize,
189}
190
191impl AvailableObjectsCache {
192 fn new(metrics: Arc<AuthorityMetrics>) -> Self {
193 Self::new_with_size(metrics, 100000)
194 }
195
196 fn new_with_size(metrics: Arc<AuthorityMetrics>, size: usize) -> Self {
197 Self {
198 cache: CacheInner::new(size, metrics),
199 unbounded_cache_enabled: 0,
200 }
201 }
202
203 fn enable_unbounded_cache(&mut self) {
204 self.unbounded_cache_enabled += 1;
205 }
206
207 fn disable_unbounded_cache(&mut self) {
208 assert!(self.unbounded_cache_enabled > 0);
209 self.unbounded_cache_enabled -= 1;
210 }
211
212 fn insert(&mut self, object: &InputKey) {
213 self.cache.insert(object);
214 if self.unbounded_cache_enabled == 0 {
215 self.cache.shrink();
216 }
217 }
218
219 fn is_object_available(&mut self, object: &InputKey) -> Option<bool> {
220 self.cache.is_object_available(object)
221 }
222}
223
224struct Inner {
225 epoch: EpochId,
227
228 missing_inputs: HashMap<InputKey, BTreeSet<TransactionDigest>>,
230
231 input_objects: HashMap<ObjectID, TransactionQueue>,
235
236 available_objects_cache: AvailableObjectsCache,
240
241 pending_certificates: HashMap<TransactionDigest, PendingCertificate>,
246
247 executing_certificates: HashSet<TransactionDigest>,
249}
250
251impl Inner {
252 fn new(epoch: EpochId, metrics: Arc<AuthorityMetrics>) -> Inner {
253 Inner {
254 epoch,
255 missing_inputs: HashMap::with_capacity(MIN_HASHMAP_CAPACITY),
256 input_objects: HashMap::with_capacity(MIN_HASHMAP_CAPACITY),
257 available_objects_cache: AvailableObjectsCache::new(metrics),
258 pending_certificates: HashMap::with_capacity(MIN_HASHMAP_CAPACITY),
259 executing_certificates: HashSet::with_capacity(MIN_HASHMAP_CAPACITY),
260 }
261 }
262
263 fn find_ready_transactions(
267 &mut self,
268 input_key: InputKey,
269 update_cache: bool,
270 metrics: &Arc<AuthorityMetrics>,
271 ) -> Vec<PendingCertificate> {
272 if update_cache {
273 self.available_objects_cache.insert(&input_key);
274 }
275
276 let mut ready_certificates = Vec::new();
277
278 let Some(digests) = self.missing_inputs.remove(&input_key) else {
279 return ready_certificates;
281 };
282
283 let input_txns = self
284 .input_objects
285 .get_mut(&input_key.id())
286 .unwrap_or_else(|| {
287 panic!(
288 "# of transactions waiting on object {:?} cannot be 0",
289 input_key.id()
290 )
291 });
292 for digest in digests.iter() {
293 let age_opt = input_txns.remove(digest).expect("digest must be in map");
294 metrics
295 .transaction_manager_transaction_queue_age_s
296 .observe(age_opt.elapsed().as_secs_f64());
297 }
298
299 if input_txns.is_empty() {
300 self.input_objects.remove(&input_key.id());
301 }
302
303 for digest in digests {
304 let pending_cert = self.pending_certificates.get_mut(&digest).unwrap();
306 assert!(pending_cert.waiting_input_objects.remove(&input_key));
307 if pending_cert.waiting_input_objects.is_empty() {
309 let pending_cert = self.pending_certificates.remove(&digest).unwrap();
310 ready_certificates.push(pending_cert);
311 } else {
312 trace!(tx_digest = ?digest,missing = ?pending_cert.waiting_input_objects, "Certificate waiting on missing inputs");
315 }
316 }
317
318 ready_certificates
319 }
320
321 fn maybe_reserve_capacity(&mut self) {
322 self.missing_inputs.maybe_reserve_capacity();
323 self.input_objects.maybe_reserve_capacity();
324 self.pending_certificates.maybe_reserve_capacity();
325 self.executing_certificates.maybe_reserve_capacity();
326 }
327
328 fn maybe_shrink_capacity(&mut self) {
331 self.missing_inputs.maybe_shrink_capacity();
332 self.input_objects.maybe_shrink_capacity();
333 self.pending_certificates.maybe_shrink_capacity();
334 self.executing_certificates.maybe_shrink_capacity();
335 }
336}
337
338impl TransactionManager {
339 pub(crate) fn new(
345 object_cache_read: Arc<dyn ObjectCacheRead>,
346 transaction_cache_read: Arc<dyn TransactionCacheRead>,
347 epoch_store: &AuthorityPerEpochStore,
348 tx_ready_certificates: UnboundedSender<PendingCertificate>,
349 metrics: Arc<AuthorityMetrics>,
350 ) -> TransactionManager {
351 let transaction_manager = TransactionManager {
352 object_cache_read,
353 transaction_cache_read,
354 metrics: metrics.clone(),
355 inner: RwLock::new(RwLock::new(Inner::new(epoch_store.epoch(), metrics))),
356 tx_ready_certificates,
357 };
358 transaction_manager.enqueue(epoch_store.all_pending_execution().unwrap(), epoch_store);
359 transaction_manager
360 }
361
362 #[instrument(level = "trace", skip_all)]
369 pub(crate) fn enqueue_certificates(
370 &self,
371 certs: Vec<VerifiedCertificate>,
372 epoch_store: &AuthorityPerEpochStore,
373 ) {
374 let executable_txns = certs
375 .into_iter()
376 .map(VerifiedExecutableTransaction::new_from_certificate)
377 .collect();
378 self.enqueue(executable_txns, epoch_store)
379 }
380
381 #[instrument(level = "trace", skip_all)]
382 pub(crate) fn enqueue(
383 &self,
384 certs: Vec<VerifiedExecutableTransaction>,
385 epoch_store: &AuthorityPerEpochStore,
386 ) {
387 let certs = certs.into_iter().map(|cert| (cert, None)).collect();
388 self.enqueue_impl(certs, epoch_store)
389 }
390
391 #[instrument(level = "trace", skip_all)]
392 pub(crate) fn enqueue_with_expected_effects_digest(
393 &self,
394 certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
395 epoch_store: &AuthorityPerEpochStore,
396 ) {
397 let certs = certs
398 .into_iter()
399 .map(|(cert, fx)| (cert, Some(fx)))
400 .collect();
401 self.enqueue_impl(certs, epoch_store)
402 }
403
404 fn enqueue_impl(
405 &self,
406 certs: Vec<(
407 VerifiedExecutableTransaction,
408 Option<TransactionEffectsDigest>,
409 )>,
410 epoch_store: &AuthorityPerEpochStore,
411 ) {
412 let reconfig_lock = self.inner.read();
413
414 let certs: Vec<_> = certs
416 .into_iter()
417 .filter(|(cert, _)| {
418 let digest = *cert.digest();
419 if self
421 .transaction_cache_read
422 .is_tx_already_executed(&digest)
423 .unwrap_or_else(|err| {
424 fatal!("Failed to check if tx is already executed: {:?}", err)
425 })
426 {
427 self.metrics
428 .transaction_manager_num_enqueued_certificates
429 .with_label_values(&["already_executed"])
430 .inc();
431 false
432 } else {
433 true
434 }
435 })
436 .collect();
437
438 let mut object_availability: HashMap<InputKey, Option<bool>> = HashMap::new();
439 let mut receiving_objects: HashSet<InputKey> = HashSet::new();
440 let certs: Vec<_> = certs
441 .into_iter()
442 .filter_map(|(cert, fx_digest)| {
443 let input_object_kinds = cert
444 .data()
445 .intent_message()
446 .value
447 .input_objects()
448 .expect("input_objects() cannot fail");
449 let mut input_object_keys =
450 match epoch_store.get_input_object_keys(&cert.key(), &input_object_kinds) {
451 Ok(keys) => keys,
452 Err(e) => {
453 if self
460 .transaction_cache_read
461 .is_tx_already_executed(cert.digest())
462 .expect("is_tx_already_executed cannot fail")
463 {
464 return None;
465 }
466 fatal!("Failed to get input object keys: {:?}", e);
467 }
468 };
469
470 if input_object_kinds.len() != input_object_keys.len() {
471 error!("Duplicated input objects: {:?}", input_object_kinds);
472 }
473
474 let receiving_object_entries =
475 cert.data().intent_message().value.receiving_objects();
476 for entry in receiving_object_entries {
477 let key = InputKey::VersionedObject {
478 id: entry.0,
479 version: entry.1,
480 };
481 receiving_objects.insert(key);
482 input_object_keys.insert(key);
483 }
484
485 for key in input_object_keys.iter() {
486 if key.is_cancelled() {
487 object_availability.insert(*key, Some(true));
490 } else {
491 object_availability.insert(*key, None);
492 }
493 }
494
495 Some((cert, fx_digest, input_object_keys))
496 })
497 .collect();
498
499 {
500 let mut inner = reconfig_lock.write();
501 for (key, value) in object_availability.iter_mut() {
502 if value.is_some_and(|available| available) {
503 continue;
504 }
505 if let Some(available) = inner.available_objects_cache.is_object_available(key) {
506 *value = Some(available);
507 }
508 }
509 inner.available_objects_cache.enable_unbounded_cache();
511 }
512
513 let input_object_cache_misses = object_availability
514 .iter()
515 .filter_map(|(key, value)| if value.is_none() { Some(*key) } else { None })
516 .collect::<Vec<_>>();
517
518 let cache_miss_availability = self
522 .object_cache_read
523 .multi_input_objects_available(
524 &input_object_cache_misses,
525 receiving_objects,
526 epoch_store.epoch(),
527 )
528 .unwrap_or_else(|err| panic!("Checking object existence cannot fail: {:?}", err))
529 .into_iter()
530 .zip(input_object_cache_misses);
531
532 let mut inner = reconfig_lock.write();
538
539 let _scope = monitored_scope("TransactionManager::enqueue::wlock");
540
541 for (available, key) in cache_miss_availability {
542 if available && key.version().is_none() {
543 inner.available_objects_cache.insert(&key);
549 }
550 object_availability
551 .insert(key, Some(available))
552 .expect("entry must already exist");
553 }
554
555 for (key, value) in object_availability.iter_mut() {
560 if !value.expect("all objects must have been checked by now") {
561 if let Some(true) = inner.available_objects_cache.is_object_available(key) {
562 *value = Some(true);
563 }
564 }
565 }
566
567 inner.available_objects_cache.disable_unbounded_cache();
568
569 let mut pending = Vec::new();
570 let pending_cert_enqueue_time = Instant::now();
571
572 for (cert, expected_effects_digest, input_object_keys) in certs {
573 pending.push(PendingCertificate {
574 certificate: cert,
575 expected_effects_digest,
576 waiting_input_objects: input_object_keys,
577 stats: PendingCertificateStats {
578 #[cfg(test)]
579 enqueue_time: pending_cert_enqueue_time,
580 ready_time: None,
581 },
582 });
583 }
584
585 for mut pending_cert in pending {
586 let digest = *pending_cert.certificate.digest();
593
594 if inner.epoch != pending_cert.certificate.epoch() {
595 warn!(
596 "Ignoring enqueued certificate from wrong epoch. Expected={} Certificate={:?}",
597 inner.epoch, pending_cert.certificate
598 );
599 continue;
600 }
601
602 if inner.pending_certificates.contains_key(&digest) {
604 self.metrics
605 .transaction_manager_num_enqueued_certificates
606 .with_label_values(&["already_pending"])
607 .inc();
608 continue;
609 }
610 if inner.executing_certificates.contains(&digest) {
612 self.metrics
613 .transaction_manager_num_enqueued_certificates
614 .with_label_values(&["already_executing"])
615 .inc();
616 continue;
617 }
618 let is_tx_already_executed = self
620 .transaction_cache_read
621 .is_tx_already_executed(&digest)
622 .expect("Check if tx is already executed should not fail");
623 if is_tx_already_executed {
624 self.metrics
625 .transaction_manager_num_enqueued_certificates
626 .with_label_values(&["already_executed"])
627 .inc();
628 continue;
629 }
630
631 let mut waiting_input_objects = BTreeSet::new();
632 std::mem::swap(
633 &mut waiting_input_objects,
634 &mut pending_cert.waiting_input_objects,
635 );
636 for key in waiting_input_objects {
637 if !object_availability[&key].unwrap() {
638 pending_cert.waiting_input_objects.insert(key);
640
641 assert!(
642 inner.missing_inputs.entry(key).or_default().insert(digest),
643 "Duplicated certificate {:?} for missing object {:?}",
644 digest,
645 key
646 );
647 let input_txns = inner.input_objects.entry(key.id()).or_default();
648 input_txns.insert(digest, pending_cert_enqueue_time);
649 }
650 }
651
652 if pending_cert.waiting_input_objects.is_empty() {
654 self.metrics
655 .transaction_manager_num_enqueued_certificates
656 .with_label_values(&["ready"])
657 .inc();
658 pending_cert.stats.ready_time = Some(Instant::now());
659 self.certificate_ready(&mut inner, pending_cert);
661 continue;
662 }
663
664 assert!(
665 inner
666 .pending_certificates
667 .insert(digest, pending_cert)
668 .is_none(),
669 "Duplicated pending certificate {:?}",
670 digest
671 );
672
673 self.metrics
674 .transaction_manager_num_enqueued_certificates
675 .with_label_values(&["pending"])
676 .inc();
677 }
678
679 self.metrics
680 .transaction_manager_num_missing_objects
681 .set(inner.missing_inputs.len() as i64);
682 self.metrics
683 .transaction_manager_num_pending_certificates
684 .set(inner.pending_certificates.len() as i64);
685
686 inner.maybe_reserve_capacity();
687 }
688
689 #[cfg(test)]
690 pub(crate) fn objects_available(
691 &self,
692 input_keys: Vec<InputKey>,
693 epoch_store: &AuthorityPerEpochStore,
694 ) {
695 let reconfig_lock = self.inner.read();
696 let mut inner = reconfig_lock.write();
697 let _scope = monitored_scope("TransactionManager::objects_available::wlock");
698 self.objects_available_locked(&mut inner, epoch_store, input_keys, true, Instant::now());
699 inner.maybe_shrink_capacity();
700 }
701
702 #[instrument(level = "trace", skip_all)]
703 fn objects_available_locked(
704 &self,
705 inner: &mut Inner,
706 epoch_store: &AuthorityPerEpochStore,
707 input_keys: Vec<InputKey>,
708 update_cache: bool,
709 available_time: Instant,
710 ) {
711 if inner.epoch != epoch_store.epoch() {
712 warn!(
713 "Ignoring objects committed from wrong epoch. Expected={} Actual={} \
714 Objects={:?}",
715 inner.epoch,
716 epoch_store.epoch(),
717 input_keys,
718 );
719 return;
720 }
721
722 for input_key in input_keys {
723 trace!(?input_key, "object available");
724 for mut ready_cert in
725 inner.find_ready_transactions(input_key, update_cache, &self.metrics)
726 {
727 ready_cert.stats.ready_time = Some(available_time);
728 self.certificate_ready(inner, ready_cert);
729 }
730 }
731
732 self.metrics
733 .transaction_manager_num_missing_objects
734 .set(inner.missing_inputs.len() as i64);
735 self.metrics
736 .transaction_manager_num_pending_certificates
737 .set(inner.pending_certificates.len() as i64);
738 self.metrics
739 .transaction_manager_num_executing_certificates
740 .set(inner.executing_certificates.len() as i64);
741 }
742
743 #[instrument(level = "trace", skip_all)]
745 pub(crate) fn notify_commit(
746 &self,
747 digest: &TransactionDigest,
748 output_object_keys: Vec<InputKey>,
749 epoch_store: &AuthorityPerEpochStore,
750 ) {
751 let reconfig_lock = self.inner.read();
752 {
753 let commit_time = Instant::now();
754 let mut inner = reconfig_lock.write();
755 let _scope = monitored_scope("TransactionManager::notify_commit::wlock");
756
757 if inner.epoch != epoch_store.epoch() {
758 warn!(
759 "Ignoring committed certificate from wrong epoch. Expected={} Actual={} CertificateDigest={:?}",
760 inner.epoch,
761 epoch_store.epoch(),
762 digest
763 );
764 return;
765 }
766
767 self.objects_available_locked(
768 &mut inner,
769 epoch_store,
770 output_object_keys,
771 true,
772 commit_time,
773 );
774
775 if !inner.executing_certificates.remove(digest) {
776 trace!(
777 "{:?} not found in executing certificates, likely because it is a system transaction",
778 digest
779 );
780 return;
781 }
782
783 self.metrics
784 .transaction_manager_num_executing_certificates
785 .set(inner.executing_certificates.len() as i64);
786
787 inner.maybe_shrink_capacity();
788 }
789 }
790
791 fn certificate_ready(&self, inner: &mut Inner, pending_certificate: PendingCertificate) {
793 trace!(tx_digest = ?pending_certificate.certificate.digest(), "certificate ready");
794 assert_eq!(pending_certificate.waiting_input_objects.len(), 0);
795 assert!(
797 inner
798 .executing_certificates
799 .insert(*pending_certificate.certificate.digest())
800 );
801 self.metrics.txn_ready_rate_tracker.lock().record();
802 let _ = self.tx_ready_certificates.send(pending_certificate);
803 self.metrics.transaction_manager_num_ready.inc();
804 self.metrics.execution_driver_dispatch_queue.inc();
805 }
806
807 pub(crate) fn get_missing_input(&self, digest: &TransactionDigest) -> Option<Vec<InputKey>> {
809 let reconfig_lock = self.inner.read();
810 let inner = reconfig_lock.read();
811 inner
812 .pending_certificates
813 .get(digest)
814 .map(|cert| cert.waiting_input_objects.clone().into_iter().collect())
815 }
816
817 pub(crate) fn objects_queue_len_and_age(
820 &self,
821 keys: Vec<ObjectID>,
822 ) -> Vec<(ObjectID, usize, Option<Duration>)> {
823 let reconfig_lock = self.inner.read();
824 let inner = reconfig_lock.read();
825 keys.into_iter()
826 .map(|key| {
827 let default_map = TransactionQueue::default();
828 let txns = inner.input_objects.get(&key).unwrap_or(&default_map);
829 (
830 key,
831 txns.len(),
832 txns.first().map(|(time, _)| time.elapsed()),
833 )
834 })
835 .collect()
836 }
837
838 pub(crate) fn inflight_queue_len(&self) -> usize {
840 let reconfig_lock = self.inner.read();
841 let inner = reconfig_lock.read();
842 inner.pending_certificates.len() + inner.executing_certificates.len()
843 }
844
845 pub(crate) fn reconfigure(&self, new_epoch: EpochId) {
849 let reconfig_lock = self.inner.write();
850 let mut inner = reconfig_lock.write();
851 *inner = Inner::new(new_epoch, self.metrics.clone());
852 }
853
854 pub(crate) fn check_execution_overload(
855 &self,
856 overload_config: &AuthorityOverloadConfig,
857 tx_data: &SenderSignedData,
858 ) -> IotaResult {
859 let inflight_queue_len = self.inflight_queue_len();
861 fp_ensure!(
862 inflight_queue_len < overload_config.max_transaction_manager_queue_length,
863 IotaError::TooManyTransactionsPendingExecution {
864 queue_len: inflight_queue_len,
865 threshold: overload_config.max_transaction_manager_queue_length,
866 }
867 );
868 tx_data.digest();
869
870 for (object_id, queue_len, txn_age) in self.objects_queue_len_and_age(
871 tx_data
872 .transaction_data()
873 .shared_input_objects()
874 .into_iter()
875 .filter_map(|r| r.mutable.then_some(r.id))
876 .collect(),
877 ) {
878 if queue_len >= overload_config.max_transaction_manager_per_object_queue_length {
880 info!(
881 "Overload detected on object {:?} with {} pending transactions",
882 object_id, queue_len
883 );
884 fp_bail!(IotaError::TooManyTransactionsPendingOnObject {
885 object_id,
886 queue_len,
887 threshold: overload_config.max_transaction_manager_per_object_queue_length,
888 });
889 }
890 if let Some(age) = txn_age {
891 if age >= overload_config.max_txn_age_in_queue {
894 info!(
895 "Overload detected on object {:?} with oldest transaction pending for {}ms",
896 object_id,
897 age.as_millis()
898 );
899 fp_bail!(IotaError::TooOldTransactionPendingOnObject {
900 object_id,
901 txn_age_sec: age.as_secs(),
902 threshold: overload_config.max_txn_age_in_queue.as_secs(),
903 });
904 }
905 }
906 }
907 Ok(())
908 }
909
910 #[cfg(test)]
912 fn check_empty_for_testing(&self) {
913 let reconfig_lock = self.inner.read();
914 let inner = reconfig_lock.read();
915 assert!(
916 inner.missing_inputs.is_empty(),
917 "Missing inputs: {:?}",
918 inner.missing_inputs
919 );
920 assert!(
921 inner.input_objects.is_empty(),
922 "Input objects: {:?}",
923 inner.input_objects
924 );
925 assert!(
926 inner.pending_certificates.is_empty(),
927 "Pending certificates: {:?}",
928 inner.pending_certificates
929 );
930 assert!(
931 inner.executing_certificates.is_empty(),
932 "Executing certificates: {:?}",
933 inner.executing_certificates
934 );
935 }
936}
937
938trait ResizableHashMap<K, V> {
939 fn maybe_reserve_capacity(&mut self);
940 fn maybe_shrink_capacity(&mut self);
941}
942
943impl<K, V> ResizableHashMap<K, V> for HashMap<K, V>
944where
945 K: std::cmp::Eq + std::hash::Hash,
946{
947 fn maybe_reserve_capacity(&mut self) {
950 if self.len() > self.capacity() * 3 / 4 {
951 self.reserve(self.capacity() / 2);
952 }
953 }
954
955 fn maybe_shrink_capacity(&mut self) {
958 if self.len() > MIN_HASHMAP_CAPACITY && self.len() < self.capacity() / 4 {
959 self.shrink_to(max(self.capacity() / 2, MIN_HASHMAP_CAPACITY))
960 }
961 }
962}
963
964trait ResizableHashSet<K> {
965 fn maybe_reserve_capacity(&mut self);
966 fn maybe_shrink_capacity(&mut self);
967}
968
969impl<K> ResizableHashSet<K> for HashSet<K>
970where
971 K: std::cmp::Eq + std::hash::Hash,
972{
973 fn maybe_reserve_capacity(&mut self) {
976 if self.len() > self.capacity() * 3 / 4 {
977 self.reserve(self.capacity() / 2);
978 }
979 }
980
981 fn maybe_shrink_capacity(&mut self) {
984 if self.len() > MIN_HASHMAP_CAPACITY && self.len() < self.capacity() / 4 {
985 self.shrink_to(max(self.capacity() / 2, MIN_HASHMAP_CAPACITY))
986 }
987 }
988}
989
990#[derive(Default, Debug)]
991struct TransactionQueue {
992 digests: HashMap<TransactionDigest, Instant>,
993 ages: BinaryHeap<(Reverse<Instant>, TransactionDigest)>,
994}
995
996impl TransactionQueue {
997 fn len(&self) -> usize {
998 self.digests.len()
999 }
1000
1001 fn is_empty(&self) -> bool {
1002 self.digests.is_empty()
1003 }
1004
1005 fn insert(&mut self, digest: TransactionDigest, time: Instant) {
1008 if let hash_map::Entry::Vacant(entry) = self.digests.entry(digest) {
1009 entry.insert(time);
1010 self.ages.push((Reverse(time), digest));
1011 }
1012 }
1013
1014 fn remove(&mut self, digest: &TransactionDigest) -> Option<Instant> {
1020 let when = self.digests.remove(digest)?;
1021
1022 while !self.ages.is_empty() {
1027 let first = self.ages.peek().expect("heap cannot be empty");
1028
1029 if self.digests.get(&first.1) == Some(&first.0.0) {
1034 break;
1035 }
1036
1037 self.ages.pop();
1038 }
1039
1040 Some(when)
1041 }
1042
1043 fn first(&self) -> Option<(Instant, TransactionDigest)> {
1045 self.ages.peek().map(|(time, digest)| (time.0, *digest))
1046 }
1047}
1048
1049#[cfg(test)]
1050mod test {
1051 use prometheus::Registry;
1052 use rand::{Rng, RngCore};
1053
1054 use super::*;
1055
1056 #[test]
1057 #[cfg_attr(msim, ignore)]
1058 fn test_available_objects_cache() {
1059 let metrics = Arc::new(AuthorityMetrics::new(&Registry::default()));
1060 let mut cache = AvailableObjectsCache::new_with_size(metrics, 5);
1061
1062 for i in 0..10 {
1064 let object = ObjectID::new([i; 32]);
1065 let input_key = InputKey::Package { id: object };
1066 assert_eq!(cache.is_object_available(&input_key), None);
1067 cache.insert(&input_key);
1068 assert_eq!(cache.is_object_available(&input_key), Some(true));
1069 }
1070
1071 for i in 0..5 {
1073 let object = ObjectID::new([i; 32]);
1074 let input_key = InputKey::Package { id: object };
1075 assert_eq!(cache.is_object_available(&input_key), None);
1076 }
1077
1078 for i in 0..10 {
1080 let object = ObjectID::new([i; 32]);
1081 let input_key = InputKey::VersionedObject {
1082 id: object,
1083 version: (i as u64).into(),
1084 };
1085 assert_eq!(cache.is_object_available(&input_key), None);
1086 cache.insert(&input_key);
1087 assert_eq!(cache.is_object_available(&input_key), Some(true));
1088 }
1089
1090 for i in 0..5 {
1092 let object = ObjectID::new([i; 32]);
1093 let input_key = InputKey::VersionedObject {
1094 id: object,
1095 version: (i as u64).into(),
1096 };
1097 assert_eq!(cache.is_object_available(&input_key), None);
1098 }
1099
1100 for i in 5..10 {
1102 let object = ObjectID::new([i; 32]);
1103 let input_key = InputKey::Package { id: object };
1104 assert_eq!(cache.is_object_available(&input_key), Some(true));
1105 }
1106
1107 let object = ObjectID::new([9; 32]);
1109 let input_key = InputKey::VersionedObject {
1110 id: object,
1111 version: 9.into(),
1112 };
1113 assert_eq!(cache.is_object_available(&input_key), Some(true));
1114 let input_key = InputKey::VersionedObject {
1116 id: object,
1117 version: 10.into(),
1118 };
1119 assert_eq!(cache.is_object_available(&input_key), Some(false));
1120 let input_key = InputKey::VersionedObject {
1123 id: object,
1124 version: 8.into(),
1125 };
1126 assert_eq!(cache.is_object_available(&input_key), Some(true));
1127 }
1128
1129 #[test]
1130 #[cfg_attr(msim, ignore)]
1131 fn test_transaction_queue() {
1132 let mut queue = TransactionQueue::default();
1133
1134 let time = Instant::now();
1136 let digest = TransactionDigest::new([1; 32]);
1137 queue.insert(digest, time);
1138 assert_eq!(queue.first(), Some((time, digest)));
1139 queue.remove(&digest);
1140 assert_eq!(queue.first(), None);
1141
1142 assert_eq!(queue.remove(&digest), None);
1144 }
1145
1146 #[test]
1147 #[cfg_attr(msim, ignore)]
1148 fn test_transaction_queue_remove_in_order() {
1149 let time1 = Instant::now();
1151 let digest1 = TransactionDigest::new([1; 32]);
1152 let time2 = time1 + Duration::from_secs(1);
1153 let digest2 = TransactionDigest::new([2; 32]);
1154
1155 let mut queue = TransactionQueue::default();
1156 queue.insert(digest1, time1);
1157 queue.insert(digest2, time2);
1158
1159 assert_eq!(queue.first(), Some((time1, digest1)));
1160 assert_eq!(queue.remove(&digest1), Some(time1));
1161 assert_eq!(queue.first(), Some((time2, digest2)));
1162 assert_eq!(queue.remove(&digest2), Some(time2));
1163 assert_eq!(queue.first(), None);
1164 }
1165
1166 #[test]
1167 #[cfg_attr(msim, ignore)]
1168 fn test_transaction_queue_remove_in_reverse_order() {
1169 let time1 = Instant::now();
1171 let digest1 = TransactionDigest::new([1; 32]);
1172 let time2 = time1 + Duration::from_secs(1);
1173 let digest2 = TransactionDigest::new([2; 32]);
1174
1175 let mut queue = TransactionQueue::default();
1176 queue.insert(digest1, time1);
1177 queue.insert(digest2, time2);
1178
1179 assert_eq!(queue.first(), Some((time1, digest1)));
1180 assert_eq!(queue.remove(&digest2), Some(time2));
1181
1182 assert_eq!(queue.first(), Some((time1, digest1)));
1184 assert_eq!(queue.remove(&digest1), Some(time1));
1185
1186 assert_eq!(queue.first(), None);
1187 }
1188
1189 #[test]
1190 #[cfg_attr(msim, ignore)]
1191 fn test_transaction_queue_reinsert() {
1192 let time1 = Instant::now();
1194 let digest1 = TransactionDigest::new([1; 32]);
1195 let time2 = time1 + Duration::from_secs(1);
1196 let digest2 = TransactionDigest::new([2; 32]);
1197
1198 let mut queue = TransactionQueue::default();
1199 queue.insert(digest1, time1);
1200 queue.insert(digest2, time2);
1201
1202 queue.remove(&digest2);
1204 assert_eq!(queue.first(), Some((time1, digest1)));
1205
1206 let time3 = time2 + Duration::from_secs(1);
1208 queue.insert(digest2, time3);
1209
1210 queue.remove(&digest1);
1212
1213 assert_eq!(queue.first(), Some((time3, digest2)));
1215 }
1216
1217 #[test]
1218 #[cfg_attr(msim, ignore)]
1219 fn test_transaction_queue_double_insert() {
1220 let time1 = Instant::now();
1221 let digest1 = TransactionDigest::new([1; 32]);
1222 let time2 = time1 + Duration::from_secs(1);
1223 let digest2 = TransactionDigest::new([2; 32]);
1224 let time3 = time2 + Duration::from_secs(1);
1225
1226 let mut queue = TransactionQueue::default();
1227 queue.insert(digest1, time1);
1228 queue.insert(digest2, time2);
1229 queue.insert(digest2, time3);
1230
1231 assert_eq!(queue.first(), Some((time1, digest1)));
1233 queue.remove(&digest1);
1234 assert_eq!(queue.first(), Some((time2, digest2)));
1235 }
1236
1237 #[test]
1238 #[cfg_attr(msim, ignore)]
1239 fn transaction_queue_random_test() {
1240 let mut rng = rand::thread_rng();
1241 let mut digests = Vec::new();
1242 for _ in 0..100 {
1243 let mut digest = [0; 32];
1244 rng.fill_bytes(&mut digest);
1245 digests.push(TransactionDigest::new(digest));
1246 }
1247
1248 let mut verifier = HashMap::new();
1249 let mut queue = TransactionQueue::default();
1250
1251 let mut now = Instant::now();
1252
1253 for _ in 0..70 {
1256 now += Duration::from_secs(1);
1257 let digest = digests[rng.gen_range(0..digests.len())];
1258 let time = now;
1259 queue.insert(digest, time);
1260 verifier.entry(digest).or_insert(time);
1261 }
1262
1263 for _ in 0..100000 {
1266 now += Duration::from_secs(1);
1268
1269 let digest = digests[rng.gen_range(0..digests.len())];
1271
1272 if rng.gen_bool(0.5) {
1274 let time = now;
1275 queue.insert(digest, time);
1276 verifier.entry(digest).or_insert(time);
1277 } else {
1278 let time = verifier.remove(&digest);
1279 assert_eq!(queue.remove(&digest), time);
1280 }
1281
1282 assert_eq!(
1283 queue.first(),
1284 verifier
1285 .iter()
1286 .min_by_key(|(_, time)| **time)
1287 .map(|(digest, time)| (*time, *digest))
1288 );
1289 }
1290 }
1291}