iota_core/
transaction_manager.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::{Reverse, max},
7    collections::{BTreeSet, BinaryHeap, HashMap, HashSet, hash_map},
8    sync::Arc,
9    time::Duration,
10};
11
12use iota_common::fatal;
13use iota_config::node::AuthorityOverloadConfig;
14use iota_metrics::monitored_scope;
15use iota_types::{
16    base_types::{ObjectID, SequenceNumber, TransactionDigest},
17    committee::EpochId,
18    digests::TransactionEffectsDigest,
19    error::{IotaError, IotaResult},
20    executable_transaction::VerifiedExecutableTransaction,
21    fp_bail, fp_ensure,
22    message_envelope::Message,
23    storage::InputKey,
24    transaction::{SenderSignedData, TransactionDataAPI, VerifiedCertificate},
25};
26use lru::LruCache;
27use parking_lot::RwLock;
28use tap::TapOptional;
29use tokio::{sync::mpsc::UnboundedSender, time::Instant};
30use tracing::{error, info, instrument, trace, warn};
31
32use crate::{
33    authority::{AuthorityMetrics, authority_per_epoch_store::AuthorityPerEpochStore},
34    execution_cache::{ObjectCacheRead, TransactionCacheRead},
35};
36
37#[cfg(test)]
38#[path = "unit_tests/transaction_manager_tests.rs"]
39mod transaction_manager_tests;
40
/// Minimum capacity of HashMaps used in TransactionManager.
///
/// `Inner::new` pre-allocates each of its hash maps and hash sets with this
/// capacity.
const MIN_HASHMAP_CAPACITY: usize = 1000;
43
/// TransactionManager is responsible for managing object dependencies of
/// pending transactions, and publishing a stream of certified transactions
/// (certificates) ready to execute. It receives certificates from consensus,
/// validator RPC handlers, and checkpoint executor. Execution driver subscribes
/// to the stream of ready certificates from TransactionManager, and
/// executes them in parallel.
/// The actual execution logic is inside AuthorityState. After a transaction
/// commits and updates storage, committed objects and certificates are notified
/// back to TransactionManager.
pub struct TransactionManager {
    /// Queried during enqueue for the availability of input objects that the
    /// in-memory available-objects cache could not answer for.
    object_cache_read: Arc<dyn ObjectCacheRead>,
    /// Queried during enqueue to skip transactions that were already executed.
    transaction_cache_read: Arc<dyn TransactionCacheRead>,
    /// Sending half of the channel on which ready certificates are published.
    tx_ready_certificates: UnboundedSender<PendingCertificate>,
    /// Counters and gauges updated by the manager.
    metrics: Arc<AuthorityMetrics>,
    // inner is a doubly nested lock so that we can enforce that an outer lock (for read) is held
    // before the inner lock (for read or write) can be acquired. During reconfiguration, we
    // acquire the outer lock for write, to ensure that no other threads can be running while
    // we reconfigure.
    inner: RwLock<RwLock<Inner>>,
}
64
/// Timing information attached to a `PendingCertificate`.
#[derive(Clone, Debug)]
pub struct PendingCertificateStats {
    // The time this certificate enters transaction manager.
    // This field only exists in test builds.
    #[cfg(test)]
    pub enqueue_time: Instant,
    // The time this certificate becomes ready for execution.
    // `None` while the certificate is still waiting on input objects.
    pub ready_time: Option<Instant>,
}
73
/// A certified transaction tracked by TransactionManager, together with the
/// input objects it is still waiting on. This is also the item type sent on
/// the ready-certificates channel.
#[derive(Clone, Debug)]
pub struct PendingCertificate {
    // Certified transaction to be executed.
    pub certificate: VerifiedExecutableTransaction,
    // When executing from checkpoint, the certified effects digest is provided, so that forks can
    // be detected prior to committing the transaction.
    pub expected_effects_digest: Option<TransactionEffectsDigest>,
    // The input objects this certificate is still waiting for to become available in order to
    // be executed. Empty once the certificate is ready.
    pub waiting_input_objects: BTreeSet<InputKey>,
    // Stores stats about this transaction.
    pub stats: PendingCertificateStats,
}
87
/// Storage for the available-objects cache: one LRU map for keys that carry a
/// version and one for keys that do not.
struct CacheInner {
    // Highest known version per object ID, for input keys that carry a version.
    versioned_cache: LruCache<ObjectID, SequenceNumber>,
    // we cache packages separately, because they are more expensive to look up in the db, so we
    // don't want to evict packages in favor of mutable objects.
    unversioned_cache: LruCache<ObjectID, ()>,

