consensus_core/
broadcaster.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    cmp::{max, min},
7    sync::Arc,
8    time::Duration,
9};
10
11use consensus_config::AuthorityIndex;
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use tokio::{
14    sync::broadcast,
15    task::JoinSet,
16    time::{Instant, error::Elapsed, sleep_until, timeout},
17};
18use tracing::{trace, warn};
19
20use crate::{
21    block::{BlockAPI as _, ExtendedBlock, VerifiedBlock},
22    context::Context,
23    core::CoreSignalsReceivers,
24    error::ConsensusResult,
25    network::NetworkClient,
26};
27
/// Maximum number of block sends that may be in flight to a single peer.
///
/// Each per-peer sender task stops pulling new blocks from the broadcast
/// channel while it already has this many pending send requests.
const BROADCAST_CONCURRENCY: usize = 10;
30
/// Broadcaster sends newly created blocks to each peer over the network.
///
/// One background task is spawned per remote authority. Each task reads
/// blocks from its own broadcast channel receiver and retries a block until
/// the send succeeds. For a peer that lags behind or is disconnected, blocks
/// wait in the channel; once the receiver falls too far behind, the channel
/// reports `Lagged` and the blocks it skipped are not sent by that task.
pub(crate) struct Broadcaster {
    // Background tasks listening for new blocks and pushing them to peers.
    // `stop()` aborts them without waiting for them to finish.
    senders: JoinSet<()>,
}
40
41impl Broadcaster {
42    const LAST_BLOCK_RETRY_INTERVAL: Duration = Duration::from_secs(2);
43    const MIN_SEND_BLOCK_NETWORK_TIMEOUT: Duration = Duration::from_secs(5);
44
45    pub(crate) fn new<C: NetworkClient>(
46        context: Arc<Context>,
47        network_client: Arc<C>,
48        signals_receiver: &CoreSignalsReceivers,
49    ) -> Self {
50        let mut senders = JoinSet::new();
51        for (index, _authority) in context.committee.authorities() {
52            // Skip sending Block to self.
53            if index == context.own_index {
54                continue;
55            }
56            senders.spawn(Self::push_blocks(
57                context.clone(),
58                network_client.clone(),
59                signals_receiver.block_broadcast_receiver(),
60                index,
61            ));
62        }
63
64        Self { senders }
65    }
66
67    pub(crate) fn stop(&mut self) {
68        // Intentionally not waiting for senders to exit, to speed up shutdown.
69        self.senders.abort_all();
70    }
71
72    /// Runs a loop that continuously pushes new blocks received from the
73    /// rx_block_broadcast channel to the target peer.
74    ///
75    /// The loop does not exit until the validator is shutting down.
76    async fn push_blocks<C: NetworkClient>(
77        context: Arc<Context>,
78        network_client: Arc<C>,
79        mut rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
80        peer: AuthorityIndex,
81    ) {
82        let peer_hostname = &context.committee.authority(peer).hostname;
83
84        // Record the last block to be broadcasted, to retry in case no new block is
85        // produced for awhile. Even if the peer has acknowledged the last
86        // block, the block might have been dropped afterwards if the peer
87        // crashed.
88        let mut last_block: Option<VerifiedBlock> = None;
89
90        // Retry last block with an interval.
91        let mut retry_timer = tokio::time::interval(Self::LAST_BLOCK_RETRY_INTERVAL);
92        retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
93        retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
94
95        // Use a simple exponential-decay RTT estimator to adjust the timeout for each
96        // block sent. The estimation logic will be removed once the underlying
97        // transport switches to use streaming and the streaming implementation
98        // can be relied upon for retries.
99        const RTT_ESTIMATE_DECAY: f64 = 0.95;
100        const TIMEOUT_THRESHOLD_MULTIPLIER: f64 = 2.0;
101        const TIMEOUT_RTT_INCREMENT_FACTOR: f64 = 1.6;
102        let mut rtt_estimate = Duration::from_millis(200);
103
104        let mut requests = FuturesUnordered::new();
105
106        async fn send_block<C: NetworkClient>(
107            network_client: Arc<C>,
108            peer: AuthorityIndex,
109            rtt_estimate: Duration,
110            block: VerifiedBlock,
111        ) -> (Result<ConsensusResult<()>, Elapsed>, Instant, VerifiedBlock) {
112            let start = Instant::now();
113            let req_timeout = rtt_estimate.mul_f64(TIMEOUT_THRESHOLD_MULTIPLIER);
114            // Use a minimum timeout of 5s so the receiver does not terminate the request
115            // too early.
116            let network_timeout =
117                std::cmp::max(req_timeout, Broadcaster::MIN_SEND_BLOCK_NETWORK_TIMEOUT);
118            let resp = timeout(
119                req_timeout,
120                network_client.send_block(peer, &block, network_timeout),
121            )
122            .await;
123            if matches!(resp, Ok(Err(_))) {
124                // Add a delay before retrying.
125                sleep_until(start + req_timeout).await;
126            }
127            (resp, start, block)
128        }
129
130        loop {
131            tokio::select! {
132                result = rx_block_broadcast.recv(), if requests.len() < BROADCAST_CONCURRENCY => {
133                    let block = match result {
134                        // Other info from ExtendedBlock are ignored, because Broadcaster is not used in production.
135                        Ok(block) => block.block,
136                        Err(broadcast::error::RecvError::Closed) => {
137                            trace!("Sender to {peer} is shutting down!");
138                            return;
139                        }
140                        Err(broadcast::error::RecvError::Lagged(e)) => {
141                            warn!("Sender to {peer} is lagging! {e}");
142                            // Re-run the loop to receive again.
143                            continue;
144                        }
145                    };
146                    requests.push(send_block(network_client.clone(), peer, rtt_estimate, block.clone()));
147                    if last_block.is_none() || last_block.as_ref().unwrap().round() < block.round() {
148                        last_block = Some(block);
149                    }
150                }
151
152                Some((resp, start, block)) = requests.next() => {
153                    match resp {
154                        Ok(Ok(_)) => {
155                            let now = Instant::now();
156                            rtt_estimate = rtt_estimate.mul_f64(RTT_ESTIMATE_DECAY) + (now - start).mul_f64(1.0 - RTT_ESTIMATE_DECAY);
157                            // Avoid immediately retrying a successfully sent block.
158                            // Resetting timer is unnecessary otherwise because there are
159                            // additional inflight requests.
160                            retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
161                        },
162                        Err(Elapsed { .. }) => {
163                            rtt_estimate = rtt_estimate.mul_f64(TIMEOUT_RTT_INCREMENT_FACTOR);
164                            requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
165                        },
166                        Ok(Err(_)) => {
167                            requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
168                        },
169                    };
170                }
171
172                _ = retry_timer.tick() => {
173                    if requests.is_empty() {
174                        if let Some(block) = last_block.clone() {
175                            requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
176                        }
177                    }
178                }
179            };
180
181            // Limit RTT estimate to be between 5ms and 5s.
182            rtt_estimate = min(rtt_estimate, Duration::from_secs(5));
183            rtt_estimate = max(rtt_estimate, Duration::from_millis(5));
184            context
185                .metrics
186                .node_metrics
187                .broadcaster_rtt_estimate_ms
188                .with_label_values(&[peer_hostname])
189                .set(rtt_estimate.as_millis() as i64);
190        }
191    }
192}
193
#[cfg(test)]
mod test {
    use std::{collections::BTreeMap, time::Duration};

