Skip to main content

iota_core/
execution_driver.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::{Arc, Weak};
6
7use iota_common::{fatal, random::get_rng};
8use iota_macros::fail_point_async;
9use iota_metrics::{monitored_scope, spawn_monitored_task};
10use iota_types::error::IotaError;
11use rand::Rng;
12use tokio::sync::{Semaphore, mpsc::UnboundedReceiver, oneshot};
13use tracing::{Instrument, error_span, info, instrument, warn};
14
15use crate::{authority::AuthorityState, transaction_manager::PendingCertificate};
16
17#[cfg(test)]
18#[path = "unit_tests/execution_driver_tests.rs"]
19mod execution_driver_tests;
20
/// Probability with which a dispatched certificate's queueing latency is
/// reported to the metrics.
///
/// `execution_process` compares a uniform draw from `0.0..1.0` against this
/// value once per dispatched certificate, so on average roughly one
/// certificate in twenty is sampled.
const QUEUEING_DELAY_SAMPLING_RATIO: f64 = 1.0 / 20.0;
22
23/// When a notification that a new pending transaction is received we activate
24/// processing the transaction in a loop.
25#[instrument("start_execute_pending_certs", level = "trace", skip_all)]
26pub async fn execution_process(
27    authority_state: Weak<AuthorityState>,
28    mut rx_ready_certificates: UnboundedReceiver<PendingCertificate>,
29    mut rx_execution_shutdown: oneshot::Receiver<()>,
30) {
31    info!("Starting pending certificates execution process.");
32
33    // Rate limit concurrent executions to # of cpus.
34    let limit = Arc::new(Semaphore::new(num_cpus::get()));
35
36    // Loop whenever there is a signal that a new transactions is ready to process.
37    loop {
38        let _scope = monitored_scope("ExecutionDriver::loop");
39
40        let certificate;
41        let expected_effects_digest;
42        let txn_ready_time;
43        tokio::select! {
44            result = rx_ready_certificates.recv() => {
45                if let Some(pending_cert) = result {
46                    certificate = pending_cert.certificate;
47                    expected_effects_digest = pending_cert.expected_effects_digest;
48                    txn_ready_time = pending_cert.stats.ready_time.unwrap();
49                } else {
50                    // Should only happen after the AuthorityState has shut down and tx_ready_certificate
51                    // has been dropped by TransactionManager.
52                    info!("No more certificate will be received. Exiting executor ...");
53                    return;
54                };
55            }
56            _ = &mut rx_execution_shutdown => {
57                info!("Shutdown signal received. Exiting executor ...");
58                return;
59            }
60        };
61
62        let authority = if let Some(authority) = authority_state.upgrade() {
63            authority
64        } else {
65            // Terminate the execution if authority has already shutdown, even if there can
66            // be more items in rx_ready_certificates.
67            info!("Authority state has shutdown. Exiting ...");
68            return;
69        };
70        authority.metrics.execution_driver_dispatch_queue.dec();
71
72        // TODO: Ideally execution_driver should own a copy of epoch store and recreate
73        // each epoch.
74        let epoch_store = authority.load_epoch_store_one_call_per_task();
75
76        let digest = *certificate.digest();
77
78        if epoch_store.epoch() != certificate.epoch() {
79            info!(
80                ?digest,
81                cur_epoch = epoch_store.epoch(),
82                cert_epoch = certificate.epoch(),
83                "Ignoring certificate from previous epoch."
84            );
85            continue;
86        }
87
88        let limit = limit.clone();
89        // hold semaphore permit until task completes. unwrap ok because we never close
90        // the semaphore in this context.
91        let permit = limit.acquire_owned().await.unwrap();
92
93        if get_rng().gen_range(0.0..1.0) < QUEUEING_DELAY_SAMPLING_RATIO {
94            authority
95                .metrics
96                .execution_queueing_latency
97                .report(txn_ready_time.elapsed());
98            if let Some(latency) = authority.metrics.execution_queueing_latency.latency() {
99                authority
100                    .metrics
101                    .execution_queueing_delay_s
102                    .observe(latency.as_secs_f64());
103            }
104        }
105
106        authority.metrics.execution_rate_tracker.lock().record();
107
108        // Certificate execution can take significant time, so run it in a separate
109        // task.
110        let epoch_store_clone = epoch_store.clone();
111        spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
112            let _scope = monitored_scope("ExecutionDriver::task");
113            let _guard = permit;
114            if let Ok(true) = authority.try_is_tx_already_executed(&digest) {
115                return;
116            }
117
118            fail_point_async!("transaction_execution_delay");
119
120            match authority.try_execute_immediately(
121                &certificate,
122                expected_effects_digest,
123                &epoch_store_clone,
124            ) {
125                Err(IotaError::ValidatorHaltedAtEpochEnd) => {
126                    warn!("Could not execute transaction {digest} because validator is halted at epoch end. certificate={certificate:?}");
127                    return;
128                }
129                Err(e) => {
130                    fatal!("Failed to execute certified transaction {digest}! error={e} certificate={certificate:?}");
131                }
132                _ => (),
133            }
134            authority
135                .metrics
136                .execution_driver_executed_transactions
137                .inc();
138        }.instrument(error_span!("executing_pending_cert", tx_digest = ?digest))));
139    }
140}