    // Number of entries that `shrink` trims each of the two maps down to. The maps themselves
    // are created unbounded, so this limit is only enforced when `shrink` is called.
    max_size: usize,
    // Hit / miss / eviction counters and size gauges for the two maps.
    metrics: Arc<AuthorityMetrics>,
}
97
98impl CacheInner {
99    fn new(max_size: usize, metrics: Arc<AuthorityMetrics>) -> Self {
100        Self {
101            versioned_cache: LruCache::unbounded(),
102            unversioned_cache: LruCache::unbounded(),
103            max_size,
104            metrics,
105        }
106    }
107}
108
109impl CacheInner {
110    fn shrink(&mut self) {
111        while self.versioned_cache.len() > self.max_size {
112            self.versioned_cache.pop_lru();
113            self.metrics
114                .transaction_manager_object_cache_evictions
115                .inc();
116        }
117        while self.unversioned_cache.len() > self.max_size {
118            self.unversioned_cache.pop_lru();
119            self.metrics
120                .transaction_manager_object_cache_evictions
121                .inc();
122        }
123        self.metrics
124            .transaction_manager_object_cache_size
125            .set(self.versioned_cache.len() as i64);
126        self.metrics
127            .transaction_manager_package_cache_size
128            .set(self.unversioned_cache.len() as i64);
129    }
130
131    fn insert(&mut self, object: &InputKey) {
132        if let Some(version) = object.version() {
133            if let Some((previous_id, previous_version)) =
134                self.versioned_cache.push(object.id(), version)
135            {
136                if previous_id == object.id() && previous_version > version {
137                    // do not allow highest known version to decrease
138                    // This should not be possible unless bugs are introduced elsewhere in this
139                    // module.
140                    self.versioned_cache.put(object.id(), previous_version);
141                } else {
142                    self.metrics
143                        .transaction_manager_object_cache_evictions
144                        .inc();
145                }
146            }
147            self.metrics
148                .transaction_manager_object_cache_size
149                .set(self.versioned_cache.len() as i64);
150        } else if let Some((previous_id, _)) = self.unversioned_cache.push(object.id(), ()) {
151            // lru_cache will does not check if the value being evicted is the same as the
152            // value being inserted, so we do need to check if the id is
153            // different before counting this as an eviction.
154            if previous_id != object.id() {
155                self.metrics
156                    .transaction_manager_package_cache_evictions
157                    .inc();
158            }
159            self.metrics
160                .transaction_manager_package_cache_size
161                .set(self.unversioned_cache.len() as i64);
162        }
163    }
164
165    // Returns Some(true/false) for a definitive result. Returns None if the caller
166    // must defer to the db.
167    fn is_object_available(&mut self, object: &InputKey) -> Option<bool> {
168        if let Some(version) = object.version() {
169            if let Some(current) = self.versioned_cache.get(&object.id()) {
170                self.metrics.transaction_manager_object_cache_hits.inc();
171                Some(*current >= version)
172            } else {
173                self.metrics.transaction_manager_object_cache_misses.inc();
174                None
175            }
176        } else {
177            self.unversioned_cache
178                .get(&object.id())
179                .tap_some(|_| self.metrics.transaction_manager_package_cache_hits.inc())
180                .tap_none(|| self.metrics.transaction_manager_package_cache_misses.inc())
181                .map(|_| true)
182        }
183    }
184}
185
/// Cache of input keys known to be available, with a switch that temporarily
/// suspends size-based eviction.
struct AvailableObjectsCache {
    cache: CacheInner,
    // Number of outstanding `enable_unbounded_cache` calls that have not yet been matched by
    // `disable_unbounded_cache`. While this is non-zero, `insert` does not shrink the cache.
    unbounded_cache_enabled: usize,
}
190
191impl AvailableObjectsCache {
192    fn new(metrics: Arc<AuthorityMetrics>) -> Self {
193        Self::new_with_size(metrics, 100000)
194    }
195
196    fn new_with_size(metrics: Arc<AuthorityMetrics>, size: usize) -> Self {
197        Self {
198            cache: CacheInner::new(size, metrics),
199            unbounded_cache_enabled: 0,
200        }
201    }
202
203    fn enable_unbounded_cache(&mut self) {
204        self.unbounded_cache_enabled += 1;
205    }
206
207    fn disable_unbounded_cache(&mut self) {
208        assert!(self.unbounded_cache_enabled > 0);
209        self.unbounded_cache_enabled -= 1;
210    }
211
212    fn insert(&mut self, object: &InputKey) {
213        self.cache.insert(object);
214        if self.unbounded_cache_enabled == 0 {
215            self.cache.shrink();
216        }
217    }
218
219    fn is_object_available(&mut self, object: &InputKey) -> Option<bool> {
220        self.cache.is_object_available(object)
221    }
222}
223
/// Mutable bookkeeping state of TransactionManager, kept behind the nested
/// lock in `TransactionManager::inner`.
struct Inner {
    // Current epoch of TransactionManager.
    epoch: EpochId,

    // Maps missing input objects to transactions in pending_certificates.
    missing_inputs: HashMap<InputKey, BTreeSet<TransactionDigest>>,

    // Stores age info for all transactions depending on each object.
    // Used for throttling signing and submitting transactions depending on hot objects.
    // NOTE(review): an earlier comment here said an `IndexMap` preserves insertion order. This
    // map is a plain `HashMap`; any ordering presumably lives inside `TransactionQueue`, which
    // is defined outside this section — confirm.
    input_objects: HashMap<ObjectID, TransactionQueue>,

    // Cache of input keys known to be available. For keys with a sequence number it tracks the
    // highest observed sequence number per object ID; keys with no sequence number are tracked
    // by object ID only.
    available_objects_cache: AvailableObjectsCache,

    // A transaction enqueued to TransactionManager must be in either pending_certificates or
    // executing_certificates.

    // Maps transaction digests to their content and missing input objects.
    pending_certificates: HashMap<TransactionDigest, PendingCertificate>,

