1use std::{sync::Arc, time::Duration};
6
7use consensus_config::AuthorityIndex;
8use futures::StreamExt;
9use iota_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info};
16
17use crate::{
18 Round,
19 block::BlockAPI as _,
20 context::Context,
21 dag_state::DagState,
22 error::ConsensusError,
23 network::{NetworkClient, NetworkService},
24 scoring_metrics_store::ErrorSource,
25};
26
27pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
33 context: Arc<Context>,
34 network_client: Arc<C>,
35 authority_service: Arc<S>,
36 dag_state: Arc<RwLock<DagState>>,
37 subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
38}
39
40impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
41 pub(crate) fn new(
42 context: Arc<Context>,
43 network_client: Arc<C>,
44 authority_service: Arc<S>,
45 dag_state: Arc<RwLock<DagState>>,
46 ) -> Self {
47 let subscriptions = (0..context.committee.size())
48 .map(|_| None)
49 .collect::<Vec<_>>();
50 Self {
51 context,
52 network_client,
53 authority_service,
54 dag_state,
55 subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
56 }
57 }
58
59 pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
60 if peer == self.context.own_index {
61 error!("Attempt to subscribe to own validator {peer} is ignored!");
62 return;
63 }
64 let context = self.context.clone();
65 let network_client = self.network_client.clone();
66 let authority_service = self.authority_service.clone();
67 let (mut last_received, gc_round, gc_enabled) = {
68 let dag_state = self.dag_state.read();
69 (
70 dag_state.get_last_block_for_authority(peer).round(),
71 dag_state.gc_round(),
72 dag_state.gc_enabled(),
73 )
74 };
75
76 if gc_enabled && last_received < gc_round {
81 info!(
82 "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
83 );
84 last_received = gc_round;
85 }
86
87 let mut subscriptions = self.subscriptions.lock();
88 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
89 subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
90 context,
91 network_client,
92 authority_service,
93 peer,
94 last_received,
95 )));
96 }
97
98 pub(crate) fn stop(&self) {
99 let mut subscriptions = self.subscriptions.lock();
100 for (peer, _) in self.context.committee.authorities() {
101 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
102 }
103 }
104
105 fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
106 let peer_hostname = &self.context.committee.authority(peer).hostname;
107 if let Some(subscription) = subscription.take() {
108 subscription.abort();
109 }
110 self.context
114 .metrics
115 .node_metrics
116 .subscribed_to
117 .with_label_values(&[peer_hostname])
118 .set(0);
119 }
120
121 async fn subscription_loop(
122 context: Arc<Context>,
123 network_client: Arc<C>,
124 authority_service: Arc<S>,
125 peer: AuthorityIndex,
126 last_received: Round,
127 ) {
128 const IMMEDIATE_RETRIES: i64 = 3;
129 const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);
131 const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(10);
132 const RETRY_INTERVAL_MULTIPLIER: f32 = 1.2;
133 let peer_hostname = &context.committee.authority(peer).hostname;
134 let mut retries: i64 = 0;
135 let mut delay = INITIAL_RETRY_INTERVAL;
136 'subscription: loop {
137 context
138 .metrics
139 .node_metrics
140 .subscribed_to
141 .with_label_values(&[peer_hostname])
142 .set(0);
143
144 if retries > IMMEDIATE_RETRIES {
145 debug!(
146 "Delaying retry {} of peer {} subscription, in {} seconds",
147 retries,
148 peer_hostname,
149 delay.as_secs_f32(),
150 );
151 sleep(delay).await;
152 delay = delay
154 .mul_f32(RETRY_INTERVAL_MULTIPLIER)
155 .min(MAX_RETRY_INTERVAL);
156 } else if retries > 0 {
157 tokio::task::yield_now().await;
159 } else {
160 delay = INITIAL_RETRY_INTERVAL;
162 }
163 retries += 1;
164
165 let subscribe_future =
167 network_client.subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL);
168 let subscribe_result = timeout(MAX_RETRY_INTERVAL * 5, subscribe_future).await;
169 let mut blocks = match subscribe_result {
170 Ok(inner_result) => match inner_result {
171 Ok(blocks) => {
172 debug!(
173 "Subscribed to peer {} {} after {} attempts",
174 peer, peer_hostname, retries
175 );
176 context
177 .metrics
178 .node_metrics
179 .subscriber_connection_attempts
180 .with_label_values(&[peer_hostname.as_str(), "success"])
181 .inc();
182 blocks
183 }
184 Err(e) => {
185 debug!(
186 "Failed to subscribe to blocks from peer {} {}: {}",
187 peer, peer_hostname, e
188 );
189 context
190 .metrics
191 .node_metrics
192 .subscriber_connection_attempts
193 .with_label_values(&[peer_hostname.as_str(), "failure"])
194 .inc();
195 continue 'subscription;
196 }
197 },
198 Err(_) => {
199 debug!(
200 "Timeout subscribing to blocks from peer {} {}",
201 peer, peer_hostname
202 );
203 context
204 .metrics
205 .node_metrics
206 .subscriber_connection_attempts
207 .with_label_values(&[peer_hostname.as_str(), "timeout"])
208 .inc();
209 continue 'subscription;
210 }
211 };
212
213 context
215 .metrics
216 .node_metrics
217 .subscribed_to
218 .with_label_values(&[peer_hostname])
219 .set(1);
220
221 'stream: loop {
222 match blocks.next().await {
223 Some(block) => {
224 context
225 .metrics
226 .node_metrics
227 .subscribed_blocks
228 .with_label_values(&[peer_hostname])
229 .inc();
230 let result = authority_service
231 .handle_send_block(peer, block.clone())
232 .await;
233 if let Err(e) = result {
234 context
235 .scoring_metrics_store
236 .update_scoring_metrics_on_block_receival(
237 peer,
238 peer_hostname,
239 e.clone(),
240 ErrorSource::Subscriber,
241 &context.metrics.node_metrics,
242 );
243 match e {
244 ConsensusError::BlockRejected { block_ref, reason } => {
245 debug!(
246 "Failed to process block from peer {} {} for block {:?}: {}",
247 peer, peer_hostname, block_ref, reason
248 );
249 }
250 _ => {
251 info!(
252 "Invalid block received from peer {} {}: {}",
253 peer, peer_hostname, e
254 );
255 }
256 }
257 }
258 retries = 0;
260 }
261 None => {
262 debug!(
263 "Subscription to blocks from peer {} {} ended",
264 peer, peer_hostname
265 );
266 retries += 1;
267 break 'stream;
268 }
269 }
270 }
271 }
272 }
273}
274
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        block::BlockRef,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// Number of blocks each mocked subscription stream yields before it ends.
    const BLOCKS_PER_STREAM: usize = 10;

    /// The fixed payload the mock client streams and the test expects to be forwarded.
    fn canned_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Mock client: `subscribe_blocks` always succeeds with a short, finite stream.
    /// Every other RPC is unimplemented.
    struct SubscriberTestClient;

    impl SubscriberTestClient {
        fn new() -> Self {
            Self
        }
    }

    #[async_trait]
    impl NetworkClient for SubscriberTestClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            // One block per millisecond. The stream then ends, which forces the
            // subscriber to reconnect.
            let blocks = stream::unfold((), |()| async {
                sleep(Duration::from_millis(1)).await;
                Some((canned_block(), ()))
            })
            .take(BLOCKS_PER_STREAM);
            Ok(Box::pin(blocks))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        // Far more than one stream's worth of blocks, so reaching it proves the
        // subscriber reconnected many times.
        const EXPECTED_MIN_BLOCKS: usize = 100;

        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let service = Arc::new(Mutex::new(TestService::new()));
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let subscriber = Subscriber::new(
            context.clone(),
            Arc::new(SubscriberTestClient::new()),
            service.clone(),
            dag_state,
        );

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Poll once per (virtual) second, for at most ten seconds.
        for _ in 0..10 {
            sleep(Duration::from_secs(1)).await;
            if service.lock().handle_send_block.len() >= EXPECTED_MIN_BLOCKS {
                break;
            }
        }

        let received = service.lock();
        assert!(received.handle_send_block.len() >= EXPECTED_MIN_BLOCKS);
        let expected = canned_block();
        for (from, block) in received.handle_send_block.iter() {
            assert_eq!(*from, peer);
            assert_eq!(*block, expected);
        }
    }
}