consensus_core/
subscriber.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use consensus_config::AuthorityIndex;
8use futures::StreamExt;
9use iota_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{task::JoinHandle, time::sleep};
12use tracing::{debug, error, info};
13
14use crate::{
15    Round,
16    block::BlockAPI as _,
17    context::Context,
18    dag_state::DagState,
19    error::ConsensusError,
20    network::{NetworkClient, NetworkService},
21};
22
23/// Subscriber manages the block stream subscriptions to other peers, taking
24/// care of retrying when subscription streams break. Blocks returned from the
25/// peer are sent to the authority service for processing.
26/// Currently subscription management for individual peer is not exposed, but it
27/// could become useful in future.
28pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
29    context: Arc<Context>,
30    network_client: Arc<C>,
31    authority_service: Arc<S>,
32    dag_state: Arc<RwLock<DagState>>,
33    subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
34}
35
36impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
37    pub(crate) fn new(
38        context: Arc<Context>,
39        network_client: Arc<C>,
40        authority_service: Arc<S>,
41        dag_state: Arc<RwLock<DagState>>,
42    ) -> Self {
43        let subscriptions = (0..context.committee.size())
44            .map(|_| None)
45            .collect::<Vec<_>>();
46        Self {
47            context,
48            network_client,
49            authority_service,
50            dag_state,
51            subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
52        }
53    }
54
55    pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
56        if peer == self.context.own_index {
57            error!("Attempt to subscribe to own validator {peer} is ignored!");
58            return;
59        }
60        let context = self.context.clone();
61        let network_client = self.network_client.clone();
62        let authority_service = self.authority_service.clone();
63        let (mut last_received, gc_round, gc_enabled) = {
64            let dag_state = self.dag_state.read();
65            (
66                dag_state.get_last_block_for_authority(peer).round(),
67                dag_state.gc_round(),
68                dag_state.gc_enabled(),
69            )
70        };
71
72        // If the latest block we have accepted by an authority is older than the
73        // current gc round, then do not attempt to fetch any blocks from that
74        // point as they will simply be skipped. Instead do attempt to fetch
75        // from the gc round.
76        if gc_enabled && last_received < gc_round {
77            info!(
78                "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
79            );
80            last_received = gc_round;
81        }
82
83        let mut subscriptions = self.subscriptions.lock();
84        self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
85        subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
86            context,
87            network_client,
88            authority_service,
89            peer,
90            last_received,
91        )));
92    }
93
94    pub(crate) fn stop(&self) {
95        let mut subscriptions = self.subscriptions.lock();
96        for (peer, _) in self.context.committee.authorities() {
97            self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
98        }
99    }
100
101    fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
102        let peer_hostname = &self.context.committee.authority(peer).hostname;
103        if let Some(subscription) = subscription.take() {
104            subscription.abort();
105        }
106        // There is a race between shutting down the subscription task and clearing the
107        // metric here. TODO: fix the race when unsubscribe_locked() gets called
108        // outside of stop().
109        self.context
110            .metrics
111            .node_metrics
112            .subscribed_to
113            .with_label_values(&[peer_hostname])
114            .set(0);
115    }
116
117    async fn subscription_loop(
118        context: Arc<Context>,
119        network_client: Arc<C>,
120        authority_service: Arc<S>,
121        peer: AuthorityIndex,
122        last_received: Round,
123    ) {
124        const IMMEDIATE_RETRIES: i64 = 3;
125        // When not immediately retrying, limit retry delay between 100ms and 10s.
126        const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);
127        const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(10);
128        const RETRY_INTERVAL_MULTIPLIER: f32 = 1.2;
129        let peer_hostname = &context.committee.authority(peer).hostname;
130        let mut retries: i64 = 0;
131        let mut delay = INITIAL_RETRY_INTERVAL;
132        'subscription: loop {
133            context
134                .metrics
135                .node_metrics
136                .subscribed_to
137                .with_label_values(&[peer_hostname])
138                .set(0);
139
140            if retries > IMMEDIATE_RETRIES {
141                debug!(
142                    "Delaying retry {} of peer {} subscription, in {} seconds",
143                    retries,
144                    peer_hostname,
145                    delay.as_secs_f32(),
146                );
147                sleep(delay).await;
148                // Update delay for the next retry.
149                delay = delay
150                    .mul_f32(RETRY_INTERVAL_MULTIPLIER)
151                    .min(MAX_RETRY_INTERVAL);
152            } else if retries > 0 {
153                // Retry immediately, but still yield to avoid monopolizing the thread.
154                tokio::task::yield_now().await;
155            } else {
156                // First attempt, reset delay for next retries but no waiting.
157                delay = INITIAL_RETRY_INTERVAL;
158            }
159            retries += 1;
160
161            let mut blocks = match network_client
162                .subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL)
163                .await
164            {
165                Ok(blocks) => {
166                    debug!(
167                        "Subscribed to peer {} {} after {} attempts",
168                        peer, peer_hostname, retries
169                    );
170                    context
171                        .metrics
172                        .node_metrics
173                        .subscriber_connection_attempts
174                        .with_label_values(&[peer_hostname.as_str(), "success"])
175                        .inc();
176                    blocks
177                }
178                Err(e) => {
179                    debug!(
180                        "Failed to subscribe to blocks from peer {} {}: {}",
181                        peer, peer_hostname, e
182                    );
183                    context
184                        .metrics
185                        .node_metrics
186                        .subscriber_connection_attempts
187                        .with_label_values(&[peer_hostname.as_str(), "failure"])
188                        .inc();
189                    continue 'subscription;
190                }
191            };
192
193            // Now can consider the subscription successful
194            context
195                .metrics
196                .node_metrics
197                .subscribed_to
198                .with_label_values(&[peer_hostname])
199                .set(1);
200
201            'stream: loop {
202                match blocks.next().await {
203                    Some(block) => {
204                        context
205                            .metrics
206                            .node_metrics
207                            .subscribed_blocks
208                            .with_label_values(&[peer_hostname])
209                            .inc();
210                        let result = authority_service
211                            .handle_send_block(peer, block.clone())
212                            .await;
213                        if let Err(e) = result {
214                            match e {
215                                ConsensusError::BlockRejected { block_ref, reason } => {
216                                    debug!(
217                                        "Failed to process block from peer {} {} for block {:?}: {}",
218                                        peer, peer_hostname, block_ref, reason
219                                    );
220                                }
221                                _ => {
222                                    info!(
223                                        "Invalid block received from peer {} {}: {}",
224                                        peer, peer_hostname, e
225                                    );
226                                }
227                            }
228                        }
229                        // Reset retries when a block is received.
230                        retries = 0;
231                    }
232                    None => {
233                        debug!(
234                            "Subscription to blocks from peer {} {} ended",
235                            peer, peer_hostname
236                        );
237                        retries += 1;
238                        break 'stream;
239                    }
240                }
241            }
242        }
243    }
244}
245
#[cfg(test)]
mod test {
    use async_trait::async_trait;
    use bytes::Bytes;
    use futures::stream;