    // Transactions that have all input objects available, but have not finished execution.
    executing_certificates: HashSet<TransactionDigest>,
}
250
251impl Inner {
252    fn new(epoch: EpochId, metrics: Arc<AuthorityMetrics>) -> Inner {
253        Inner {
254            epoch,
255            missing_inputs: HashMap::with_capacity(MIN_HASHMAP_CAPACITY),
256            input_objects: HashMap::with_capacity(MIN_HASHMAP_CAPACITY),
257            available_objects_cache: AvailableObjectsCache::new(metrics),
258            pending_certificates: HashMap::with_capacity(MIN_HASHMAP_CAPACITY),
259            executing_certificates: HashSet::with_capacity(MIN_HASHMAP_CAPACITY),
260        }
261    }
262
263    // Checks if there is any transaction waiting on `input_key`. Returns all the
264    // pending transactions that are ready to be executed.
265    // Must ensure input_key is available in storage before calling this function.
266    fn find_ready_transactions(
267        &mut self,
268        input_key: InputKey,
269        update_cache: bool,
270        metrics: &Arc<AuthorityMetrics>,
271    ) -> Vec<PendingCertificate> {
272        if update_cache {
273            self.available_objects_cache.insert(&input_key);
274        }
275
276        let mut ready_certificates = Vec::new();
277
278        let Some(digests) = self.missing_inputs.remove(&input_key) else {
279            // No transaction is waiting on the object yet.
280            return ready_certificates;
281        };
282
283        let input_txns = self
284            .input_objects
285            .get_mut(&input_key.id())
286            .unwrap_or_else(|| {
287                panic!(
288                    "# of transactions waiting on object {:?} cannot be 0",
289                    input_key.id()
290                )
291            });
292        for digest in digests.iter() {
293            let age_opt = input_txns.remove(digest).expect("digest must be in map");
294            metrics
295                .transaction_manager_transaction_queue_age_s
296                .observe(age_opt.elapsed().as_secs_f64());
297        }
298
299        if input_txns.is_empty() {
300            self.input_objects.remove(&input_key.id());
301        }
302
303        for digest in digests {
304            // Pending certificate must exist.
305            let pending_cert = self.pending_certificates.get_mut(&digest).unwrap();
306            assert!(pending_cert.waiting_input_objects.remove(&input_key));
307            // When a certificate has all its input objects, it is ready to execute.
308            if pending_cert.waiting_input_objects.is_empty() {
309                let pending_cert = self.pending_certificates.remove(&digest).unwrap();
310                ready_certificates.push(pending_cert);
311            } else {
312                // TODO: we should start logging this at a higher level after some period of
313                // time has elapsed.
314                trace!(tx_digest = ?digest,missing = ?pending_cert.waiting_input_objects, "Certificate waiting on missing inputs");
315            }
316        }
317
318        ready_certificates
319    }
320
321    fn maybe_reserve_capacity(&mut self) {
322        self.missing_inputs.maybe_reserve_capacity();
323        self.input_objects.maybe_reserve_capacity();
324        self.pending_certificates.maybe_reserve_capacity();
325        self.executing_certificates.maybe_reserve_capacity();
326    }
327
328    /// After reaching 1/4 load in hashmaps, decrease capacity to increase load
329    /// to 1/2.
330    fn maybe_shrink_capacity(&mut self) {
331        self.missing_inputs.maybe_shrink_capacity();
332        self.input_objects.maybe_shrink_capacity();
333        self.pending_certificates.maybe_shrink_capacity();
334        self.executing_certificates.maybe_shrink_capacity();
335    }
336}
337
338impl TransactionManager {
339    /// If a node restarts, transaction manager recovers in-memory data from
340    /// pending_certificates, which contains certified transactions from
341    /// consensus output and RPC that are not executed. Transactions from
342    /// other sources, e.g. checkpoint executor, have own persistent storage to
343    /// retry transactions.
344    pub(crate) fn new(
345        object_cache_read: Arc<dyn ObjectCacheRead>,
346        transaction_cache_read: Arc<dyn TransactionCacheRead>,
347        epoch_store: &AuthorityPerEpochStore,
348        tx_ready_certificates: UnboundedSender<PendingCertificate>,
349        metrics: Arc<AuthorityMetrics>,
350    ) -> TransactionManager {
351        let transaction_manager = TransactionManager {
352            object_cache_read,
353            transaction_cache_read,
354            metrics: metrics.clone(),
355            inner: RwLock::new(RwLock::new(Inner::new(epoch_store.epoch(), metrics))),
356            tx_ready_certificates,
357        };
358        transaction_manager.enqueue(epoch_store.all_pending_execution().unwrap(), epoch_store);
359        transaction_manager
360    }
361
362    /// Enqueues certificates / verified transactions into TransactionManager.
363    /// Once all of the input objects are available locally for a
364    /// certificate, the certified transaction will be sent to execution driver.
365    ///
366    /// REQUIRED: Shared object locks must be taken before calling enqueueing
367    /// transactions with shared objects!
368    #[instrument(level = "trace", skip_all)]
369    pub(crate) fn enqueue_certificates(
370        &self,
371        certs: Vec<VerifiedCertificate>,
372        epoch_store: &AuthorityPerEpochStore,
373    ) {
374        let executable_txns = certs
375            .into_iter()
376            .map(VerifiedExecutableTransaction::new_from_certificate)
377            .collect();
378        self.enqueue(executable_txns, epoch_store)
379    }
380
381    #[instrument(level = "trace", skip_all)]
382    pub(crate) fn enqueue(
383        &self,
384        certs: Vec<VerifiedExecutableTransaction>,
385        epoch_store: &AuthorityPerEpochStore,
386    ) {
387        let certs = certs.into_iter().map(|cert| (cert, None)).collect();
388        self.enqueue_impl(certs, epoch_store)
389    }
390
391    #[instrument(level = "trace", skip_all)]
392    pub(crate) fn enqueue_with_expected_effects_digest(
393        &self,
394        certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
395        epoch_store: &AuthorityPerEpochStore,
396    ) {
397        let certs = certs
398            .into_iter()
399            .map(|(cert, fx)| (cert, Some(fx)))
400            .collect();
401        self.enqueue_impl(certs, epoch_store)
402    }
403
    /// Shared implementation behind `enqueue` and
    /// `enqueue_with_expected_effects_digest`.
    ///
    /// The outer (reconfiguration) read lock is held for the whole call. The
    /// work proceeds in phases:
    ///
    /// 1. Drop transactions that are already executed.
    /// 2. Compute each transaction's input keys (including receiving objects)
    ///    and collect them in `object_availability`.
    /// 3. Under the inner write lock, answer as many keys as possible from the
    ///    available-objects cache and switch the cache to unbounded mode.
    /// 4. With the inner lock released, ask `object_cache_read` about the keys
    ///    the cache could not answer.
    /// 5. Re-acquire the inner write lock, merge those answers, re-check the
    ///    cache for keys that became available in the meantime, and leave
    ///    unbounded mode.
    /// 6. For every transaction: skip it if it is from another epoch, already
    ///    pending, already executing or already executed; otherwise register
    ///    its missing inputs and either send it for execution (nothing
    ///    missing) or store it in `pending_certificates`.
    /// 7. Refresh gauges and give the maps a chance to grow.
    ///
    /// Failures of the underlying stores are not returned to the caller; they
    /// are handled through `fatal!`, `expect` and `panic!`.
    fn enqueue_impl(
        &self,
        certs: Vec<(
            VerifiedExecutableTransaction,
            Option<TransactionEffectsDigest>,
        )>,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        let reconfig_lock = self.inner.read();

        // filter out already executed certs
        let certs: Vec<_> = certs
            .into_iter()
            .filter(|(cert, _)| {
                let digest = *cert.digest();
                // skip already executed txes
                if self
                    .transaction_cache_read
                    .is_tx_already_executed(&digest)
                    .unwrap_or_else(|err| {
                        fatal!("Failed to check if tx is already executed: {:?}", err)
                    })
                {
                    self.metrics
                        .transaction_manager_num_enqueued_certificates
                        .with_label_values(&["already_executed"])
                        .inc();
                    false
                } else {
                    true
                }
            })
            .collect();

        // Availability of every input key across the whole batch: `None` means "not checked
        // yet", `Some(_)` is the current best knowledge.
        let mut object_availability: HashMap<InputKey, Option<bool>> = HashMap::new();
        let mut receiving_objects: HashSet<InputKey> = HashSet::new();
        let certs: Vec<_> = certs
            .into_iter()
            .filter_map(|(cert, fx_digest)| {
                let input_object_kinds = cert
                    .data()
                    .intent_message()
                    .value
                    .input_objects()
                    .expect("input_objects() cannot fail");
                let mut input_object_keys =
                    match epoch_store.get_input_object_keys(&cert.key(), &input_object_kinds) {
                        Ok(keys) => keys,
                        Err(e) => {
                            // Because we do not hold the transaction lock during enqueue, it is
                            // possible that the transaction was executed and the shared version
                            // assignments deleted since the earlier check. This is a rare race
                            // condition, and it is better to handle it ad-hoc here than to hold tx
                            // locks for every cert for the duration of this function in order to
                            // remove the race.
                            if self
                                .transaction_cache_read
                                .is_tx_already_executed(cert.digest())
                                .expect("is_tx_already_executed cannot fail")
                            {
                                return None;
                            }
                            fatal!("Failed to get input object keys: {:?}", e);
                        }
                    };

                if input_object_kinds.len() != input_object_keys.len() {
                    error!("Duplicated input objects: {:?}", input_object_kinds);
                }

                // Receiving objects are waited on like any other versioned input, and are
                // additionally passed to the availability check below as a separate set.
                let receiving_object_entries =
                    cert.data().intent_message().value.receiving_objects();
                for entry in receiving_object_entries {
                    let key = InputKey::VersionedObject {
                        id: entry.0,
                        version: entry.1,
                    };
                    receiving_objects.insert(key);
                    input_object_keys.insert(key);
                }

                for key in input_object_keys.iter() {
                    if key.is_cancelled() {
                        // Cancelled txn objects should always be available immediately.
                        // Don't need to wait on these objects for execution.
                        object_availability.insert(*key, Some(true));
                    } else {
                        object_availability.insert(*key, None);
                    }
                }

                Some((cert, fx_digest, input_object_keys))
            })
            .collect();

        {
            // First pass over the cache, under a short-lived inner write lock.
            let mut inner = reconfig_lock.write();
            for (key, value) in object_availability.iter_mut() {
                if value.is_some_and(|available| available) {
                    continue;
                }
                if let Some(available) = inner.available_objects_cache.is_object_available(key) {
                    *value = Some(available);
                }
            }
            // make sure we don't miss any cache entries while the lock is not held.
            inner.available_objects_cache.enable_unbounded_cache();
        }

        // Keys the cache had no definitive answer for.
        let input_object_cache_misses = object_availability
            .iter()
            .filter_map(|(key, value)| if value.is_none() { Some(*key) } else { None })
            .collect::<Vec<_>>();

        // Checking object availability without holding TM lock to reduce contention.
        // But input objects can become available before TM lock is acquired.
        // So missing objects' availability are checked again after acquiring TM lock.
        let cache_miss_availability = self
            .object_cache_read
            .multi_input_objects_available(
                &input_object_cache_misses,
                receiving_objects,
                epoch_store.epoch(),
            )
            .unwrap_or_else(|err| panic!("Checking object existence cannot fail: {:?}", err))
            .into_iter()
            .zip(input_object_cache_misses);

        // After this point, the function cannot return early and must run to the end.
        // Otherwise, it can lead to data inconsistencies and potentially some
        // transactions will never get executed.

        // Internal lock is held only for updating the internal state.
        let mut inner = reconfig_lock.write();

        let _scope = monitored_scope("TransactionManager::enqueue::wlock");

        for (available, key) in cache_miss_availability {
            if available && key.version().is_none() {
                // Mutable objects obtained from cache_miss_availability usually will not be
                // read again, so we do not want to evict other objects in order
                // to insert them into the cache. However, packages will likely
                // be read often, so we do want to insert them even if they
                // cause evictions.
                inner.available_objects_cache.insert(&key);
            }
            object_availability
                .insert(key, Some(available))
                .expect("entry must already exist");
        }

        // Now recheck the cache for anything that became available (via notify_commit)
        // since we read cache_miss_availability - because the cache is in
        // unbounded mode it is guaranteed to contain all notifications that
        // arrived since we released the lock on self.inner.
        for (key, value) in object_availability.iter_mut() {
            if !value.expect("all objects must have been checked by now") {
                if let Some(true) = inner.available_objects_cache.is_object_available(key) {
                    *value = Some(true);
                }
            }
        }

        inner.available_objects_cache.disable_unbounded_cache();

        let mut pending = Vec::new();
        // One timestamp is shared by every certificate enqueued in this call.
        let pending_cert_enqueue_time = Instant::now();

        for (cert, expected_effects_digest, input_object_keys) in certs {
            pending.push(PendingCertificate {
                certificate: cert,
                expected_effects_digest,
                waiting_input_objects: input_object_keys,
                stats: PendingCertificateStats {
                    #[cfg(test)]
                    enqueue_time: pending_cert_enqueue_time,
                    ready_time: None,
                },
            });
        }

        for mut pending_cert in pending {
            // Tx lock is not held here, which makes it possible to send duplicated
            // transactions to the execution driver after crash-recovery, when
            // the same transaction is recovered from recovery log and pending
            // certificates table. The transaction will still only execute once,
            // because tx lock is acquired in execution driver and executed effects
            // table is consulted. So this behavior is benign.
            let digest = *pending_cert.certificate.digest();

            if inner.epoch != pending_cert.certificate.epoch() {
                warn!(
                    "Ignoring enqueued certificate from wrong epoch. Expected={} Certificate={:?}",
                    inner.epoch, pending_cert.certificate
                );
                continue;
            }

            // skip already pending txes
            if inner.pending_certificates.contains_key(&digest) {
                self.metrics
                    .transaction_manager_num_enqueued_certificates
                    .with_label_values(&["already_pending"])
                    .inc();
                continue;
            }
            // skip already executing txes
            if inner.executing_certificates.contains(&digest) {
                self.metrics
                    .transaction_manager_num_enqueued_certificates
                    .with_label_values(&["already_executing"])
                    .inc();
                continue;
            }
            // skip already executed txes
            let is_tx_already_executed = self
                .transaction_cache_read
                .is_tx_already_executed(&digest)
                .expect("Check if tx is already executed should not fail");
            if is_tx_already_executed {
                self.metrics
                    .transaction_manager_num_enqueued_certificates
                    .with_label_values(&["already_executed"])
                    .inc();
                continue;
            }

            // Take the full input set out of the certificate, then put back only the keys
            // that are still unavailable.
            let mut waiting_input_objects = BTreeSet::new();
            std::mem::swap(
                &mut waiting_input_objects,
                &mut pending_cert.waiting_input_objects,
            );
            for key in waiting_input_objects {
                if !object_availability[&key].unwrap() {
                    // The input object is not yet available.
                    pending_cert.waiting_input_objects.insert(key);

                    assert!(
                        inner.missing_inputs.entry(key).or_default().insert(digest),
                        "Duplicated certificate {:?} for missing object {:?}",
                        digest,
                        key
                    );
                    let input_txns = inner.input_objects.entry(key.id()).or_default();
                    input_txns.insert(digest, pending_cert_enqueue_time);
                }
            }

            // Ready transactions can start to execute.
            if pending_cert.waiting_input_objects.is_empty() {
                self.metrics
                    .transaction_manager_num_enqueued_certificates
                    .with_label_values(&["ready"])
                    .inc();
                pending_cert.stats.ready_time = Some(Instant::now());
                // Send to execution driver for execution.
                self.certificate_ready(&mut inner, pending_cert);
                continue;
            }

            assert!(
                inner
                    .pending_certificates
                    .insert(digest, pending_cert)
                    .is_none(),
                "Duplicated pending certificate {:?}",
                digest
            );

            self.metrics
                .transaction_manager_num_enqueued_certificates
                .with_label_values(&["pending"])
                .inc();
        }

        self.metrics
            .transaction_manager_num_missing_objects
            .set(inner.missing_inputs.len() as i64);
        self.metrics
            .transaction_manager_num_pending_certificates
            .set(inner.pending_certificates.len() as i64);

        inner.maybe_reserve_capacity();
    }
688
689    #[cfg(test)]
690    pub(crate) fn objects_available(
691        &self,
692        input_keys: Vec<InputKey>,
693        epoch_store: &AuthorityPerEpochStore,
694    ) {
695        let reconfig_lock = self.inner.read();
696        let mut inner = reconfig_lock.write();
697        let _scope = monitored_scope("TransactionManager::objects_available::wlock");
698        self.objects_available_locked(&mut inner, epoch_store, input_keys, true, Instant::now());
699        inner.maybe_shrink_capacity();
700    }
701
702    #[instrument(level = "trace", skip_all)]
703    fn objects_available_locked(
704        &self,
705        inner: &mut Inner,
706        epoch_store: &AuthorityPerEpochStore,
707        input_keys: Vec<InputKey>,
708        update_cache: bool,
709        available_time: Instant,
710    ) {
711        if inner.epoch != epoch_store.epoch() {
712            warn!(
713                "Ignoring objects committed from wrong epoch. Expected={} Actual={} \
714                 Objects={:?}",
715                inner.epoch,
716                epoch_store.epoch(),
717                input_keys,
718            );
719            return;
720        }
721
722        for input_key in input_keys {
723            trace!(?input_key, "object available");
724            for mut ready_cert in
725                inner.find_ready_transactions(input_key, update_cache, &self.metrics)
726            {
727                ready_cert.stats.ready_time = Some(available_time);
728                self.certificate_ready(inner, ready_cert);
729            }
730        }
731
732        self.metrics
733            .transaction_manager_num_missing_objects
734            .set(inner.missing_inputs.len() as i64);
735        self.metrics
736            .transaction_manager_num_pending_certificates
737            .set(inner.pending_certificates.len() as i64);
738        self.metrics
739            .transaction_manager_num_executing_certificates
740            .set(inner.executing_certificates.len() as i64);
741    }
742
743    /// Notifies TransactionManager about a transaction that has been committed.
744    #[instrument(level = "trace", skip_all)]
745    pub(crate) fn notify_commit(
746        &self,
747        digest: &TransactionDigest,
748        output_object_keys: Vec<InputKey>,
749        epoch_store: &AuthorityPerEpochStore,
750    ) {
751        let reconfig_lock = self.inner.read();
752        {
753            let commit_time = Instant::now();
754            let mut inner = reconfig_lock.write();
755            let _scope = monitored_scope("TransactionManager::notify_commit::wlock");
756
757            if inner.epoch != epoch_store.epoch() {
758                warn!(
759                    "Ignoring committed certificate from wrong epoch. Expected={} Actual={} CertificateDigest={:?}",
760                    inner.epoch,
761                    epoch_store.epoch(),
762                    digest
763                );
764                return;
765            }
766
767            self.objects_available_locked(
768                &mut inner,
769                epoch_store,
770                output_object_keys,
771                true,
772                commit_time,
773            );
774
775            if !inner.executing_certificates.remove(digest) {
776                trace!(
777                    "{:?} not found in executing certificates, likely because it is a system transaction",
778                    digest
779                );
780                return;
781            }
782
783            self.metrics
784                .transaction_manager_num_executing_certificates
785                .set(inner.executing_certificates.len() as i64);
786
787            inner.maybe_shrink_capacity();
788        }
789    }
790
791    /// Sends the ready certificate for execution.
792    fn certificate_ready(&self, inner: &mut Inner, pending_certificate: PendingCertificate) {
793        trace!(tx_digest = ?pending_certificate.certificate.digest(), "certificate ready");
794        assert_eq!(pending_certificate.waiting_input_objects.len(), 0);
795        // Record as an executing certificate.
796        assert!(
797            inner
798                .executing_certificates
799                .insert(*pending_certificate.certificate.digest())
800        );
801        self.metrics.txn_ready_rate_tracker.lock().record();
802        let _ = self.tx_ready_certificates.send(pending_certificate);
803        self.metrics.transaction_manager_num_ready.inc();
804        self.metrics.execution_driver_dispatch_queue.inc();
805    }
806
807    /// Gets the missing input object keys for the given transaction.
808    pub(crate) fn get_missing_input(&self, digest: &TransactionDigest) -> Option<Vec<InputKey>> {
809        let reconfig_lock = self.inner.read();
810        let inner = reconfig_lock.read();
811        inner
812            .pending_certificates
813            .get(digest)
814            .map(|cert| cert.waiting_input_objects.clone().into_iter().collect())
815    }
816
817    // Returns the number of transactions waiting on each object ID, as well as the
818    // age of the oldest transaction in the queue.
819    pub(crate) fn objects_queue_len_and_age(
820        &self,
821        keys: Vec<ObjectID>,
822    ) -> Vec<(ObjectID, usize, Option<Duration>)> {
823        let reconfig_lock = self.inner.read();
824        let inner = reconfig_lock.read();
825        keys.into_iter()
826            .map(|key| {
827                let default_map = TransactionQueue::default();
828                let txns = inner.input_objects.get(&key).unwrap_or(&default_map);
829                (
830                    key,
831                    txns.len(),
832                    txns.first().map(|(time, _)| time.elapsed()),
833                )
834            })
835            .collect()
836    }
837
838    // Returns the number of transactions pending or being executed right now.
839    pub(crate) fn inflight_queue_len(&self) -> usize {
840        let reconfig_lock = self.inner.read();
841        let inner = reconfig_lock.read();
842        inner.pending_certificates.len() + inner.executing_certificates.len()
843    }
844
845    // Reconfigures the TransactionManager for a new epoch. Existing transactions
846    // will be dropped because they are no longer relevant and may be incorrect
847    // in the new epoch.
848    pub(crate) fn reconfigure(&self, new_epoch: EpochId) {
849        let reconfig_lock = self.inner.write();
850        let mut inner = reconfig_lock.write();
851        *inner = Inner::new(new_epoch, self.metrics.clone());
852    }
853
854    pub(crate) fn check_execution_overload(
855        &self,
856        overload_config: &AuthorityOverloadConfig,
857        tx_data: &SenderSignedData,
858    ) -> IotaResult {
859        // Too many transactions are pending execution.
860        let inflight_queue_len = self.inflight_queue_len();
861        fp_ensure!(
862            inflight_queue_len < overload_config.max_transaction_manager_queue_length,
863            IotaError::TooManyTransactionsPendingExecution {
864                queue_len: inflight_queue_len,
865                threshold: overload_config.max_transaction_manager_queue_length,
866            }
867        );
868        tx_data.digest();
869
870        for (object_id, queue_len, txn_age) in self.objects_queue_len_and_age(
871            tx_data
872                .transaction_data()
873                .shared_input_objects()
874                .into_iter()
875                .filter_map(|r| r.mutable.then_some(r.id))
876                .collect(),
877        ) {
878            // When this occurs, most likely transactions piled up on a shared object.
879            if queue_len >= overload_config.max_transaction_manager_per_object_queue_length {
880                info!(
881                    "Overload detected on object {:?} with {} pending transactions",
882                    object_id, queue_len
883                );
884                fp_bail!(IotaError::TooManyTransactionsPendingOnObject {
885                    object_id,
886                    queue_len,
887                    threshold: overload_config.max_transaction_manager_per_object_queue_length,
888                });
889            }
890            if let Some(age) = txn_age {
891                // Check that we don't have a txn that has been waiting for a long time in the
892                // queue.
893                if age >= overload_config.max_txn_age_in_queue {
894                    info!(
895                        "Overload detected on object {:?} with oldest transaction pending for {}ms",
896                        object_id,
897                        age.as_millis()
898                    );
899                    fp_bail!(IotaError::TooOldTransactionPendingOnObject {
900                        object_id,
901                        txn_age_sec: age.as_secs(),
902                        threshold: overload_config.max_txn_age_in_queue.as_secs(),
903                    });
904                }
905            }
906        }
907        Ok(())
908    }
909
910    // Verify TM has no pending item for tests.
911    #[cfg(test)]
912    fn check_empty_for_testing(&self) {
913        let reconfig_lock = self.inner.read();
914        let inner = reconfig_lock.read();
915        assert!(
916            inner.missing_inputs.is_empty(),
917            "Missing inputs: {:?}",
918            inner.missing_inputs
919        );
920        assert!(
921            inner.input_objects.is_empty(),
922            "Input objects: {:?}",
923            inner.input_objects
924        );
925        assert!(
926            inner.pending_certificates.is_empty(),
927            "Pending certificates: {:?}",
928            inner.pending_certificates
929        );
930        assert!(
931            inner.executing_certificates.is_empty(),
932            "Executing certificates: {:?}",
933            inner.executing_certificates
934        );
935    }
936}
937
938trait ResizableHashMap<K, V> {
939    fn maybe_reserve_capacity(&mut self);
940    fn maybe_shrink_capacity(&mut self);
941}
942
943impl<K, V> ResizableHashMap<K, V> for HashMap<K, V>
944where
945    K: std::cmp::Eq + std::hash::Hash,
946{
947    /// After reaching 3/4 load in hashmaps, increase capacity to decrease load
948    /// to 1/2.
949    fn maybe_reserve_capacity(&mut self) {
950        if self.len() > self.capacity() * 3 / 4 {
951            self.reserve(self.capacity() / 2);
952        }
953    }
954
955    /// After reaching 1/4 load in hashmaps, decrease capacity to increase load
956    /// to 1/2.
957    fn maybe_shrink_capacity(&mut self) {
958        if self.len() > MIN_HASHMAP_CAPACITY && self.len() < self.capacity() / 4 {
959            self.shrink_to(max(self.capacity() / 2, MIN_HASHMAP_CAPACITY))
960        }
961    }
962}
963
964trait ResizableHashSet<K> {
965    fn maybe_reserve_capacity(&mut self);
966    fn maybe_shrink_capacity(&mut self);
967}
968
969impl<K> ResizableHashSet<K> for HashSet<K>
970where
971    K: std::cmp::Eq + std::hash::Hash,
972{
973    /// After reaching 3/4 load in hashset, increase capacity to decrease load
974    /// to 1/2.
975    fn maybe_reserve_capacity(&mut self) {
976        if self.len() > self.capacity() * 3 / 4 {
977            self.reserve(self.capacity() / 2);
978        }
979    }
980
981    /// After reaching 1/4 load in hashset, decrease capacity to increase load
982    /// to 1/2.
983    fn maybe_shrink_capacity(&mut self) {
984        if self.len() > MIN_HASHMAP_CAPACITY && self.len() < self.capacity() / 4 {
985            self.shrink_to(max(self.capacity() / 2, MIN_HASHMAP_CAPACITY))
986        }
987    }
988}
989
990#[derive(Default, Debug)]
991struct TransactionQueue {
992    digests: HashMap<TransactionDigest, Instant>,
993    ages: BinaryHeap<(Reverse<Instant>, TransactionDigest)>,
994}
995
996impl TransactionQueue {
997    fn len(&self) -> usize {
998        self.digests.len()
999    }
1000
1001    fn is_empty(&self) -> bool {
1002        self.digests.is_empty()
1003    }
1004
1005    /// Insert the digest into the queue with the given time. If the digest is
1006    /// already in the queue, this is a no-op.
1007    fn insert(&mut self, digest: TransactionDigest, time: Instant) {
1008        if let hash_map::Entry::Vacant(entry) = self.digests.entry(digest) {
1009            entry.insert(time);
1010            self.ages.push((Reverse(time), digest));
1011        }
1012    }
1013
1014    /// Remove the digest from the queue. Returns the time the digest was
1015    /// inserted into the queue, if it was present.
1016    ///
1017    /// After removing the digest, first() will return the new oldest entry
1018    /// in the queue (which may be unchanged).
1019    fn remove(&mut self, digest: &TransactionDigest) -> Option<Instant> {
1020        let when = self.digests.remove(digest)?;
1021
1022        // This loop removes all previously inserted entries that no longer
1023        // correspond to live entries in self.digests. When the loop terminates,
1024        // the top of the heap will be the oldest live entry.
1025        // Amortized complexity of `remove` is O(lg(n)).
1026        while !self.ages.is_empty() {
1027            let first = self.ages.peek().expect("heap cannot be empty");
1028
1029            // We compare the exact time of the entry, because there may be an
1030            // entry in the heap that was previously inserted and removed from
1031            // digests, and we want to ignore it. (see
1032            // test_transaction_queue_remove_in_order)
1033            if self.digests.get(&first.1) == Some(&first.0.0) {
1034                break;
1035            }
1036
1037            self.ages.pop();
1038        }
1039
1040        Some(when)
1041    }
1042
1043    /// Return the oldest entry in the queue.
1044    fn first(&self) -> Option<(Instant, TransactionDigest)> {
1045        self.ages.peek().map(|(time, digest)| (time.0, *digest))
1046    }
1047}
1048
#[cfg(test)]
mod test {
    use prometheus::Registry;
    use rand::{Rng, RngCore};

