1use std::{
6 cmp::{max, min},
7 sync::Arc,
8 time::Duration,
9};
10
11use consensus_config::AuthorityIndex;
12use futures::{StreamExt as _, stream::FuturesUnordered};
13use tokio::{
14 sync::broadcast,
15 task::JoinSet,
16 time::{Instant, error::Elapsed, sleep_until, timeout},
17};
18use tracing::{trace, warn};
19
20use crate::{
21 block::{BlockAPI as _, ExtendedBlock, VerifiedBlock},
22 context::Context,
23 core::CoreSignalsReceivers,
24 error::ConsensusResult,
25 network::NetworkClient,
26};
27
/// Cap on concurrent sends to a single peer: `push_blocks` only takes a new block from the
/// broadcast channel while fewer than this many `send_block` requests are in flight.
const BROADCAST_CONCURRENCY: usize = 10;
30
/// Pushes blocks to the other members of the committee.
///
/// `new` spawns one background sender task per peer; the tasks are tracked here so that
/// `stop` can abort them.
pub(crate) struct Broadcaster {
    /// Per-peer sender tasks spawned by `new`.
    senders: JoinSet<()>,
}
40
41impl Broadcaster {
    /// Period of the per-peer retry timer; when it fires and no send is in flight, the last
    /// recorded block is sent again.
    const LAST_BLOCK_RETRY_INTERVAL: Duration = Duration::from_secs(2);
    /// Lower bound on the timeout value handed to `NetworkClient::send_block`.
    const MIN_SEND_BLOCK_NETWORK_TIMEOUT: Duration = Duration::from_secs(5);
44
45 pub(crate) fn new<C: NetworkClient>(
46 context: Arc<Context>,
47 network_client: Arc<C>,
48 signals_receiver: &CoreSignalsReceivers,
49 ) -> Self {
50 let mut senders = JoinSet::new();
51 for (index, _authority) in context.committee.authorities() {
52 if index == context.own_index {
54 continue;
55 }
56 senders.spawn(Self::push_blocks(
57 context.clone(),
58 network_client.clone(),
59 signals_receiver.block_broadcast_receiver(),
60 index,
61 ));
62 }
63
64 Self { senders }
65 }
66
    /// Requests abortion of every per-peer sender task.
    ///
    /// The tasks are not awaited here, so shutdown does not wait for them to finish.
    pub(crate) fn stop(&mut self) {
        self.senders.abort_all();
    }
71
72 async fn push_blocks<C: NetworkClient>(
77 context: Arc<Context>,
78 network_client: Arc<C>,
79 mut rx_block_broadcast: broadcast::Receiver<ExtendedBlock>,
80 peer: AuthorityIndex,
81 ) {
82 let peer_hostname = &context.committee.authority(peer).hostname;
83
84 let mut last_block: Option<VerifiedBlock> = None;
89
90 let mut retry_timer = tokio::time::interval(Self::LAST_BLOCK_RETRY_INTERVAL);
92 retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
93 retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
94
95 const RTT_ESTIMATE_DECAY: f64 = 0.95;
100 const TIMEOUT_THRESHOLD_MULTIPLIER: f64 = 2.0;
101 const TIMEOUT_RTT_INCREMENT_FACTOR: f64 = 1.6;
102 let mut rtt_estimate = Duration::from_millis(200);
103
104 let mut requests = FuturesUnordered::new();
105
106 async fn send_block<C: NetworkClient>(
107 network_client: Arc<C>,
108 peer: AuthorityIndex,
109 rtt_estimate: Duration,
110 block: VerifiedBlock,
111 ) -> (Result<ConsensusResult<()>, Elapsed>, Instant, VerifiedBlock) {
112 let start = Instant::now();
113 let req_timeout = rtt_estimate.mul_f64(TIMEOUT_THRESHOLD_MULTIPLIER);
114 let network_timeout =
117 std::cmp::max(req_timeout, Broadcaster::MIN_SEND_BLOCK_NETWORK_TIMEOUT);
118 let resp = timeout(
119 req_timeout,
120 network_client.send_block(peer, &block, network_timeout),
121 )
122 .await;
123 if matches!(resp, Ok(Err(_))) {
124 sleep_until(start + req_timeout).await;
126 }
127 (resp, start, block)
128 }
129
130 loop {
131 tokio::select! {
132 result = rx_block_broadcast.recv(), if requests.len() < BROADCAST_CONCURRENCY => {
133 let block = match result {
134 Ok(block) => block.block,
136 Err(broadcast::error::RecvError::Closed) => {
137 trace!("Sender to {peer} is shutting down!");
138 return;
139 }
140 Err(broadcast::error::RecvError::Lagged(e)) => {
141 warn!("Sender to {peer} is lagging! {e}");
142 continue;
144 }
145 };
146 requests.push(send_block(network_client.clone(), peer, rtt_estimate, block.clone()));
147 if last_block.is_none() || last_block.as_ref().unwrap().round() < block.round() {
148 last_block = Some(block);
149 }
150 }
151
152 Some((resp, start, block)) = requests.next() => {
153 match resp {
154 Ok(Ok(_)) => {
155 let now = Instant::now();
156 rtt_estimate = rtt_estimate.mul_f64(RTT_ESTIMATE_DECAY) + (now - start).mul_f64(1.0 - RTT_ESTIMATE_DECAY);
157 retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
161 },
162 Err(Elapsed { .. }) => {
163 rtt_estimate = rtt_estimate.mul_f64(TIMEOUT_RTT_INCREMENT_FACTOR);
164 requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
165 },
166 Ok(Err(_)) => {
167 requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
168 },
169 };
170 }
171
172 _ = retry_timer.tick() => {
173 if requests.is_empty() {
174 if let Some(block) = last_block.clone() {
175 requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
176 }
177 }
178 }
179 };
180
181 rtt_estimate = min(rtt_estimate, Duration::from_secs(5));
183 rtt_estimate = max(rtt_estimate, Duration::from_millis(5));
184 context
185 .metrics
186 .node_metrics
187 .broadcaster_rtt_estimate_ms
188 .with_label_values(&[peer_hostname])
189 .set(rtt_estimate.as_millis() as i64);
190 }
191 }
192}
193
194#[cfg(test)]
195mod test {
196 use std::{collections::BTreeMap, ops::DerefMut, time::Duration};
197
198 use async_trait::async_trait;
199 use bytes::Bytes;
200 use parking_lot::Mutex;
201 use tokio::time::sleep;
202
203 use super::*;
204 use crate::{
205 Round,
206 block::{BlockRef, ExtendedBlock, TestBlock},
207 commit::CommitRange,
208 core::CoreSignals,
209 network::BlockStream,
210 };
211
    /// Test double for `NetworkClient` that records the serialized blocks passed to
    /// `send_block`, grouped by destination peer.
    struct FakeNetworkClient {
        /// Serialized blocks recorded so far, keyed by the peer they were sent to.
        blocks_sent: Mutex<BTreeMap<AuthorityIndex, Vec<Bytes>>>,
    }
215
216 impl FakeNetworkClient {
217 fn new() -> Self {
218 Self {
219 blocks_sent: Mutex::new(BTreeMap::new()),
220 }
221 }
222
223 fn blocks_sent(&self) -> BTreeMap<AuthorityIndex, Vec<Bytes>> {
224 let mut blocks_sent = self.blocks_sent.lock();
225 let result = std::mem::take(blocks_sent.deref_mut());
226 blocks_sent.clear();
227 result
228 }
229 }
230
231 #[async_trait]
232 impl NetworkClient for FakeNetworkClient {
233 const SUPPORT_STREAMING: bool = false;
234
235 async fn send_block(
236 &self,
237 peer: AuthorityIndex,
238 block: &VerifiedBlock,
239 _timeout: Duration,
240 ) -> ConsensusResult<()> {
241 let mut blocks_sent = self.blocks_sent.lock();
242 let blocks = blocks_sent.entry(peer).or_default();
243 blocks.push(block.serialized().clone());
244 Ok(())
245 }
246
247 async fn subscribe_blocks(
248 &self,
249 _peer: AuthorityIndex,
250 _last_received: Round,
251 _timeout: Duration,
252 ) -> ConsensusResult<BlockStream> {
253 unimplemented!("Unimplemented")
254 }
255
256 async fn fetch_blocks(
257 &self,
258 _peer: AuthorityIndex,
259 _block_refs: Vec<BlockRef>,
260 _highest_accepted_rounds: Vec<Round>,
261 _timeout: Duration,
262 ) -> ConsensusResult<Vec<Bytes>> {
263 unimplemented!("Unimplemented")
264 }
265
266 async fn fetch_commits(
267 &self,
268 _peer: AuthorityIndex,
269 _commit_range: CommitRange,
270 _timeout: Duration,
271 ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
272 unimplemented!("Unimplemented")
273 }
274
275 async fn fetch_latest_blocks(
276 &self,
277 _peer: AuthorityIndex,
278 _authorities: Vec<AuthorityIndex>,
279 _timeout: Duration,
280 ) -> ConsensusResult<Vec<Bytes>> {
281 unimplemented!("Unimplemented")
282 }
283
284 async fn get_latest_rounds(
285 &self,
286 _peer: AuthorityIndex,
287 _timeout: Duration,
288 ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
289 unimplemented!("Unimplemented")
290 }
291 }
292
293 #[tokio::test(flavor = "current_thread", start_paused = true)]
294 async fn test_broadcaster() {
295 let (context, _keys) = Context::new_for_test(4);
296 let context = Arc::new(context);
297 let network_client = Arc::new(FakeNetworkClient::new());
298 let (core_signals, signals_receiver) = CoreSignals::new(context.clone());
299 let _broadcaster =
300 Broadcaster::new(context.clone(), network_client.clone(), &signals_receiver);
301
302 let block = VerifiedBlock::new_for_test(TestBlock::new(9, 1).build());
303 assert!(
304 core_signals
305 .new_block(ExtendedBlock {
306 block: block.clone(),
307 excluded_ancestors: vec![],
308 })
309 .is_ok(),
310 "No subscriber active to receive the block"
311 );
312
313 sleep(Duration::from_millis(1)).await;
315 let blocks_sent = network_client.blocks_sent();
316 for (index, _) in context.committee.authorities() {
317 if index == context.own_index {
318 continue;
319 }
320 assert_eq!(blocks_sent.get(&index).unwrap(), &vec![block.serialized()]);
321 }
322
323 sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
325 let blocks_sent = network_client.blocks_sent();
326 for (index, _) in context.committee.authorities() {
327 if index == context.own_index {
328 continue;
329 }
330 assert!(!blocks_sent.contains_key(&index));
331 }
332
333 sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
335 let blocks_sent = network_client.blocks_sent();
336 for (index, _) in context.committee.authorities() {
337 if index == context.own_index {
338 continue;
339 }
340 assert_eq!(blocks_sent.get(&index).unwrap(), &vec![block.serialized()]);
341 }
342 }
343}