1use std::{sync::Arc, time::Duration};
6
7use consensus_config::AuthorityIndex;
8use futures::StreamExt;
9use iota_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info};
16
17use crate::{
18 Round,
19 block::BlockAPI as _,
20 context::Context,
21 dag_state::DagState,
22 error::ConsensusError,
23 network::{NetworkClient, NetworkService},
24};
25
26pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
32 context: Arc<Context>,
33 network_client: Arc<C>,
34 authority_service: Arc<S>,
35 dag_state: Arc<RwLock<DagState>>,
36 subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
37}
38
39impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
40 pub(crate) fn new(
41 context: Arc<Context>,
42 network_client: Arc<C>,
43 authority_service: Arc<S>,
44 dag_state: Arc<RwLock<DagState>>,
45 ) -> Self {
46 let subscriptions = (0..context.committee.size())
47 .map(|_| None)
48 .collect::<Vec<_>>();
49 Self {
50 context,
51 network_client,
52 authority_service,
53 dag_state,
54 subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
55 }
56 }
57
58 pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
59 if peer == self.context.own_index {
60 error!("Attempt to subscribe to own validator {peer} is ignored!");
61 return;
62 }
63 let context = self.context.clone();
64 let network_client = self.network_client.clone();
65 let authority_service = self.authority_service.clone();
66 let (mut last_received, gc_round, gc_enabled) = {
67 let dag_state = self.dag_state.read();
68 (
69 dag_state.get_last_block_for_authority(peer).round(),
70 dag_state.gc_round(),
71 dag_state.gc_enabled(),
72 )
73 };
74
75 if gc_enabled && last_received < gc_round {
80 info!(
81 "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
82 );
83 last_received = gc_round;
84 }
85
86 let mut subscriptions = self.subscriptions.lock();
87 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
88 subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
89 context,
90 network_client,
91 authority_service,
92 peer,
93 last_received,
94 )));
95 }
96
97 pub(crate) fn stop(&self) {
98 let mut subscriptions = self.subscriptions.lock();
99 for (peer, _) in self.context.committee.authorities() {
100 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
101 }
102 }
103
104 fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
105 let peer_hostname = &self.context.committee.authority(peer).hostname;
106 if let Some(subscription) = subscription.take() {
107 subscription.abort();
108 }
109 self.context
113 .metrics
114 .node_metrics
115 .subscribed_to
116 .with_label_values(&[peer_hostname])
117 .set(0);
118 }
119
120 async fn subscription_loop(
121 context: Arc<Context>,
122 network_client: Arc<C>,
123 authority_service: Arc<S>,
124 peer: AuthorityIndex,
125 last_received: Round,
126 ) {
127 const IMMEDIATE_RETRIES: i64 = 3;
128 const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);
130 const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(10);
131 const RETRY_INTERVAL_MULTIPLIER: f32 = 1.2;
132 let peer_hostname = &context.committee.authority(peer).hostname;
133 let mut retries: i64 = 0;
134 let mut delay = INITIAL_RETRY_INTERVAL;
135 'subscription: loop {
136 context
137 .metrics
138 .node_metrics
139 .subscribed_to
140 .with_label_values(&[peer_hostname])
141 .set(0);
142
143 if retries > IMMEDIATE_RETRIES {
144 debug!(
145 "Delaying retry {} of peer {} subscription, in {} seconds",
146 retries,
147 peer_hostname,
148 delay.as_secs_f32(),
149 );
150 sleep(delay).await;
151 delay = delay
153 .mul_f32(RETRY_INTERVAL_MULTIPLIER)
154 .min(MAX_RETRY_INTERVAL);
155 } else if retries > 0 {
156 tokio::task::yield_now().await;
158 } else {
159 delay = INITIAL_RETRY_INTERVAL;
161 }
162 retries += 1;
163
164 let subscribe_future =
166 network_client.subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL);
167 let subscribe_result = timeout(MAX_RETRY_INTERVAL * 5, subscribe_future).await;
168 let mut blocks = match subscribe_result {
169 Ok(inner_result) => match inner_result {
170 Ok(blocks) => {
171 debug!(
172 "Subscribed to peer {} {} after {} attempts",
173 peer, peer_hostname, retries
174 );
175 context
176 .metrics
177 .node_metrics
178 .subscriber_connection_attempts
179 .with_label_values(&[peer_hostname.as_str(), "success"])
180 .inc();
181 blocks
182 }
183 Err(e) => {
184 debug!(
185 "Failed to subscribe to blocks from peer {} {}: {}",
186 peer, peer_hostname, e
187 );
188 context
189 .metrics
190 .node_metrics
191 .subscriber_connection_attempts
192 .with_label_values(&[peer_hostname.as_str(), "failure"])
193 .inc();
194 continue 'subscription;
195 }
196 },
197 Err(_) => {
198 debug!(
199 "Timeout subscribing to blocks from peer {} {}",
200 peer, peer_hostname
201 );
202 context
203 .metrics
204 .node_metrics
205 .subscriber_connection_attempts
206 .with_label_values(&[peer_hostname.as_str(), "timeout"])
207 .inc();
208 continue 'subscription;
209 }
210 };
211
212 context
214 .metrics
215 .node_metrics
216 .subscribed_to
217 .with_label_values(&[peer_hostname])
218 .set(1);
219
220 'stream: loop {
221 match blocks.next().await {
222 Some(block) => {
223 context
224 .metrics
225 .node_metrics
226 .subscribed_blocks
227 .with_label_values(&[peer_hostname])
228 .inc();
229 let result = authority_service
230 .handle_send_block(peer, block.clone())
231 .await;
232 if let Err(e) = result {
233 context.metrics.update_scoring_metrics_on_block_receival(
234 peer,
235 peer_hostname,
236 e.clone(),
237 "handle_send_block",
238 );
239 match e {
240 ConsensusError::BlockRejected { block_ref, reason } => {
241 debug!(
242 "Failed to process block from peer {} {} for block {:?}: {}",
243 peer, peer_hostname, block_ref, reason
244 );
245 }
246 _ => {
247 info!(
248 "Invalid block received from peer {} {}: {}",
249 peer, peer_hostname, e
250 );
251 }
252 }
253 }
254 retries = 0;
256 }
257 None => {
258 debug!(
259 "Subscription to blocks from peer {} {} ended",
260 peer, peer_hostname
261 );
262 retries += 1;
263 break 'stream;
264 }
265 }
266 }
267 }
268 }
269}
270
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        block::BlockRef,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// The block payload yielded by every stream from `SubscriberTestClient`.
    fn test_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Network client whose only working call is `subscribe_blocks`, which
    /// returns a finite stream of 10 identical blocks spaced 1ms apart.
    struct SubscriberTestClient;

    impl SubscriberTestClient {
        fn new() -> Self {
            Self
        }
    }

    #[async_trait]
    impl NetworkClient for SubscriberTestClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            // Ten items, each produced after a 1ms (virtual-time) sleep; the
            // stream then ends, forcing the subscriber to re-subscribe.
            let finite_blocks = stream::iter(0..10).then(|_| async {
                sleep(Duration::from_millis(1)).await;
                test_block()
            });
            Ok(Box::pin(finite_blocks))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let authority_service = Arc::new(Mutex::new(TestService::new()));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let subscriber = Subscriber::new(
            context.clone(),
            Arc::new(SubscriberTestClient::new()),
            authority_service.clone(),
            dag_state,
        );

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Every subscription delivers only 10 blocks, so reaching 100 requires
        // the subscriber to keep re-subscribing after each stream ends.
        for _ in 0..10 {
            sleep(Duration::from_secs(1)).await;
            if authority_service.lock().handle_send_block.len() >= 100 {
                break;
            }
        }

        let service = authority_service.lock();
        assert!(service.handle_send_block.len() >= 100);
        let expected = test_block();
        for (sender, received) in service.handle_send_block.iter() {
            assert_eq!(*sender, peer);
            assert_eq!(*received, expected);
        }
    }
}