    use super::*;

    /// Unversioned (package) key whose object ID is `byte` repeated 32 times.
    fn package_key(byte: u8) -> InputKey {
        InputKey::Package {
            id: ObjectID::new([byte; 32]),
        }
    }

    /// Versioned key whose object ID is `byte` repeated 32 times.
    fn versioned_key(byte: u8, version: u64) -> InputKey {
        InputKey::VersionedObject {
            id: ObjectID::new([byte; 32]),
            version: version.into(),
        }
    }

    /// Digest made of `byte` repeated 32 times.
    fn digest_of(byte: u8) -> TransactionDigest {
        TransactionDigest::new([byte; 32])
    }

    /// Builds a queue holding digest 1 at `time1` and digest 2 one second
    /// later, returning the queue together with both (time, digest) pairs.
    fn two_queued() -> (
        TransactionQueue,
        (Instant, TransactionDigest),
        (Instant, TransactionDigest),
    ) {
        let time1 = Instant::now();
        let digest1 = digest_of(1);
        let time2 = time1 + Duration::from_secs(1);
        let digest2 = digest_of(2);

        let mut queue = TransactionQueue::default();
        queue.insert(digest1, time1);
        queue.insert(digest2, time2);
        (queue, (time1, digest1), (time2, digest2))
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    fn test_available_objects_cache() {
        let metrics = Arc::new(AuthorityMetrics::new(&Registry::default()));
        let mut cache = AvailableObjectsCache::new_with_size(metrics, 5);

        // insert 10 unique unversioned objects
        for byte in 0..10u8 {
            let key = package_key(byte);
            assert_eq!(cache.is_object_available(&key), None);
            cache.insert(&key);
            assert_eq!(cache.is_object_available(&key), Some(true));
        }

        // first 5 have been evicted
        for byte in 0..5u8 {
            assert_eq!(cache.is_object_available(&package_key(byte)), None);
        }

        // insert 10 unique versioned objects
        for byte in 0..10u8 {
            let key = versioned_key(byte, u64::from(byte));
            assert_eq!(cache.is_object_available(&key), None);
            cache.insert(&key);
            assert_eq!(cache.is_object_available(&key), Some(true));
        }

        // first 5 versioned objects have been evicted
        for byte in 0..5u8 {
            let key = versioned_key(byte, u64::from(byte));
            assert_eq!(cache.is_object_available(&key), None);
        }

        // but versioned objects do not cause evictions of unversioned objects
        for byte in 5..10u8 {
            assert_eq!(cache.is_object_available(&package_key(byte)), Some(true));
        }

        // object 9 is available at version 9
        assert_eq!(
            cache.is_object_available(&versioned_key(9, 9)),
            Some(true)
        );
        // but not at version 10
        assert_eq!(
            cache.is_object_available(&versioned_key(9, 10)),
            Some(false)
        );
        // it is available at version 8 (this case can be used by readonly shared
        // objects)
        assert_eq!(
            cache.is_object_available(&versioned_key(9, 8)),
            Some(true)
        );
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    fn test_transaction_queue() {
        let mut queue = TransactionQueue::default();

        // insert and remove an item
        let time = Instant::now();
        let digest = digest_of(1);
        queue.insert(digest, time);
        assert_eq!(queue.first(), Some((time, digest)));
        queue.remove(&digest);
        assert_eq!(queue.first(), None);

        // remove a non-existent item
        assert_eq!(queue.remove(&digest), None);
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    fn test_transaction_queue_remove_in_order() {
        // insert two items, remove them in insertion order
        let (mut queue, (time1, digest1), (time2, digest2)) = two_queued();

        assert_eq!(queue.first(), Some((time1, digest1)));
        assert_eq!(queue.remove(&digest1), Some(time1));
        assert_eq!(queue.first(), Some((time2, digest2)));
        assert_eq!(queue.remove(&digest2), Some(time2));
        assert_eq!(queue.first(), None);
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    fn test_transaction_queue_remove_in_reverse_order() {
        // insert two items, remove them in reverse order
        let (mut queue, (time1, digest1), (time2, digest2)) = two_queued();

        assert_eq!(queue.first(), Some((time1, digest1)));
        assert_eq!(queue.remove(&digest2), Some(time2));

        // after removing digest2, digest1 is still the first item
        assert_eq!(queue.first(), Some((time1, digest1)));
        assert_eq!(queue.remove(&digest1), Some(time1));

        assert_eq!(queue.first(), None);
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    fn test_transaction_queue_reinsert() {
        // insert two items
        let (mut queue, (time1, digest1), (time2, digest2)) = two_queued();

        // remove the second item
        queue.remove(&digest2);
        assert_eq!(queue.first(), Some((time1, digest1)));

        // insert the second item again
        let time3 = time2 + Duration::from_secs(1);
        queue.insert(digest2, time3);

        // remove the first item
        queue.remove(&digest1);

        // time3 should be in first()
        assert_eq!(queue.first(), Some((time3, digest2)));
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    fn test_transaction_queue_double_insert() {
        let (mut queue, (time1, digest1), (time2, digest2)) = two_queued();
        let time3 = time2 + Duration::from_secs(1);
        queue.insert(digest2, time3);

        // re-insertion of digest2 should not change its time
        assert_eq!(queue.first(), Some((time1, digest1)));
        queue.remove(&digest1);
        assert_eq!(queue.first(), Some((time2, digest2)));
    }

    #[test]
    #[cfg_attr(msim, ignore)]
    fn transaction_queue_random_test() {
        let mut rng = rand::thread_rng();
        let digests: Vec<TransactionDigest> = (0..100)
            .map(|_| {
                let mut bytes = [0; 32];
                rng.fill_bytes(&mut bytes);
                TransactionDigest::new(bytes)
            })
            .collect();

        // `verifier` is the reference model the queue is checked against.
        let mut verifier = HashMap::new();
        let mut queue = TransactionQueue::default();

        let mut now = Instant::now();

        // first insert some random digests so that the queue starts
        // out well-populated
        for _ in 0..70 {
            now += Duration::from_secs(1);
            let digest = digests[rng.gen_range(0..digests.len())];
            queue.insert(digest, now);
            verifier.entry(digest).or_insert(now);
        }

        // Do random operations on both the queue and the verifier, and
        // verify that the two structures always agree
        for _ in 0..100000 {
            // advance time
            now += Duration::from_secs(1);

            // pick a random digest
            let digest = digests[rng.gen_range(0..digests.len())];

            // either insert or remove it
            let should_insert = rng.gen_bool(0.5);
            if should_insert {
                queue.insert(digest, now);
                verifier.entry(digest).or_insert(now);
            } else {
                let expected = verifier.remove(&digest);
                assert_eq!(queue.remove(&digest), expected);
            }

            let expected_oldest = verifier
                .iter()
                .min_by_key(|(_, time)| **time)
                .map(|(digest, time)| (*time, *digest));
            assert_eq!(queue.first(), expected_oldest);
        }
    }
}