iota_core/
execution_driver.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    sync::{Arc, Weak},
7    time::Duration,
8};
9
10use iota_macros::fail_point_async;
11use iota_metrics::{monitored_scope, spawn_monitored_task};
12use rand::{
13    Rng, SeedableRng,
14    rngs::{OsRng, StdRng},
15};
16use tokio::{
17    sync::{Semaphore, mpsc::UnboundedReceiver, oneshot},
18    time::sleep,
19};
20use tracing::{Instrument, error, error_span, info, trace};
21
22use crate::{authority::AuthorityState, transaction_manager::PendingCertificate};
23
24#[cfg(test)]
25#[path = "unit_tests/execution_driver_tests.rs"]
26mod execution_driver_tests;
27
/// Maximum number of times the driver tries to execute one certificate.
/// Execution is not expected to fail permanently, so every failure is
/// retried; the driver panics when the attempt with this number fails too.
pub const EXECUTION_MAX_ATTEMPTS: u32 = 10;
/// Pause between a failed execution attempt and the following retry.
const EXECUTION_FAILURE_RETRY_INTERVAL: Duration = Duration::from_millis(1_000);
/// Probability with which a dispatched certificate has its queueing delay
/// reported to the metrics.
const QUEUEING_DELAY_SAMPLING_RATIO: f64 = 0.05;
33
34/// When a notification that a new pending transaction is received we activate
35/// processing the transaction in a loop.
36pub async fn execution_process(
37    authority_state: Weak<AuthorityState>,
38    mut rx_ready_certificates: UnboundedReceiver<PendingCertificate>,
39    mut rx_execution_shutdown: oneshot::Receiver<()>,
40) {
41    info!("Starting pending certificates execution process.");
42
43    // Rate limit concurrent executions to # of cpus.
44    let limit = Arc::new(Semaphore::new(num_cpus::get()));
45    let mut rng = StdRng::from_rng(&mut OsRng).unwrap();
46
47    // Loop whenever there is a signal that a new transactions is ready to process.
48    loop {
49        let _scope = monitored_scope("ExecutionDriver::loop");
50
51        let certificate;
52        let expected_effects_digest;
53        let txn_ready_time;
54        tokio::select! {
55            result = rx_ready_certificates.recv() => {
56                if let Some(pending_cert) = result {
57                    certificate = pending_cert.certificate;
58                    expected_effects_digest = pending_cert.expected_effects_digest;
59                    txn_ready_time = pending_cert.stats.ready_time.unwrap();
60                } else {
61                    // Should only happen after the AuthorityState has shut down and tx_ready_certificate
62                    // has been dropped by TransactionManager.
63                    info!("No more certificate will be received. Exiting executor ...");
64                    return;
65                };
66            }
67            _ = &mut rx_execution_shutdown => {
68                info!("Shutdown signal received. Exiting executor ...");
69                return;
70            }
71        };
72
73        let authority = if let Some(authority) = authority_state.upgrade() {
74            authority
75        } else {
76            // Terminate the execution if authority has already shutdown, even if there can
77            // be more items in rx_ready_certificates.
78            info!("Authority state has shutdown. Exiting ...");
79            return;
80        };
81        authority.metrics.execution_driver_dispatch_queue.dec();
82
83        // TODO: Ideally execution_driver should own a copy of epoch store and recreate
84        // each epoch.
85        let epoch_store = authority.load_epoch_store_one_call_per_task();
86
87        let digest = *certificate.digest();
88        trace!(?digest, "Pending certificate execution activated.");
89
90        if epoch_store.epoch() != certificate.epoch() {
91            info!(
92                ?digest,
93                cur_epoch = epoch_store.epoch(),
94                cert_epoch = certificate.epoch(),
95                "Ignoring certificate from previous epoch."
96            );
97            continue;
98        }
99
100        let limit = limit.clone();
101        // hold semaphore permit until task completes. unwrap ok because we never close
102        // the semaphore in this context.
103        let permit = limit.acquire_owned().await.unwrap();
104
105        if rng.gen_range(0.0..1.0) < QUEUEING_DELAY_SAMPLING_RATIO {
106            authority
107                .metrics
108                .execution_queueing_latency
109                .report(txn_ready_time.elapsed());
110            if let Some(latency) = authority.metrics.execution_queueing_latency.latency() {
111                authority
112                    .metrics
113                    .execution_queueing_delay_s
114                    .observe(latency.as_secs_f64());
115            }
116        }
117
118        authority.metrics.execution_rate_tracker.lock().record();
119
120        // Certificate execution can take significant time, so run it in a separate
121        // task.
122        let epoch_store_clone = epoch_store.clone();
123        spawn_monitored_task!(epoch_store.within_alive_epoch(async move {
124            let _scope = monitored_scope("ExecutionDriver::task");
125            let _guard = permit;
126            if let Ok(true) = authority.is_tx_already_executed(&digest) {
127                return;
128            }
129            let mut attempts = 0;
130            loop {
131                fail_point_async!("transaction_execution_delay");
132                attempts += 1;
133                let res = authority
134                    .try_execute_immediately(&certificate, expected_effects_digest, &epoch_store_clone)
135                    .await;
136                if let Err(e) = res {
137                    if attempts == EXECUTION_MAX_ATTEMPTS {
138                        panic!("Failed to execute certified transaction {digest:?} after {attempts} attempts! error={e} certificate={certificate:?}");
139                    }
140                    // Assume only transient failure can happen. Permanent failure is probably
141                    // a bug. There is nothing that can be done to recover from permanent failures.
142                    error!(tx_digest=?digest, "Failed to execute certified transaction {digest:?}! attempt {attempts}, {e}");
143                    sleep(EXECUTION_FAILURE_RETRY_INTERVAL).await;
144                } else {
145                    break;
146                }
147            }
148            authority
149                .metrics
150                .execution_driver_executed_transactions
151                .inc();
152        }.instrument(error_span!("execution_driver", tx_digest = ?digest))));
153    }
154}