consensus_core/
subscriber.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use consensus_config::AuthorityIndex;
8use futures::StreamExt;
9use iota_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info};
16
17use crate::{
18    Round,
19    block::BlockAPI as _,
20    context::Context,
21    dag_state::DagState,
22    error::ConsensusError,
23    network::{NetworkClient, NetworkService},
24};
25
/// Subscriber manages the block stream subscriptions to other peers, taking
/// care of retrying when subscription streams break. Blocks returned from the
/// peer are sent to the authority service for processing.
/// Currently subscription management for individual peers is not exposed, but
/// it could become useful in the future.
pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
    /// Shared node context; read here for the committee, the node's own
    /// authority index and the metrics registry.
    context: Arc<Context>,
    /// Client used to open block subscription streams to peers.
    network_client: Arc<C>,
    /// Service that every block received from a subscription is handed to.
    authority_service: Arc<S>,
    /// DAG state, read when subscribing to decide the round to resume from.
    dag_state: Arc<RwLock<DagState>>,
    /// One slot per committee member, indexed by authority index. A slot
    /// holds the handle of the running subscription task for that peer, or
    /// `None` when there is no subscription.
    subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
}
38
39impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
40    pub(crate) fn new(
41        context: Arc<Context>,
42        network_client: Arc<C>,
43        authority_service: Arc<S>,
44        dag_state: Arc<RwLock<DagState>>,
45    ) -> Self {
46        let subscriptions = (0..context.committee.size())
47            .map(|_| None)
48            .collect::<Vec<_>>();
49        Self {
50            context,
51            network_client,
52            authority_service,
53            dag_state,
54            subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
55        }
56    }
57
58    pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
59        if peer == self.context.own_index {
60            error!("Attempt to subscribe to own validator {peer} is ignored!");
61            return;
62        }
63        let context = self.context.clone();
64        let network_client = self.network_client.clone();
65        let authority_service = self.authority_service.clone();
66        let (mut last_received, gc_round, gc_enabled) = {
67            let dag_state = self.dag_state.read();
68            (
69                dag_state.get_last_block_for_authority(peer).round(),
70                dag_state.gc_round(),
71                dag_state.gc_enabled(),
72            )
73        };
74
75        // If the latest block we have accepted by an authority is older than the
76        // current gc round, then do not attempt to fetch any blocks from that
77        // point as they will simply be skipped. Instead do attempt to fetch
78        // from the gc round.
79        if gc_enabled && last_received < gc_round {
80            info!(
81                "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
82            );
83            last_received = gc_round;
84        }
85
86        let mut subscriptions = self.subscriptions.lock();
87        self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
88        subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
89            context,
90            network_client,
91            authority_service,
92            peer,
93            last_received,
94        )));
95    }
96
97    pub(crate) fn stop(&self) {
98        let mut subscriptions = self.subscriptions.lock();
99        for (peer, _) in self.context.committee.authorities() {
100            self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
101        }
102    }
103
104    fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
105        let peer_hostname = &self.context.committee.authority(peer).hostname;
106        if let Some(subscription) = subscription.take() {
107            subscription.abort();
108        }
109        // There is a race between shutting down the subscription task and clearing the
110        // metric here. TODO: fix the race when unsubscribe_locked() gets called
111        // outside of stop().
112        self.context
113            .metrics
114            .node_metrics
115            .subscribed_to
116            .with_label_values(&[peer_hostname])
117            .set(0);
118    }
119
120    async fn subscription_loop(
121        context: Arc<Context>,
122        network_client: Arc<C>,
123        authority_service: Arc<S>,
124        peer: AuthorityIndex,
125        last_received: Round,
126    ) {
127        const IMMEDIATE_RETRIES: i64 = 3;
128        // When not immediately retrying, limit retry delay between 100ms and 10s.
129        const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);
130        const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(10);
131        const RETRY_INTERVAL_MULTIPLIER: f32 = 1.2;
132        let peer_hostname = &context.committee.authority(peer).hostname;
133        let mut retries: i64 = 0;
134        let mut delay = INITIAL_RETRY_INTERVAL;
135        'subscription: loop {
136            context
137                .metrics
138                .node_metrics
139                .subscribed_to
140                .with_label_values(&[peer_hostname])
141                .set(0);
142
143            if retries > IMMEDIATE_RETRIES {
144                debug!(
145                    "Delaying retry {} of peer {} subscription, in {} seconds",
146                    retries,
147                    peer_hostname,
148                    delay.as_secs_f32(),
149                );
150                sleep(delay).await;
151                // Update delay for the next retry.
152                delay = delay
153                    .mul_f32(RETRY_INTERVAL_MULTIPLIER)
154                    .min(MAX_RETRY_INTERVAL);
155            } else if retries > 0 {
156                // Retry immediately, but still yield to avoid monopolizing the thread.
157                tokio::task::yield_now().await;
158            } else {
159                // First attempt, reset delay for next retries but no waiting.
160                delay = INITIAL_RETRY_INTERVAL;
161            }
162            retries += 1;
163
164            // Wrap subscribe_blocks in a timeout and increment metric on timeout
165            let subscribe_future =
166                network_client.subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL);
167            let subscribe_result = timeout(MAX_RETRY_INTERVAL * 5, subscribe_future).await;
168            let mut blocks = match subscribe_result {
169                Ok(inner_result) => match inner_result {
170                    Ok(blocks) => {
171                        debug!(
172                            "Subscribed to peer {} {} after {} attempts",
173                            peer, peer_hostname, retries
174                        );
175                        context
176                            .metrics
177                            .node_metrics
178                            .subscriber_connection_attempts
179                            .with_label_values(&[peer_hostname.as_str(), "success"])
180                            .inc();
181                        blocks
182                    }
183                    Err(e) => {
184                        debug!(
185                            "Failed to subscribe to blocks from peer {} {}: {}",
186                            peer, peer_hostname, e
187                        );
188                        context
189                            .metrics
190                            .node_metrics
191                            .subscriber_connection_attempts
192                            .with_label_values(&[peer_hostname.as_str(), "failure"])
193                            .inc();
194                        continue 'subscription;
195                    }
196                },
197                Err(_) => {
198                    debug!(
199                        "Timeout subscribing to blocks from peer {} {}",
200                        peer, peer_hostname
201                    );
202                    context
203                        .metrics
204                        .node_metrics
205                        .subscriber_connection_attempts
206                        .with_label_values(&[peer_hostname.as_str(), "timeout"])
207                        .inc();
208                    continue 'subscription;
209                }
210            };
211
212            // Now can consider the subscription successful
213            context
214                .metrics
215                .node_metrics
216                .subscribed_to
217                .with_label_values(&[peer_hostname])
218                .set(1);
219
220            'stream: loop {
221                match blocks.next().await {
222                    Some(block) => {
223                        context
224                            .metrics
225                            .node_metrics
226                            .subscribed_blocks
227                            .with_label_values(&[peer_hostname])
228                            .inc();
229                        let result = authority_service
230                            .handle_send_block(peer, block.clone())
231                            .await;
232                        if let Err(e) = result {
233                            context.metrics.update_scoring_metrics_on_block_receival(
234                                peer,
235                                peer_hostname,
236                                e.clone(),
237                                "handle_send_block",
238                            );
239                            match e {
240                                ConsensusError::BlockRejected { block_ref, reason } => {
241                                    debug!(
242                                        "Failed to process block from peer {} {} for block {:?}: {}",
243                                        peer, peer_hostname, block_ref, reason
244                                    );
245                                }
246                                _ => {
247                                    info!(
248                                        "Invalid block received from peer {} {}: {}",
249                                        peer, peer_hostname, e
250                                    );
251                                }
252                            }
253                        }
254                        // Reset retries when a block is received.
255                        retries = 0;
256                    }
257                    None => {
258                        debug!(
259                            "Subscription to blocks from peer {} {} ended",
260                            peer, peer_hostname
261                        );
262                        retries += 1;
263                        break 'stream;
264                    }
265                }
266            }
267        }
268    }
269}
270
271#[cfg(test)]
272mod test {
273    use async_trait::async_trait;
274    use bytes::Bytes;
275    use futures::stream;
276
277    use super::*;
278    use crate::{
279        VerifiedBlock,
280        block::BlockRef,
281        commit::CommitRange,
282        error::ConsensusResult,
283        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
284        storage::mem_store::MemStore,
285    };
286
287    struct SubscriberTestClient {}
288
289    impl SubscriberTestClient {
290        fn new() -> Self {
291            Self {}
292        }
293    }
294
295    #[async_trait]
296    impl NetworkClient for SubscriberTestClient {
297        const SUPPORT_STREAMING: bool = true;
298
299        async fn send_block(
300            &self,
301            _peer: AuthorityIndex,
302            _block: &VerifiedBlock,
303            _timeout: Duration,
304        ) -> ConsensusResult<()> {
305            unimplemented!("Unimplemented")
306        }
307
308        async fn subscribe_blocks(
309            &self,
310            _peer: AuthorityIndex,
311            _last_received: Round,
312            _timeout: Duration,
313        ) -> ConsensusResult<BlockStream> {
314            let block_stream = stream::unfold((), |_| async {
315                sleep(Duration::from_millis(1)).await;
316                let block = ExtendedSerializedBlock {
317                    block: Bytes::from(vec![1u8; 8]),
318                    excluded_ancestors: vec![],
319                };
320                Some((block, ()))
321            })
322            .take(10);
323            Ok(Box::pin(block_stream))
324        }
325
326        async fn fetch_blocks(
327            &self,
328            _peer: AuthorityIndex,
329            _block_refs: Vec<BlockRef>,
330            _highest_accepted_rounds: Vec<Round>,
331            _timeout: Duration,
332        ) -> ConsensusResult<Vec<Bytes>> {
333            unimplemented!("Unimplemented")
334        }
335
336        async fn fetch_commits(
337            &self,
338            _peer: AuthorityIndex,
339            _commit_range: CommitRange,
340            _timeout: Duration,
341        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
342            unimplemented!("Unimplemented")
343        }
344
345        async fn fetch_latest_blocks(
346            &self,
347            _peer: AuthorityIndex,
348            _authorities: Vec<AuthorityIndex>,
349            _timeout: Duration,
350        ) -> ConsensusResult<Vec<Bytes>> {
351            unimplemented!("Unimplemented")
352        }
353
354        async fn get_latest_rounds(
355            &self,
356            _peer: AuthorityIndex,
357            _timeout: Duration,
358        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
359            unimplemented!("Unimplemented")
360        }
361    }
362
363    #[tokio::test(flavor = "current_thread", start_paused = true)]
364    async fn subscriber_retries() {
365        let (context, _keys) = Context::new_for_test(4);
366        let context = Arc::new(context);
367        let authority_service = Arc::new(Mutex::new(TestService::new()));
368        let network_client = Arc::new(SubscriberTestClient::new());
369        let store = Arc::new(MemStore::new());
370        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
371        let subscriber = Subscriber::new(
372            context.clone(),
373            network_client,
374            authority_service.clone(),
375            dag_state,
376        );
377
378        let peer = context.committee.to_authority_index(2).unwrap();
379        subscriber.subscribe(peer);
380
381        // Wait for enough blocks received.
382        for _ in 0..10 {
383            tokio::time::sleep(Duration::from_secs(1)).await;
384            let service = authority_service.lock();
385            if service.handle_send_block.len() >= 100 {
386                break;
387            }
388        }
389
390        // Even if the stream ends after 10 blocks, the subscriber should retry and get
391        // enough blocks eventually.
392        let service = authority_service.lock();
393        assert!(service.handle_send_block.len() >= 100);
394        for (p, block) in service.handle_send_block.iter() {
395            assert_eq!(*p, peer);
396            assert_eq!(
397                *block,
398                ExtendedSerializedBlock {
399                    block: Bytes::from(vec![1u8; 8]),
400                    excluded_ancestors: vec![]
401                }
402            );
403        }
404    }
405}