1use std::{sync::Arc, time::Duration};
6
7use consensus_config::AuthorityIndex;
8use futures::StreamExt;
9use iota_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{task::JoinHandle, time::sleep};
12use tracing::{debug, error, info};
13
14use crate::{
15 Round,
16 block::BlockAPI as _,
17 context::Context,
18 dag_state::DagState,
19 error::ConsensusError,
20 network::{NetworkClient, NetworkService},
21};
22
23pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
29 context: Arc<Context>,
30 network_client: Arc<C>,
31 authority_service: Arc<S>,
32 dag_state: Arc<RwLock<DagState>>,
33 subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
34}
35
36impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
37 pub(crate) fn new(
38 context: Arc<Context>,
39 network_client: Arc<C>,
40 authority_service: Arc<S>,
41 dag_state: Arc<RwLock<DagState>>,
42 ) -> Self {
43 let subscriptions = (0..context.committee.size())
44 .map(|_| None)
45 .collect::<Vec<_>>();
46 Self {
47 context,
48 network_client,
49 authority_service,
50 dag_state,
51 subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
52 }
53 }
54
55 pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
56 if peer == self.context.own_index {
57 error!("Attempt to subscribe to own validator {peer} is ignored!");
58 return;
59 }
60 let context = self.context.clone();
61 let network_client = self.network_client.clone();
62 let authority_service = self.authority_service.clone();
63 let (mut last_received, gc_round, gc_enabled) = {
64 let dag_state = self.dag_state.read();
65 (
66 dag_state.get_last_block_for_authority(peer).round(),
67 dag_state.gc_round(),
68 dag_state.gc_enabled(),
69 )
70 };
71
72 if gc_enabled && last_received < gc_round {
77 info!(
78 "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
79 );
80 last_received = gc_round;
81 }
82
83 let mut subscriptions = self.subscriptions.lock();
84 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
85 subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
86 context,
87 network_client,
88 authority_service,
89 peer,
90 last_received,
91 )));
92 }
93
94 pub(crate) fn stop(&self) {
95 let mut subscriptions = self.subscriptions.lock();
96 for (peer, _) in self.context.committee.authorities() {
97 self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
98 }
99 }
100
101 fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
102 let peer_hostname = &self.context.committee.authority(peer).hostname;
103 if let Some(subscription) = subscription.take() {
104 subscription.abort();
105 }
106 self.context
110 .metrics
111 .node_metrics
112 .subscribed_to
113 .with_label_values(&[peer_hostname])
114 .set(0);
115 }
116
117 async fn subscription_loop(
118 context: Arc<Context>,
119 network_client: Arc<C>,
120 authority_service: Arc<S>,
121 peer: AuthorityIndex,
122 last_received: Round,
123 ) {
124 const IMMEDIATE_RETRIES: i64 = 3;
125 const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);
127 const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(10);
128 const RETRY_INTERVAL_MULTIPLIER: f32 = 1.2;
129 let peer_hostname = &context.committee.authority(peer).hostname;
130 let mut retries: i64 = 0;
131 let mut delay = INITIAL_RETRY_INTERVAL;
132 'subscription: loop {
133 context
134 .metrics
135 .node_metrics
136 .subscribed_to
137 .with_label_values(&[peer_hostname])
138 .set(0);
139
140 if retries > IMMEDIATE_RETRIES {
141 debug!(
142 "Delaying retry {} of peer {} subscription, in {} seconds",
143 retries,
144 peer_hostname,
145 delay.as_secs_f32(),
146 );
147 sleep(delay).await;
148 delay = delay
150 .mul_f32(RETRY_INTERVAL_MULTIPLIER)
151 .min(MAX_RETRY_INTERVAL);
152 } else if retries > 0 {
153 tokio::task::yield_now().await;
155 } else {
156 delay = INITIAL_RETRY_INTERVAL;
158 }
159 retries += 1;
160
161 let mut blocks = match network_client
162 .subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL)
163 .await
164 {
165 Ok(blocks) => {
166 debug!(
167 "Subscribed to peer {} {} after {} attempts",
168 peer, peer_hostname, retries
169 );
170 context
171 .metrics
172 .node_metrics
173 .subscriber_connection_attempts
174 .with_label_values(&[peer_hostname.as_str(), "success"])
175 .inc();
176 blocks
177 }
178 Err(e) => {
179 debug!(
180 "Failed to subscribe to blocks from peer {} {}: {}",
181 peer, peer_hostname, e
182 );
183 context
184 .metrics
185 .node_metrics
186 .subscriber_connection_attempts
187 .with_label_values(&[peer_hostname.as_str(), "failure"])
188 .inc();
189 continue 'subscription;
190 }
191 };
192
193 context
195 .metrics
196 .node_metrics
197 .subscribed_to
198 .with_label_values(&[peer_hostname])
199 .set(1);
200
201 'stream: loop {
202 match blocks.next().await {
203 Some(block) => {
204 context
205 .metrics
206 .node_metrics
207 .subscribed_blocks
208 .with_label_values(&[peer_hostname])
209 .inc();
210 let result = authority_service
211 .handle_send_block(peer, block.clone())
212 .await;
213 if let Err(e) = result {
214 match e {
215 ConsensusError::BlockRejected { block_ref, reason } => {
216 debug!(
217 "Failed to process block from peer {} {} for block {:?}: {}",
218 peer, peer_hostname, block_ref, reason
219 );
220 }
221 _ => {
222 info!(
223 "Invalid block received from peer {} {}: {}",
224 peer, peer_hostname, e
225 );
226 }
227 }
228 }
229 retries = 0;
231 }
232 None => {
233 debug!(
234 "Subscription to blocks from peer {} {} ended",
235 peer, peer_hostname
236 );
237 retries += 1;
238 break 'stream;
239 }
240 }
241 }
242 }
243 }
244}
245
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        block::BlockRef,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// Number of blocks each fake subscription stream yields before ending.
    const BLOCKS_PER_STREAM: usize = 10;

    /// The fixed payload produced by the fake stream and expected by the test.
    fn dummy_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Network client whose `subscribe_blocks` always succeeds with a short, finite
    /// stream. Every other RPC is unimplemented.
    struct SubscriberTestClient;

    impl SubscriberTestClient {
        fn new() -> Self {
            Self
        }
    }

    #[async_trait]
    impl NetworkClient for SubscriberTestClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            // One block per millisecond. The stream then ends, which forces the
            // subscriber to reconnect.
            let blocks = stream::unfold((), |()| async {
                sleep(Duration::from_millis(1)).await;
                Some((dummy_block(), ()))
            })
            .take(BLOCKS_PER_STREAM);
            Ok(Box::pin(blocks))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// The subscriber must keep reconnecting after each finite stream ends and
    /// forward every received block to the authority service.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        const MIN_FORWARDED_BLOCKS: usize = 100;

        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let authority_service = Arc::new(Mutex::new(TestService::new()));
        let client = Arc::new(SubscriberTestClient::new());
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let subscriber =
            Subscriber::new(context.clone(), client, authority_service.clone(), dag_state);

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Time is paused, so each sleep advances the clock. Poll once per second, for
        // at most ten seconds, until enough blocks have been forwarded.
        for _ in 0..10 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if authority_service.lock().handle_send_block.len() >= MIN_FORWARDED_BLOCKS {
                break;
            }
        }

        let service = authority_service.lock();
        assert!(service.handle_send_block.len() >= MIN_FORWARDED_BLOCKS);
        let expected = dummy_block();
        for (sender, received) in service.handle_send_block.iter() {
            assert_eq!(*sender, peer);
            assert_eq!(*received, expected);
        }
    }
}