consensus_core/network/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
//! This module defines the network interface, and provides network
//! implementations for the consensus protocol.
//!
//! Having an abstract network interface allows
//! - simplifying the semantics of sending data and serving requests over the
//!   network
//! - hiding implementation specific types and semantics from the consensus
//!   protocol
//! - allowing easy swapping of network implementations, for better performance
//!   or testing
//!
//! When modifying the client and server interfaces, the principle is to keep
//! the interfaces low level, close to underlying implementations in semantics.
//! For example, the client interface exposes sending messages to a specific
//! peer, instead of broadcasting to all peers. Subscribing to a stream of
//! blocks gets back the stream via response, instead of delivering the stream
//! directly to the server. This keeps the logic outside of this module
//! agnostic to the underlying network, so it can be reused easily across
//! network implementations.
24
25use std::{pin::Pin, sync::Arc, time::Duration};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use consensus_config::{AuthorityIndex, NetworkKeyPair};
30use futures::Stream;
31
32use crate::{
33    Round,
34    block::{BlockRef, ExtendedBlock, VerifiedBlock},
35    commit::{CommitRange, TrustedCommit},
36    context::Context,
37    error::ConsensusResult,
38};
39
40// Tonic generated RPC stubs.
41mod tonic_gen {
42    include!(concat!(env!("OUT_DIR"), "/consensus.ConsensusService.rs"));
43}
44
45pub(crate) mod metrics;
46mod metrics_layer;
47#[cfg(all(test, not(msim)))]
48mod network_tests;
49#[cfg(test)]
50pub(crate) mod test_network;
51#[cfg(not(msim))]
52pub(crate) mod tonic_network;
53#[cfg(msim)]
54pub mod tonic_network;
55mod tonic_tls;
56
57/// A stream of serialized filtered blocks returned over the network.
58pub(crate) type BlockStream = Pin<Box<dyn Stream<Item = ExtendedSerializedBlock> + Send>>;
59
60/// Network client for communicating with peers.
61///
62/// NOTE: the timeout parameters help saving resources at client and potentially
63/// server. But it is up to the server implementation if the timeout is honored.
64/// - To bound server resources, server should implement own timeout for
65///   incoming requests.
66#[async_trait]
67pub(crate) trait NetworkClient: Send + Sync + Sized + 'static {
68    // Whether the network client streams blocks to subscribed peers.
69    const SUPPORT_STREAMING: bool;
70
71    /// Sends a serialized SignedBlock to a peer.
72    async fn send_block(
73        &self,
74        peer: AuthorityIndex,
75        block: &VerifiedBlock,
76        timeout: Duration,
77    ) -> ConsensusResult<()>;
78
79    /// Subscribes to blocks from a peer after last_received round.
80    async fn subscribe_blocks(
81        &self,
82        peer: AuthorityIndex,
83        last_received: Round,
84        timeout: Duration,
85    ) -> ConsensusResult<BlockStream>;
86
87    // TODO: add a parameter for maximum total size of blocks returned.
88    /// Fetches serialized `SignedBlock`s from a peer. It also might return
89    /// additional ancestor blocks of the requested blocks according to the
90    /// provided `highest_accepted_rounds`. The `highest_accepted_rounds`
91    /// length should be equal to the committee size. If
92    /// `highest_accepted_rounds` is empty then it will be simply ignored.
93    async fn fetch_blocks(
94        &self,
95        peer: AuthorityIndex,
96        block_refs: Vec<BlockRef>,
97        highest_accepted_rounds: Vec<Round>,
98        timeout: Duration,
99    ) -> ConsensusResult<Vec<Bytes>>;
100
101    /// Fetches serialized commits in the commit range from a peer.
102    /// Returns a tuple of both the serialized commits, and serialized blocks
103    /// that contain votes certifying the last commit.
104    async fn fetch_commits(
105        &self,
106        peer: AuthorityIndex,
107        commit_range: CommitRange,
108        timeout: Duration,
109    ) -> ConsensusResult<(Vec<Bytes>, Vec<Bytes>)>;
110
111    /// Fetches the latest block from `peer` for the requested `authorities`.
112    /// The latest blocks are returned in the serialised format of
113    /// `SignedBlocks`. The method can return multiple blocks per peer as
114    /// its possible to have equivocations.
115    async fn fetch_latest_blocks(
116        &self,
117        peer: AuthorityIndex,
118        authorities: Vec<AuthorityIndex>,
119        timeout: Duration,
120    ) -> ConsensusResult<Vec<Bytes>>;
121
122    /// Gets the latest received & accepted rounds of all authorities from the
123    /// peer.
124    async fn get_latest_rounds(
125        &self,
126        peer: AuthorityIndex,
127        timeout: Duration,
128    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
129}
130
131/// Network service for handling requests from peers.
132#[async_trait]
133pub(crate) trait NetworkService: Send + Sync + 'static {
134    /// Handles the block sent from the peer via either unicast RPC or
135    /// subscription stream. Peer value can be trusted to be a valid
136    /// authority index. But serialized_block must be verified before its
137    /// contents are trusted.
138    /// Excluded ancestors are also included as part of an effort to further
139    /// propagate blocks to peers despite the current exclusion.
140    async fn handle_send_block(
141        &self,
142        peer: AuthorityIndex,
143        block: ExtendedSerializedBlock,
144    ) -> ConsensusResult<()>;
145
146    /// Handles the subscription request from the peer.
147    /// A stream of newly proposed blocks is returned to the peer.
148    /// The stream continues until the end of epoch, peer unsubscribes, or a
149    /// network error / crash occurs.
150    async fn handle_subscribe_blocks(
151        &self,
152        peer: AuthorityIndex,
153        last_received: Round,
154    ) -> ConsensusResult<BlockStream>;
155
156    /// Handles the request to fetch blocks by references from the peer.
157    async fn handle_fetch_blocks(
158        &self,
159        peer: AuthorityIndex,
160        block_refs: Vec<BlockRef>,
161        highest_accepted_rounds: Vec<Round>,
162    ) -> ConsensusResult<Vec<Bytes>>;
163
164    /// Handles the request to fetch commits by index range from the peer.
165    async fn handle_fetch_commits(
166        &self,
167        peer: AuthorityIndex,
168        commit_range: CommitRange,
169    ) -> ConsensusResult<(Vec<TrustedCommit>, Vec<VerifiedBlock>)>;
170
171    /// Handles the request to fetch the latest block for the provided
172    /// `authorities`.
173    async fn handle_fetch_latest_blocks(
174        &self,
175        peer: AuthorityIndex,
176        authorities: Vec<AuthorityIndex>,
177    ) -> ConsensusResult<Vec<Bytes>>;
178
179    /// Handles the request to get the latest received & accepted rounds of all
180    /// authorities.
181    async fn handle_get_latest_rounds(
182        &self,
183        peer: AuthorityIndex,
184    ) -> ConsensusResult<(Vec<Round>, Vec<Round>)>;
185}
186
187/// An `AuthorityNode` holds a `NetworkManager` until shutdown.
188/// Dropping `NetworkManager` will shutdown the network service.
189pub(crate) trait NetworkManager<S>: Send + Sync
190where
191    S: NetworkService,
192{
193    type Client: NetworkClient;
194
195    /// Creates a new network manager.
196    fn new(context: Arc<Context>, network_keypair: NetworkKeyPair) -> Self;
197
198    /// Returns the network client.
199    fn client(&self) -> Arc<Self::Client>;
200
201    /// Installs network service.
202    async fn install_service(&mut self, service: Arc<S>);
203
204    /// Stops the network service.
205    async fn stop(&mut self);
206}
207
208/// Serialized block with extended information from the proposing authority.
209#[derive(Clone, PartialEq, Eq, Debug)]
210pub(crate) struct ExtendedSerializedBlock {
211    pub(crate) block: Bytes,
212    // Serialized BlockRefs that are excluded from the blocks ancestors.
213    pub(crate) excluded_ancestors: Vec<Vec<u8>>,
214}
215
216impl From<ExtendedBlock> for ExtendedSerializedBlock {
217    fn from(extended_block: ExtendedBlock) -> Self {
218        Self {
219            block: extended_block.block.serialized().clone(),
220            excluded_ancestors: extended_block
221                .excluded_ancestors
222                .iter()
223                .filter_map(|r| match bcs::to_bytes(r) {
224                    Ok(serialized) => Some(serialized),
225                    Err(e) => {
226                        tracing::debug!("Failed to serialize block ref {:?}: {e:?}", r);
227                        None
228                    }
229                })
230                .collect(),
231        }
232    }
233}