1use std::{sync::Arc, time::Duration};
6
7use consensus_config::AuthorityIndex;
8use futures::StreamExt;
9use iota_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{
12 task::JoinHandle,
13 time::{sleep, timeout},
14};
15use tracing::{debug, error, info};
16
17use crate::{
18 Round,
19 block::BlockAPI as _,
20 context::Context,
21 dag_state::DagState,
22 error::ConsensusError,
23 network::{NetworkClient, NetworkService},
24};
25
26pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
32 context: Arc<Context>,
33 network_client: Arc<C>,
34 authority_service: Arc<S>,
35 dag_state: Arc<RwLock<DagState>>,
36 subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
37}
38
39impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
40 pub(crate) fn new(
41 context: Arc<Context>,
42 network_client: Arc<C>,
43 authority_service: Arc<S>,
44 dag_state: Arc<RwLock<DagState>>,
45 ) -> Self {
46 let subscriptions = (0..context.committee.size())
47 .map(|_| None)
48 .collect::<Vec<_>>();
49 Self {
50 context,
51 network_client,
52 authority_service,
53 dag_state,
54 subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
55 }
56 }
57
58 pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
59 if peer == self.context.own_index {
60 error!("Attempt to subscribe to own validator {peer} is ignored!");
61 return;
62 }
63 let context = self.context.clone();
64 let network_client = self.network_client.clone();
65 let authority_service = self.authority_service.clone();
66 let (mut last_received, gc_round, gc_enabled) = {
67 let dag_state = self.dag_state.read();
68 (
69 dag_state.get_last_block_for_authority(peer).round(),
70 dag_state.gc_round(),
71 dag_state.gc_enabled(),
72 )
73 };
74
75 if gc_enabled && last_received < gc_round {
80 info!(
81 "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
82 );
83 last_received = gc_round;
84 }
85
86 let mut subscriptions = self.subscriptions.lock();
87 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
88 subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
89 context,
90 network_client,
91 authority_service,
92 peer,
93 last_received,
94 )));
95 }
96
97 pub(crate) fn stop(&self) {
98 let mut subscriptions = self.subscriptions.lock();
99 for (peer, _) in self.context.committee.authorities() {
100 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
101 }
102 }
103
104 fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
105 let peer_hostname = &self.context.committee.authority(peer).hostname;
106 if let Some(subscription) = subscription.take() {
107 subscription.abort();
108 }
109 self.context
113 .metrics
114 .node_metrics
115 .subscribed_to
116 .with_label_values(&[peer_hostname])
117 .set(0);
118 }
119
120 async fn subscription_loop(
121 context: Arc<Context>,
122 network_client: Arc<C>,
123 authority_service: Arc<S>,
124 peer: AuthorityIndex,
125 last_received: Round,
126 ) {
127 const IMMEDIATE_RETRIES: i64 = 3;
128 const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);
130 const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(10);
131 const RETRY_INTERVAL_MULTIPLIER: f32 = 1.2;
132 let peer_hostname = &context.committee.authority(peer).hostname;
133 let mut retries: i64 = 0;
134 let mut delay = INITIAL_RETRY_INTERVAL;
135 'subscription: loop {
136 context
137 .metrics
138 .node_metrics
139 .subscribed_to
140 .with_label_values(&[peer_hostname])
141 .set(0);
142
143 if retries > IMMEDIATE_RETRIES {
144 debug!(
145 "Delaying retry {} of peer {} subscription, in {} seconds",
146 retries,
147 peer_hostname,
148 delay.as_secs_f32(),
149 );
150 sleep(delay).await;
151 delay = delay
153 .mul_f32(RETRY_INTERVAL_MULTIPLIER)
154 .min(MAX_RETRY_INTERVAL);
155 } else if retries > 0 {
156 tokio::task::yield_now().await;
158 } else {
159 delay = INITIAL_RETRY_INTERVAL;
161 }
162 retries += 1;
163
164 let subscribe_future =
166 network_client.subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL);
167 let subscribe_result = timeout(MAX_RETRY_INTERVAL * 5, subscribe_future).await;
168 let mut blocks = match subscribe_result {
169 Ok(inner_result) => match inner_result {
170 Ok(blocks) => {
171 debug!(
172 "Subscribed to peer {} {} after {} attempts",
173 peer, peer_hostname, retries
174 );
175 context
176 .metrics
177 .node_metrics
178 .subscriber_connection_attempts
179 .with_label_values(&[peer_hostname.as_str(), "success"])
180 .inc();
181 blocks
182 }
183 Err(e) => {
184 debug!(
185 "Failed to subscribe to blocks from peer {} {}: {}",
186 peer, peer_hostname, e
187 );
188 context
189 .metrics
190 .node_metrics
191 .subscriber_connection_attempts
192 .with_label_values(&[peer_hostname.as_str(), "failure"])
193 .inc();
194 continue 'subscription;
195 }
196 },
197 Err(_) => {
198 debug!(
199 "Timeout subscribing to blocks from peer {} {}",
200 peer, peer_hostname
201 );
202 context
203 .metrics
204 .node_metrics
205 .subscriber_connection_attempts
206 .with_label_values(&[peer_hostname.as_str(), "timeout"])
207 .inc();
208 continue 'subscription;
209 }
210 };
211
212 context
214 .metrics
215 .node_metrics
216 .subscribed_to
217 .with_label_values(&[peer_hostname])
218 .set(1);
219
220 'stream: loop {
221 match blocks.next().await {
222 Some(block) => {
223 context
224 .metrics
225 .node_metrics
226 .subscribed_blocks
227 .with_label_values(&[peer_hostname])
228 .inc();
229 let result = authority_service
230 .handle_send_block(peer, block.clone())
231 .await;
232 if let Err(e) = result {
233 match e {
234 ConsensusError::BlockRejected { block_ref, reason } => {
235 debug!(
236 "Failed to process block from peer {} {} for block {:?}: {}",
237 peer, peer_hostname, block_ref, reason
238 );
239 }
240 _ => {
241 info!(
242 "Invalid block received from peer {} {}: {}",
243 peer, peer_hostname, e
244 );
245 }
246 }
247 }
248 retries = 0;
250 }
251 None => {
252 debug!(
253 "Subscription to blocks from peer {} {} ended",
254 peer, peer_hostname
255 );
256 retries += 1;
257 break 'stream;
258 }
259 }
260 }
261 }
262 }
263}
264
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        block::BlockRef,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// Number of blocks a single mocked subscription stream yields before it ends.
    const BLOCKS_PER_STREAM: usize = 10;
    /// Minimum number of blocks the service must have received for the test to pass.
    const MIN_EXPECTED_BLOCKS: usize = 100;

    /// The fixed block that every mocked subscription stream produces.
    fn mock_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Network client mock that only supports `subscribe_blocks`; every other
    /// method panics with `unimplemented!`.
    struct SubscriberTestClient;

    impl SubscriberTestClient {
        fn new() -> Self {
            Self
        }
    }

    #[async_trait]
    impl NetworkClient for SubscriberTestClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Returns a stream that yields `BLOCKS_PER_STREAM` copies of the mock
        /// block, one per millisecond, and then ends.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            let endless = stream::unfold((), |_| async {
                sleep(Duration::from_millis(1)).await;
                Some((mock_block(), ()))
            });
            let block_stream = endless.take(BLOCKS_PER_STREAM);
            Ok(Box::pin(block_stream))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// Each mocked stream ends after `BLOCKS_PER_STREAM` blocks, so reaching
    /// `MIN_EXPECTED_BLOCKS` requires the subscriber to re-subscribe
    /// repeatedly after a stream ends.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let authority_service = Arc::new(Mutex::new(TestService::new()));
        let network_client = Arc::new(SubscriberTestClient::new());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let subscriber = Subscriber::new(
            context.clone(),
            network_client,
            authority_service.clone(),
            dag_state,
        );

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Poll for up to 10 (paused-clock) seconds until enough blocks arrived.
        for _attempt in 0..10 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            let received = authority_service.lock().handle_send_block.len();
            if received >= MIN_EXPECTED_BLOCKS {
                break;
            }
        }

        // Every recorded call must come from the subscribed peer and carry
        // the mock block.
        let service = authority_service.lock();
        assert!(service.handle_send_block.len() >= MIN_EXPECTED_BLOCKS);
        let expected = mock_block();
        for (sender, received) in service.handle_send_block.iter() {
            assert_eq!(*sender, peer);
            assert_eq!(*received, expected);
        }
    }
}
399}