    use async_trait::async_trait;
    use bytes::Bytes;
    use parking_lot::Mutex;
    use tokio::time::sleep;

    use super::*;
    use crate::{
        Round,
        block::{BlockRef, ExtendedBlock, TestBlock},
        commit::CommitRange,
        core::CoreSignals,
        network::BlockStream,
    };

    /// Network client double that records the serialized bytes of every block
    /// passed to `send_block`, grouped by destination peer. All other RPCs are
    /// unimplemented.
    struct FakeNetworkClient {
        blocks_sent: Mutex<BTreeMap<AuthorityIndex, Vec<Bytes>>>,
    }

    impl FakeNetworkClient {
        fn new() -> Self {
            Self {
                blocks_sent: Mutex::new(BTreeMap::new()),
            }
        }

        /// Returns everything recorded since the previous call and leaves the
        /// record empty.
        fn blocks_sent(&self) -> BTreeMap<AuthorityIndex, Vec<Bytes>> {
            std::mem::take(&mut *self.blocks_sent.lock())
        }
    }

    #[async_trait]
    impl NetworkClient for FakeNetworkClient {
        const SUPPORT_STREAMING: bool = false;

        async fn send_block(
            &self,
            peer: AuthorityIndex,
            block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            self.blocks_sent
                .lock()
                .entry(peer)
                .or_default()
                .push(block.serialized().clone());
            Ok(())
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// Every committee member except this node, i.e. the broadcast targets.
    fn peers(context: &Context) -> Vec<AuthorityIndex> {
        let mut peers = Vec::new();
        for (index, _) in context.committee.authorities() {
            if index != context.own_index {
                peers.push(index);
            }
        }
        peers
    }

    /// A new block is sent to every peer right away, is not re-sent within
    /// half the retry interval, and is re-sent once the full interval passes.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn test_broadcaster() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let network_client = Arc::new(FakeNetworkClient::new());
        let (core_signals, signals_receiver) = CoreSignals::new(context.clone());
        let _broadcaster =
            Broadcaster::new(context.clone(), network_client.clone(), &signals_receiver);
        let targets = peers(&context);

        let block = VerifiedBlock::new_for_test(TestBlock::new(9, 1).build());
        assert!(
            core_signals
                .new_block(ExtendedBlock {
                    block: block.clone(),
                    excluded_ancestors: vec![],
                })
                .is_ok(),
            "No subscriber active to receive the block"
        );

        // block should be broadcasted immediately to all peers.
        sleep(Duration::from_millis(1)).await;
        let blocks_sent = network_client.blocks_sent();
        for index in &targets {
            assert_eq!(blocks_sent.get(index).unwrap(), &vec![block.serialized()]);
        }

        // block should not be re-broadcasted ...
        sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
        let blocks_sent = network_client.blocks_sent();
        for index in &targets {
            assert!(!blocks_sent.contains_key(index));
        }

        // ... until LAST_BLOCK_RETRY_INTERVAL
        sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
        let blocks_sent = network_client.blocks_sent();
        for index in &targets {
            assert_eq!(blocks_sent.get(index).unwrap(), &vec![block.serialized()]);
        }
    }
}