consensus_core/
subscriber.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{sync::Arc, time::Duration};
6
7use consensus_config::AuthorityIndex;
8use futures::StreamExt;
9use iota_metrics::spawn_monitored_task;
10use parking_lot::{Mutex, RwLock};
11use tokio::{
12    task::JoinHandle,
13    time::{sleep, timeout},
14};
15use tracing::{debug, error, info};
16
17use crate::{
18    Round,
19    block::BlockAPI as _,
20    context::Context,
21    dag_state::DagState,
22    error::ConsensusError,
23    network::{NetworkClient, NetworkService},
24};
25
/// Subscriber manages the block stream subscriptions to other peers, taking
/// care of retrying when subscription streams break. Blocks returned from the
/// peer are sent to the authority service for processing.
/// Currently, subscription management for an individual peer is not exposed,
/// but it could become useful in the future.
pub(crate) struct Subscriber<C: NetworkClient, S: NetworkService> {
    /// Shared node context; this type reads the committee, the node's own
    /// authority index and the metrics from it.
    context: Arc<Context>,
    /// Client used to open block subscriptions (`subscribe_blocks`) to peers.
    network_client: Arc<C>,
    /// Service that every block received from a subscription is handed to
    /// (`handle_send_block`).
    authority_service: Arc<S>,
    /// Read when subscribing to find the round of the last block held for the
    /// peer, the GC round and whether GC is enabled.
    dag_state: Arc<RwLock<DagState>>,
    /// One slot per committee member, indexed by `AuthorityIndex::value()`.
    /// A slot holds the handle of the task running the subscription loop for
    /// that peer, or `None` when there is no subscription.
    subscriptions: Arc<Mutex<Box<[Option<JoinHandle<()>>]>>>,
}
38
39impl<C: NetworkClient, S: NetworkService> Subscriber<C, S> {
40    pub(crate) fn new(
41        context: Arc<Context>,
42        network_client: Arc<C>,
43        authority_service: Arc<S>,
44        dag_state: Arc<RwLock<DagState>>,
45    ) -> Self {
46        let subscriptions = (0..context.committee.size())
47            .map(|_| None)
48            .collect::<Vec<_>>();
49        Self {
50            context,
51            network_client,
52            authority_service,
53            dag_state,
54            subscriptions: Arc::new(Mutex::new(subscriptions.into_boxed_slice())),
55        }
56    }
57
58    pub(crate) fn subscribe(&self, peer: AuthorityIndex) {
59        if peer == self.context.own_index {
60            error!("Attempt to subscribe to own validator {peer} is ignored!");
61            return;
62        }
63        let context = self.context.clone();
64        let network_client = self.network_client.clone();
65        let authority_service = self.authority_service.clone();
66        let (mut last_received, gc_round, gc_enabled) = {
67            let dag_state = self.dag_state.read();
68            (
69                dag_state.get_last_block_for_authority(peer).round(),
70                dag_state.gc_round(),
71                dag_state.gc_enabled(),
72            )
73        };
74
75        // If the latest block we have accepted by an authority is older than the
76        // current gc round, then do not attempt to fetch any blocks from that
77        // point as they will simply be skipped. Instead do attempt to fetch
78        // from the gc round.
79        if gc_enabled && last_received < gc_round {
80            info!(
81                "Last received block for peer {peer} is older than GC round, {last_received} < {gc_round}, fetching from GC round"
82            );
83            last_received = gc_round;
84        }
85
86        let mut subscriptions = self.subscriptions.lock();
87        self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
88        subscriptions[peer.value()] = Some(spawn_monitored_task!(Self::subscription_loop(
89            context,
90            network_client,
91            authority_service,
92            peer,
93            last_received,
94        )));
95    }
96
97    pub(crate) fn stop(&self) {
98        let mut subscriptions = self.subscriptions.lock();
99        for (peer, _) in self.context.committee.authorities() {
100            self.unsubscribe_locked(peer, &mut subscriptions[peer.value()]);
101        }
102    }
103
104    fn unsubscribe_locked(&self, peer: AuthorityIndex, subscription: &mut Option<JoinHandle<()>>) {
105        let peer_hostname = &self.context.committee.authority(peer).hostname;
106        if let Some(subscription) = subscription.take() {
107            subscription.abort();
108        }
109        // There is a race between shutting down the subscription task and clearing the
110        // metric here. TODO: fix the race when unsubscribe_locked() gets called
111        // outside of stop().
112        self.context
113            .metrics
114            .node_metrics
115            .subscribed_to
116            .with_label_values(&[peer_hostname])
117            .set(0);
118    }
119
120    async fn subscription_loop(
121        context: Arc<Context>,
122        network_client: Arc<C>,
123        authority_service: Arc<S>,
124        peer: AuthorityIndex,
125        last_received: Round,
126    ) {
127        const IMMEDIATE_RETRIES: i64 = 3;
128        // When not immediately retrying, limit retry delay between 100ms and 10s.
129        const INITIAL_RETRY_INTERVAL: Duration = Duration::from_millis(100);
130        const MAX_RETRY_INTERVAL: Duration = Duration::from_secs(10);
131        const RETRY_INTERVAL_MULTIPLIER: f32 = 1.2;
132        let peer_hostname = &context.committee.authority(peer).hostname;
133        let mut retries: i64 = 0;
134        let mut delay = INITIAL_RETRY_INTERVAL;
135        'subscription: loop {
136            context
137                .metrics
138                .node_metrics
139                .subscribed_to
140                .with_label_values(&[peer_hostname])
141                .set(0);
142
143            if retries > IMMEDIATE_RETRIES {
144                debug!(
145                    "Delaying retry {} of peer {} subscription, in {} seconds",
146                    retries,
147                    peer_hostname,
148                    delay.as_secs_f32(),
149                );
150                sleep(delay).await;
151                // Update delay for the next retry.
152                delay = delay
153                    .mul_f32(RETRY_INTERVAL_MULTIPLIER)
154                    .min(MAX_RETRY_INTERVAL);
155            } else if retries > 0 {
156                // Retry immediately, but still yield to avoid monopolizing the thread.
157                tokio::task::yield_now().await;
158            } else {
159                // First attempt, reset delay for next retries but no waiting.
160                delay = INITIAL_RETRY_INTERVAL;
161            }
162            retries += 1;
163
164            // Wrap subscribe_blocks in a timeout and increment metric on timeout
165            let subscribe_future =
166                network_client.subscribe_blocks(peer, last_received, MAX_RETRY_INTERVAL);
167            let subscribe_result = timeout(MAX_RETRY_INTERVAL * 5, subscribe_future).await;
168            let mut blocks = match subscribe_result {
169                Ok(inner_result) => match inner_result {
170                    Ok(blocks) => {
171                        debug!(
172                            "Subscribed to peer {} {} after {} attempts",
173                            peer, peer_hostname, retries
174                        );
175                        context
176                            .metrics
177                            .node_metrics
178                            .subscriber_connection_attempts
179                            .with_label_values(&[peer_hostname.as_str(), "success"])
180                            .inc();
181                        blocks
182                    }
183                    Err(e) => {
184                        debug!(
185                            "Failed to subscribe to blocks from peer {} {}: {}",
186                            peer, peer_hostname, e
187                        );
188                        context
189                            .metrics
190                            .node_metrics
191                            .subscriber_connection_attempts
192                            .with_label_values(&[peer_hostname.as_str(), "failure"])
193                            .inc();
194                        continue 'subscription;
195                    }
196                },
197                Err(_) => {
198                    debug!(
199                        "Timeout subscribing to blocks from peer {} {}",
200                        peer, peer_hostname
201                    );
202                    context
203                        .metrics
204                        .node_metrics
205                        .subscriber_connection_attempts
206                        .with_label_values(&[peer_hostname.as_str(), "timeout"])
207                        .inc();
208                    continue 'subscription;
209                }
210            };
211
212            // Now can consider the subscription successful
213            context
214                .metrics
215                .node_metrics
216                .subscribed_to
217                .with_label_values(&[peer_hostname])
218                .set(1);
219
220            'stream: loop {
221                match blocks.next().await {
222                    Some(block) => {
223                        context
224                            .metrics
225                            .node_metrics
226                            .subscribed_blocks
227                            .with_label_values(&[peer_hostname])
228                            .inc();
229                        let result = authority_service
230                            .handle_send_block(peer, block.clone())
231                            .await;
232                        if let Err(e) = result {
233                            match e {
234                                ConsensusError::BlockRejected { block_ref, reason } => {
235                                    debug!(
236                                        "Failed to process block from peer {} {} for block {:?}: {}",
237                                        peer, peer_hostname, block_ref, reason
238                                    );
239                                }
240                                _ => {
241                                    info!(
242                                        "Invalid block received from peer {} {}: {}",
243                                        peer, peer_hostname, e
244                                    );
245                                }
246                            }
247                        }
248                        // Reset retries when a block is received.
249                        retries = 0;
250                    }
251                    None => {
252                        debug!(
253                            "Subscription to blocks from peer {} {} ended",
254                            peer, peer_hostname
255                        );
256                        retries += 1;
257                        break 'stream;
258                    }
259                }
260            }
261        }
262    }
263}
264
265#[cfg(test)]
266mod test {
267    use async_trait::async_trait;
268    use bytes::Bytes;
269    use futures::stream;
270
271    use super::*;
272    use crate::{
273        VerifiedBlock,
274        block::BlockRef,
275        commit::CommitRange,
276        error::ConsensusResult,
277        network::{BlockStream, ExtendedSerializedBlock, test_network::TestService},
278        storage::mem_store::MemStore,
279    };
280
281    struct SubscriberTestClient {}
282
283    impl SubscriberTestClient {
284        fn new() -> Self {
285            Self {}
286        }
287    }
288
289    #[async_trait]
290    impl NetworkClient for SubscriberTestClient {
291        const SUPPORT_STREAMING: bool = true;
292
293        async fn send_block(
294            &self,
295            _peer: AuthorityIndex,
296            _block: &VerifiedBlock,
297            _timeout: Duration,
298        ) -> ConsensusResult<()> {
299            unimplemented!("Unimplemented")
300        }
301
302        async fn subscribe_blocks(
303            &self,
304            _peer: AuthorityIndex,
305            _last_received: Round,
306            _timeout: Duration,
307        ) -> ConsensusResult<BlockStream> {
308            let block_stream = stream::unfold((), |_| async {
309                sleep(Duration::from_millis(1)).await;
310                let block = ExtendedSerializedBlock {
311                    block: Bytes::from(vec![1u8; 8]),
312                    excluded_ancestors: vec![],
313                };
314                Some((block, ()))
315            })
316            .take(10);
317            Ok(Box::pin(block_stream))
318        }
319
320        async fn fetch_blocks(
321            &self,
322            _peer: AuthorityIndex,
323            _block_refs: Vec<BlockRef>,
324            _highest_accepted_rounds: Vec<Round>,
325            _timeout: Duration,
326        ) -> ConsensusResult<Vec<Bytes>> {
327            unimplemented!("Unimplemented")
328        }
329
330        async fn fetch_commits(
331            &self,
332            _peer: AuthorityIndex,
333            _commit_range: CommitRange,
334            _timeout: Duration,
335        ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)> {
336            unimplemented!("Unimplemented")
337        }
338
339        async fn fetch_latest_blocks(
340            &self,
341            _peer: AuthorityIndex,
342            _authorities: Vec<AuthorityIndex>,
343            _timeout: Duration,
344        ) -> ConsensusResult<Vec<Bytes>> {
345            unimplemented!("Unimplemented")
346        }
347
348        async fn get_latest_rounds(
349            &self,
350            _peer: AuthorityIndex,
351            _timeout: Duration,
352        ) -> ConsensusResult<(Vec<Round>, Vec<Round>)> {
353            unimplemented!("Unimplemented")
354        }
355    }
356
357    #[tokio::test(flavor = "current_thread", start_paused = true)]
358    async fn subscriber_retries() {
359        let (context, _keys) = Context::new_for_test(4);
360        let context = Arc::new(context);
361        let authority_service = Arc::new(Mutex::new(TestService::new()));
362        let network_client = Arc::new(SubscriberTestClient::new());
363        let store = Arc::new(MemStore::new());
364        let dag_state = Arc::new(RwLock::new(DagState::new(context.clone(), store)));
365        let subscriber = Subscriber::new(
366            context.clone(),
367            network_client,
368            authority_service.clone(),
369            dag_state,
370        );
371
372        let peer = context.committee.to_authority_index(2).unwrap();
373        subscriber.subscribe(peer);
374
375        // Wait for enough blocks received.
376        for _ in 0..10 {
377            tokio::time::sleep(Duration::from_secs(1)).await;
378            let service = authority_service.lock();
379            if service.handle_send_block.len() >= 100 {
380                break;
381            }
382        }
383
384        // Even if the stream ends after 10 blocks, the subscriber should retry and get
385        // enough blocks eventually.
386        let service = authority_service.lock();
387        assert!(service.handle_send_block.len() >= 100);
388        for (p, block) in service.handle_send_block.iter() {
389            assert_eq!(*p, peer);
390            assert_eq!(
391                *block,
392                ExtendedSerializedBlock {
393                    block: Bytes::from(vec![1u8; 8]),
394                    excluded_ancestors: vec![]
395                }
396            );
397        }
398    }
399}