    use super::*;
    use crate::{
        VerifiedBlock,
        block::BlockRef,
        commit::CommitRange,
        error::ConsensusResult,
        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
        storage::mem_store::MemStore,
    };

    /// The single block value yielded by every stream that
    /// `SubscriberTestClient` produces.
    fn test_block() -> ExtendedSerializedBlock {
        ExtendedSerializedBlock {
            block: Bytes::from(vec![1u8; 8]),
            excluded_ancestors: vec![],
        }
    }

    /// Network client whose only working RPC is `subscribe_blocks`. Each
    /// subscription yields ten identical blocks, 1ms apart, and then ends.
    struct SubscriberTestClient;

    impl SubscriberTestClient {
        fn new() -> Self {
            SubscriberTestClient
        }
    }

    #[async_trait]
    impl NetworkClient for SubscriberTestClient {
        const SUPPORT_STREAMING: bool = true;

        async fn send_block(
            &self,
            _peer: AuthorityIndex,
            _block: &VerifiedBlock,
            _timeout: Duration,
        ) -> ConsensusResult<()> {
            unimplemented!("Unimplemented")
        }

        async fn subscribe_blocks(
            &self,
            _peer: AuthorityIndex,
            _last_received: Round,
            _timeout: Duration,
        ) -> ConsensusResult<BlockStream> {
            // Each item waits 1ms before being produced. The stream is cut off
            // after ten items, so the subscriber has to reconnect to get more.
            let blocks = stream::unfold((), |()| async {
                sleep(Duration::from_millis(1)).await;
                Some((test_block(), ()))
            });
            Ok(Box::pin(blocks.take(10)))
        }

        async fn fetch_blocks(
            &self,
            _peer: AuthorityIndex,
            _block_refs: Vec<BlockRef>,
            _highest_accepted_rounds: Vec<Round>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_commits(
            &self,
            _peer: AuthorityIndex,
            _commit_range: CommitRange,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
            unimplemented!("Unimplemented")
        }

        async fn fetch_latest_blocks(
            &self,
            _peer: AuthorityIndex,
            _authorities: Vec<AuthorityIndex>,
            _timeout: Duration,
        ) -> ConsensusResult<Vec<Bytes>> {
            unimplemented!("Unimplemented")
        }

        async fn get_latest_rounds(
            &self,
            _peer: AuthorityIndex,
            _timeout: Duration,
        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
            unimplemented!("Unimplemented")
        }
    }

    #[tokio::test(flavor = "current_thread", start_paused = true)]
    async fn subscriber_retries() {
        let (context, _keys) = Context::new_for_test(4);
        let context = Arc::new(context);
        let authority_service = Arc::new(Mutex::new(TestService::new()));
        let store = Arc::new(MemStore::new());
        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
        let subscriber = Subscriber::new(
            context.clone(),
            Arc::new(SubscriberTestClient::new()),
            authority_service.clone(),
            dag_state,
        );

        let peer = context.committee.to_authority_index(2).unwrap();
        subscriber.subscribe(peer);

        // Poll once per (paused-clock) second, for at most ten seconds, until at
        // least 100 blocks have been handed to the authority service.
        for _ in 0..10 {
            tokio::time::sleep(Duration::from_secs(1)).await;
            if authority_service.lock().handle_send_block.len() >= 100 {
                break;
            }
        }

        // Every stream ends after 10 blocks, so reaching 100 proves the
        // subscriber kept reconnecting.
        let service = authority_service.lock();
        let received = &service.handle_send_block;
        assert!(received.len() >= 100);
        let expected = test_block();
        for (p, block) in received.iter() {
            assert_eq!(*p, peer);
            assert_eq!(*block, expected);
        }
    }
}