iota_network/state_sync/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::HashMap,
7    sync::{Arc, RwLock},
8};
9
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::{inflight_limit, rate_limit};
12use iota_archival::reader::ArchiveReaderBalancer;
13use iota_config::p2p::StateSyncConfig;
14use iota_types::{messages_checkpoint::VerifiedCheckpoint, storage::WriteStore};
15use tap::Pipe;
16use tokio::{
17    sync::{broadcast, mpsc},
18    task::JoinSet,
19};
20
21use super::{
22    Handle, PeerHeights, StateSync, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
23    metrics::Metrics,
24    server::{CheckpointContentsDownloadLimitLayer, Server},
25};
26
27pub struct Builder<S> {
28    store: Option<S>,
29    config: Option<StateSyncConfig>,
30    metrics: Option<Metrics>,
31    archive_readers: Option<ArchiveReaderBalancer>,
32}
33
34impl Builder<()> {
35    #[expect(clippy::new_without_default)]
36    pub fn new() -> Self {
37        Self {
38            store: None,
39            config: None,
40            metrics: None,
41            archive_readers: None,
42        }
43    }
44}
45
46impl<S> Builder<S> {
47    pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
48        Builder {
49            store: Some(store),
50            config: self.config,
51            metrics: self.metrics,
52            archive_readers: self.archive_readers,
53        }
54    }
55
56    pub fn config(mut self, config: StateSyncConfig) -> Self {
57        self.config = Some(config);
58        self
59    }
60
61    pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
62        self.metrics = Some(Metrics::enabled(registry));
63        self
64    }
65
66    pub fn archive_readers(mut self, archive_readers: ArchiveReaderBalancer) -> Self {
67        self.archive_readers = Some(archive_readers);
68        self
69    }
70}
71
72impl<S> Builder<S>
73where
74    S: WriteStore + Clone + Send + Sync + 'static,
75{
76    pub fn build(self) -> (UnstartedStateSync<S>, StateSyncServer<impl StateSync>) {
77        let state_sync_config = self.config.clone().unwrap_or_default();
78        let (mut builder, server) = self.build_internal();
79        let mut state_sync_server = StateSyncServer::new(server);
80
81        // Apply rate limits from configuration as needed.
82        if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
83            state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
84                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
85                    governor::Quota::per_second(limit),
86                    rate_limit::WaitMode::Block,
87                )),
88            );
89        }
90        if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
91            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
92                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
93                    governor::Quota::per_second(limit),
94                    rate_limit::WaitMode::Block,
95                )),
96            );
97        }
98        if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
99            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
100                InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
101                    governor::Quota::per_second(limit),
102                    rate_limit::WaitMode::Block,
103                )),
104            );
105        }
106        if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
107            state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
108                InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
109                    limit,
110                    inflight_limit::WaitMode::ReturnError,
111                )),
112            );
113        }
114        if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
115            let layer = CheckpointContentsDownloadLimitLayer::new(limit);
116            builder.download_limit_layer = Some(layer.clone());
117            state_sync_server = state_sync_server
118                .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
119        }
120
121        (builder, state_sync_server)
122    }
123
124    pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
125        let Builder {
126            store,
127            config,
128            metrics,
129            archive_readers,
130        } = self;
131        let store = store.unwrap();
132        let config = config.unwrap_or_default();
133        let metrics = metrics.unwrap_or_else(Metrics::disabled);
134        let archive_readers = archive_readers.unwrap_or_default();
135
136        let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
137        let (checkpoint_event_sender, _receiver) =
138            broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
139        let weak_sender = sender.downgrade();
140        let handle = Handle {
141            sender,
142            checkpoint_event_sender: checkpoint_event_sender.clone(),
143        };
144        let peer_heights = PeerHeights {
145            peers: HashMap::new(),
146            unprocessed_checkpoints: HashMap::new(),
147            sequence_number_to_digest: HashMap::new(),
148            wait_interval_when_no_peer_to_sync_content: config
149                .wait_interval_when_no_peer_to_sync_content(),
150        }
151        .pipe(RwLock::new)
152        .pipe(Arc::new);
153
154        let server = Server {
155            store: store.clone(),
156            peer_heights: peer_heights.clone(),
157            sender: weak_sender,
158        };
159
160        (
161            UnstartedStateSync {
162                config,
163                handle,
164                mailbox,
165                store,
166                download_limit_layer: None,
167                peer_heights,
168                checkpoint_event_sender,
169                metrics,
170                archive_readers,
171            },
172            server,
173        )
174    }
175}
176
177pub struct UnstartedStateSync<S> {
178    pub(super) config: StateSyncConfig,
179    pub(super) handle: Handle,
180    pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
181    pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
182    pub(super) store: S,
183    pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
184    pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
185    pub(super) metrics: Metrics,
186    pub(super) archive_readers: ArchiveReaderBalancer,
187}
188
189impl<S> UnstartedStateSync<S>
190where
191    S: WriteStore + Clone + Send + Sync + 'static,
192{
193    pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
194        let Self {
195            config,
196            handle,
197            mailbox,
198            download_limit_layer,
199            store,
200            peer_heights,
201            checkpoint_event_sender,
202            metrics,
203            archive_readers,
204        } = self;
205
206        (
207            StateSyncEventLoop {
208                config,
209                mailbox,
210                weak_sender: handle.sender.downgrade(),
211                tasks: JoinSet::new(),
212                sync_checkpoint_summaries_task: None,
213                sync_checkpoint_contents_task: None,
214                download_limit_layer,
215                store,
216                peer_heights,
217                checkpoint_event_sender,
218                network,
219                metrics,
220                archive_readers,
221                sync_checkpoint_from_archive_task: None,
222            },
223            handle,
224        )
225    }
226
227    pub fn start(self, network: anemo::Network) -> Handle {
228        let (event_loop, handle) = self.build(network);
229        tokio::spawn(event_loop.start());
230
231        handle
232    }
233}