consensus_core/
subscriber.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use consensus_config::AuthorityIndex;
8use futures::StreamExt;
9use iota_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info};
16
17use crate::{
18    Round,
19    block::BlockAPI as _,
20    context::Context,
21    dag_state::DagState,
22    error::ConsensusError,
23    network::{NetworkClient, NetworkService},
24    scoring_metrics_store::ErrorSource,
25};
26
27/// Subscriber manages the block stream subscriptions to other peers, taking
28/// care of retrying when subscription streams break. Blocks returned from the
29/// peer are sent to the authority service for processing.
30/// Currently subscription management for individual peer is not exposed, but it
31/// could become useful in future.
32pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
33    context: Arc<Context>,
34    network_client: Arc<C>,
35    authority_service: Arc<S>,
36    dag_state: Arc<RwLock<DagState>>,
37    subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
38}
39
40impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
41    pub(crate) fn new(
42        context: Arc<Context>,
43        network_client: Arc<C>,
44        authority_service: Arc<S>,
45        dag_state: Arc<RwLock<DagState>>,
46    ) -> Self {
47        let subscriptions = (0..context.committee.size())
48            .map(|_| None)
49            .collect::<Vec<_>>();
50        Self {
51            context,
52            network_client,
53            authority_service,
54            dag_state,
55            subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
56        }
57    }
58
59    pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
60        if peer == self.context.own_index {
61            error!("Attempt to subscribe to own validator {peer} is ignored!");
62            return;
63        }
64        let context = self.context.clone();
65        let network_client = self.network_client.clone();
66        let authority_service = self.authority_service.clone();
67        let (mut last_received, gc_round, gc_enabled) = {
68            let dag_state = self.dag_state.read();
69            (
70                dag_state.get_last_block_for_authority(peer).round(),
71                dag_state.gc_round(),
72                dag_state.gc_enabled(),
73            )
74        };
75
76        // If the latest block we have accepted by an authority is older than the
77        // current gc round, then do not attempt to fetch any blocks from that
78        // point as they will simply be skipped. Instead do attempt to fetch
79        // from the gc round.
80        if gc_enabled && last_received < gc_round {
81            info!(
82                "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
83            );
84            last_received = gc_round;
85        }
86
87        let mut subscriptions = self.subscriptions.lock();
88        self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
89        subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
90            context,
91            network_client,
92            authority_service,
93            peer,
94            last_received,
95        )));
96    }
97
98    pub(crate) fn stop(&self) {
99        let mut subscriptions = self.subscriptions.lock();
100        for (peer, _) in self.context.committee.authorities() {
101            self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
102        }
103    }
104
105    fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
106        let peer_hostname = &self.context.committee.authority(peer).hostname;
107        if let Some(subscription) = subscription.take() {
108            subscription.abort();
109        }
110        // There is a race between shutting down the subscription task and clearing the
111        // metric here. TODO: fix the race when unsubscribe_locked() gets called
112        // outside of stop().
113        self.context
114            .metrics
115            .node_metrics
116            .subscribed_to
117            .with_label_values(&[peer_hostname])
118            .set(0);
119    }
120
121    async fn subscription_loop(
122        context: Arc<Context>,
123        network_client: Arc<C>,
124        authority_service: Arc<S>,
125        peer: AuthorityIndex,
126        last_received: Round,
127    ) {
128        const IMMEDIATE_RETRIES: i64 = 3;
129        // When not immediately retrying, limit retry delay between 100ms and 10s.
130        const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);
131        const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(10);
132        const RETRY_INTERVAL_MULTIPLIER: f32 = 1.2;
133        let peer_hostname = &context.committee.authority(peer).hostname;
134        let mut retries: i64 = 0;
135        let mut delay = INITIAL_RETRY_INTERVAL;
136        'subscription: loop {
137            context
138                .metrics
139                .node_metrics
140                .subscribed_to
141                .with_label_values(&[peer_hostname])
142                .set(0);
143
144            if retries > IMMEDIATE_RETRIES {
145                debug!(
146                    "Delaying retry {} of peer {} subscription, in {} seconds",
147                    retries,
148                    peer_hostname,
149                    delay.as_secs_f32(),
150                );
151                sleep(delay).await;
152                // Update delay for the next retry.
153                delay = delay
154                    .mul_f32(RETRY_INTERVAL_MULTIPLIER)
155                    .min(MAX_RETRY_INTERVAL);
156            } else if retries > 0 {
157                // Retry immediately, but still yield to avoid monopolizing the thread.
158                tokio::task::yield_now().await;
159            } else {
160                // First attempt, reset delay for next retries but no waiting.
161                delay = INITIAL_RETRY_INTERVAL;
162            }
163            retries += 1;
164
165            // Wrap subscribe_blocks in a timeout and increment metric on timeout
166            let subscribe_future =
167                network_client.subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL);
168            let subscribe_result = timeout(MAX_RETRY_INTERVAL * 5, subscribe_future).await;
169            let mut blocks = match subscribe_result {
170                Ok(inner_result) => match inner_result {
171                    Ok(blocks) => {
172                        debug!(
173                            "Subscribed to peer {} {} after {} attempts",
174                            peer, peer_hostname, retries
175                        );
176                        context
177                            .metrics
178                            .node_metrics
179                            .subscriber_connection_attempts
180                            .with_label_values(&[peer_hostname.as_str(), "success"])
181                            .inc();
182                        blocks
183                    }
184                    Err(e) => {
185                        debug!(
186                            "Failed to subscribe to blocks from peer {} {}: {}",
187                            peer, peer_hostname, e
188                        );
189                        context
190                            .metrics
191                            .node_metrics
192                            .subscriber_connection_attempts
193                            .with_label_values(&[peer_hostname.as_str(), "failure"])
194                            .inc();
195                        continue 'subscription;
196                    }
197                },
198                Err(_) => {
199                    debug!(
200                        "Timeout subscribing to blocks from peer {} {}",
201                        peer, peer_hostname
202                    );
203                    context
204                        .metrics
205                        .node_metrics
206                        .subscriber_connection_attempts
207                        .with_label_values(&[peer_hostname.as_str(), "timeout"])
208                        .inc();
209                    continue 'subscription;
210                }
211            };
212
213            // Now can consider the subscription successful
214            context
215                .metrics
216                .node_metrics
217                .subscribed_to
218                .with_label_values(&[peer_hostname])
219                .set(1);
220
221            'stream: loop {
222                match blocks.next().await {
223                    Some(block) => {
224                        context
225                            .metrics
226                            .node_metrics
227                            .subscribed_blocks
228                            .with_label_values(&[peer_hostname])
229                            .inc();
230                        let result = authority_service
231                            .handle_send_block(peer, block.clone())
232                            .await;
233                        if let Err(e) = result {
234                            context
235                                .scoring_metrics_store
236                                .update_scoring_metrics_on_block_receival(
237                                    peer,
238                                    peer_hostname,
239                                    e.clone(),
240                                    ErrorSource::Subscriber,
241                                    &context.metrics.node_metrics,
242                                );
243                            match e {
244                                ConsensusError::BlockRejected { block_ref, reason } => {
245                                    debug!(
246                                        "Failed to process block from peer {} {} for block {:?}: {}",
247                                        peer, peer_hostname, block_ref, reason
248                                    );
249                                }
250                                _ => {
251                                    info!(
252                                        "Invalid block received from peer {} {}: {}",
253                                        peer, peer_hostname, e
254                                    );
255                                }
256                            }
257                        }
258                        // Reset retries when a block is received.
259                        retries = 0;
260                    }
261                    None => {
262                        debug!(
263                            "Subscription to blocks from peer {} {} ended",
264                            peer, peer_hostname
265                        );
266                        retries += 1;
267                        break 'stream;
268                    }
269                }
270            }
271        }
272    }
273}
274
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        block::BlockRef,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// Number of blocks each stubbed subscription yields before it ends.
    const BLOCKS_PER_STREAM: usize = 10;

    /// The fixed payload every stubbed subscription stream yields.
    fn canned_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Network client stub: only `subscribe_blocks` is implemented, and every
    /// subscription ends after a handful of blocks so the subscriber has to
    /// keep re-subscribing.
    struct SubscriberTestClient {}

    impl SubscriberTestClient {
        fn new() -> Self {
            Self {}
        }
    }

    #[async_trait]
    impl NetworkClient for SubscriberTestClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        /// Returns a stream of `BLOCKS_PER_STREAM` canned blocks, each one
        /// delivered after a 1ms pause.
        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            let paced_blocks = stream::repeat_with(canned_block)
                .then(|block| async move {
                    sleep(Duration::from_millis(1)).await;
                    block
                })
                .take(BLOCKS_PER_STREAM);
            Ok(Box::pin(paced_blocks))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    /// The subscriber must keep re-subscribing after each short stream ends
    /// until plenty of blocks have reached the authority service.
    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        const EXPECTED_BLOCKS: usize = 100;

        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let authority_service = Arc::new(Mutex::new(TestService::new()));
        let dag_state = Arc::new(RwLock::new(DagState::new(
            context.clone(),
            Arc::new(MemStore::new()),
        )));
        let subscriber = Subscriber::new(
            context.clone(),
            Arc::new(SubscriberTestClient::new()),
            authority_service.clone(),
            dag_state,
        );

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Wait for enough blocks received. The lock is taken only for the length
        // check, never across the sleep.
        let received = || authority_service.lock().handle_send_block.len();
        for _ in 0..10 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if received() >= EXPECTED_BLOCKS {
                break;
            }
        }

        // Even if the stream ends after 10 blocks, the subscriber should retry and get
        // enough blocks eventually.
        let service = authority_service.lock();
        assert!(service.handle_send_block.len() >= EXPECTED_BLOCKS);
        let expected = canned_block();
        for (p, block) in service.handle_send_block.iter() {
            assert_eq!(*p, peer);
            assert_eq!(*block, expected);
        }
    }
}