iota_core/
execution_driver.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::{Arc, Weak};
6
7use iota_common::fatal;
8use iota_macros::fail_point_async;
9use iota_metrics::{monitored_scope, spawn_monitored_task};
10use iota_types::error::IotaError;
11use rand::{
12    Rng, SeedableRng,
13    rngs::{OsRng, StdRng},
14};
15use tokio::sync::{Semaphore, mpsc::UnboundedReceiver, oneshot};
16use tracing::{Instrument, error_span, info, trace, warn};
17
18use crate::{authority::AuthorityState, transaction_manager::PendingCertificate};
19
20#[cfg(test)]
21#[path = "unit_tests/execution_driver_tests.rs"]
22mod execution_driver_tests;
23
/// Fraction of dispatched certificates (compared against a uniform draw from
/// `[0, 1)`) for which the queueing-latency metrics are reported.
const QUEUEING_DELAY_SAMPLING_RATIO: f64 = 0.05;
25
26/// When a notification that a new pending transaction is received we activate
27/// processing the transaction in a loop.
28pub async fn execution_process(
29    authority_state: Weak<AuthorityState>,
30    mut rx_ready_certificates: UnboundedReceiver<PendingCertificate>,
31    mut rx_execution_shutdown: oneshot::Receiver<()>,
32) {
33    info!("Starting pending certificates execution process.");
34
35    // Rate limit concurrent executions to # of cpus.
36    let limit = Arc::new(Semaphore::new(num_cpus::get()));
37    let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
38
39    // Loop whenever there is a signal that a new transactions is ready to process.
40    loop {
41        let _scope = monitored_scope("ExecutionDriver::loop");
42
43        let certificate;
44        let expected_effects_digest;
45        let txn_ready_time;
46        tokio::select! {
47            result = rx_ready_certificates.recv() => {
48                if let Some(pending_cert) = result {
49                    certificate = pending_cert.certificate;
50                    expected_effects_digest = pending_cert.expected_effects_digest;
51                    txn_ready_time = pending_cert.stats.ready_time.unwrap();
52                } else {
53                    // Should only happen after the AuthorityState has shut down and tx_ready_certificate
54                    // has been dropped by TransactionManager.
55                    info!("No more certificate will be received. Exiting executor ...");
56                    return;
57                };
58            }
59            _ = &mut rx_execution_shutdown => {
60                info!("Shutdown signal received. Exiting executor ...");
61                return;
62            }
63        };
64
65        let authority = if let Some(authority) = authority_state.upgrade() {
66            authority
67        } else {
68            // Terminate the execution if authority has already shutdown, even if there can
69            // be more items in rx_ready_certificates.
70            info!("Authority state has shutdown. Exiting ...");
71            return;
72        };
73        authority.metrics.execution_driver_dispatch_queue.dec();
74
75        // TODO: Ideally execution_driver should own a copy of epoch store and recreate
76        // each epoch.
77        let epoch_store = authority.load_epoch_store_one_call_per_task();
78
79        let digest = *certificate.digest();
80        trace!(?digest, "Pending certificate execution activated.");
81
82        if epoch_store.epoch() != certificate.epoch() {
83            info!(
84                ?digest,
85                cur_epoch = epoch_store.epoch(),
86                cert_epoch = certificate.epoch(),
87                "Ignoring certificate from previous epoch."
88            );
89            continue;
90        }
91
92        let limit = limit.clone();
93        // hold semaphore permit until task completes. unwrap ok because we never close
94        // the semaphore in this context.
95        let permit = limit.acquire_owned().await.unwrap();
96
97        if rng.gen_range(0.0..1.0) < QUEUEING_DELAY_SAMPLING_RATIO {
98            authority
99                .metrics
100                .execution_queueing_latency
101                .report(txn_ready_time.elapsed());
102            if let Some(latency) = authority.metrics.execution_queueing_latency.latency() {
103                authority
104                    .metrics
105                    .execution_queueing_delay_s
106                    .observe(latency.as_secs_f64());
107            }
108        }
109
110        authority.metrics.execution_rate_tracker.lock().record();
111
112        // Certificate execution can take significant time, so run it in a separate
113        // task.
114        let epoch_store_clone = epoch_store.clone();
115        spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
116            let _scope = monitored_scope("ExecutionDriver::task");
117            let _guard = permit;
118            if let Ok(true) = authority.try_is_tx_already_executed(&digest) {
119                return;
120            }
121
122            fail_point_async!("transaction_execution_delay");
123
124            match authority.try_execute_immediately(
125                &certificate,
126                expected_effects_digest,
127                &epoch_store_clone,
128            ) {
129                Err(IotaError::ValidatorHaltedAtEpochEnd) => {
130                    warn!("Could not execute transaction {digest:?} because validator is halted at epoch end. certificate={certificate:?}");
131                    return;
132                }
133                Err(e) => {
134                    fatal!("Failed to execute certified transaction {digest:?}! error={e} certificate={certificate:?}");
135                }
136                _ => (),
137            }
138            authority
139                .metrics
140                .execution_driver_executed_transactions
141                .inc();
142        }.instrument(error_span!("execution_driver", tx_digest = ?digest))));
143    }
144}