1use std::{
6 collections::HashMap,
7 sync::{Arc, RwLock},
8};
9
10use anemo::codegen::InboundRequestLayer;
11use anemo_tower::{inflight_limit, rate_limit};
12use iota_archival::reader::ArchiveReaderBalancer;
13use iota_config::p2p::StateSyncConfig;
14use iota_types::{messages_checkpoint::VerifiedCheckpoint, storage::WriteStore};
15use tap::Pipe;
16use tokio::{
17 sync::{broadcast, mpsc},
18 task::JoinSet,
19};
20
21use super::{
22 Handle, PeerHeights, StateSync, StateSyncEventLoop, StateSyncMessage, StateSyncServer,
23 metrics::Metrics,
24 server::{CheckpointContentsDownloadLimitLayer, Server},
25};
26
27pub struct Builder<S> {
28 store: Option<S>,
29 config: Option<StateSyncConfig>,
30 metrics: Option<Metrics>,
31 archive_readers: Option<ArchiveReaderBalancer>,
32}
33
34impl Builder<()> {
35 #[expect(clippy::new_without_default)]
36 pub fn new() -> Self {
37 Self {
38 store: None,
39 config: None,
40 metrics: None,
41 archive_readers: None,
42 }
43 }
44}
45
46impl<S> Builder<S> {
47 pub fn store<NewStore>(self, store: NewStore) -> Builder<NewStore> {
48 Builder {
49 store: Some(store),
50 config: self.config,
51 metrics: self.metrics,
52 archive_readers: self.archive_readers,
53 }
54 }
55
56 pub fn config(mut self, config: StateSyncConfig) -> Self {
57 self.config = Some(config);
58 self
59 }
60
61 pub fn with_metrics(mut self, registry: &prometheus::Registry) -> Self {
62 self.metrics = Some(Metrics::enabled(registry));
63 self
64 }
65
66 pub fn archive_readers(mut self, archive_readers: ArchiveReaderBalancer) -> Self {
67 self.archive_readers = Some(archive_readers);
68 self
69 }
70}
71
72impl<S> Builder<S>
73where
74 S: WriteStore + Clone + Send + Sync + 'static,
75{
76 pub fn build(self) -> (UnstartedStateSync<S>, StateSyncServer<impl StateSync>) {
77 let state_sync_config = self.config.clone().unwrap_or_default();
78 let (mut builder, server) = self.build_internal();
79 let mut state_sync_server = StateSyncServer::new(server);
80
81 if let Some(limit) = state_sync_config.push_checkpoint_summary_rate_limit {
83 state_sync_server = state_sync_server.add_layer_for_push_checkpoint_summary(
84 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
85 governor::Quota::per_second(limit),
86 rate_limit::WaitMode::Block,
87 )),
88 );
89 }
90 if let Some(limit) = state_sync_config.get_checkpoint_summary_rate_limit {
91 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_summary(
92 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
93 governor::Quota::per_second(limit),
94 rate_limit::WaitMode::Block,
95 )),
96 );
97 }
98 if let Some(limit) = state_sync_config.get_checkpoint_contents_rate_limit {
99 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
100 InboundRequestLayer::new(rate_limit::RateLimitLayer::new(
101 governor::Quota::per_second(limit),
102 rate_limit::WaitMode::Block,
103 )),
104 );
105 }
106 if let Some(limit) = state_sync_config.get_checkpoint_contents_inflight_limit {
107 state_sync_server = state_sync_server.add_layer_for_get_checkpoint_contents(
108 InboundRequestLayer::new(inflight_limit::InflightLimitLayer::new(
109 limit,
110 inflight_limit::WaitMode::ReturnError,
111 )),
112 );
113 }
114 if let Some(limit) = state_sync_config.get_checkpoint_contents_per_checkpoint_limit {
115 let layer = CheckpointContentsDownloadLimitLayer::new(limit);
116 builder.download_limit_layer = Some(layer.clone());
117 state_sync_server = state_sync_server
118 .add_layer_for_get_checkpoint_contents(InboundRequestLayer::new(layer));
119 }
120
121 (builder, state_sync_server)
122 }
123
124 pub(super) fn build_internal(self) -> (UnstartedStateSync<S>, Server<S>) {
125 let Builder {
126 store,
127 config,
128 metrics,
129 archive_readers,
130 } = self;
131 let store = store.unwrap();
132 let config = config.unwrap_or_default();
133 let metrics = metrics.unwrap_or_else(Metrics::disabled);
134 let archive_readers = archive_readers.unwrap_or_default();
135
136 let (sender, mailbox) = mpsc::channel(config.mailbox_capacity());
137 let (checkpoint_event_sender, _receiver) =
138 broadcast::channel(config.synced_checkpoint_broadcast_channel_capacity());
139 let weak_sender = sender.downgrade();
140 let handle = Handle {
141 sender,
142 checkpoint_event_sender: checkpoint_event_sender.clone(),
143 };
144 let peer_heights = PeerHeights {
145 peers: HashMap::new(),
146 unprocessed_checkpoints: HashMap::new(),
147 sequence_number_to_digest: HashMap::new(),
148 wait_interval_when_no_peer_to_sync_content: config
149 .wait_interval_when_no_peer_to_sync_content(),
150 }
151 .pipe(RwLock::new)
152 .pipe(Arc::new);
153
154 let server = Server {
155 store: store.clone(),
156 peer_heights: peer_heights.clone(),
157 sender: weak_sender,
158 };
159
160 (
161 UnstartedStateSync {
162 config,
163 handle,
164 mailbox,
165 store,
166 download_limit_layer: None,
167 peer_heights,
168 checkpoint_event_sender,
169 metrics,
170 archive_readers,
171 },
172 server,
173 )
174 }
175}
176
177pub struct UnstartedStateSync<S> {
178 pub(super) config: StateSyncConfig,
179 pub(super) handle: Handle,
180 pub(super) mailbox: mpsc::Receiver<StateSyncMessage>,
181 pub(super) download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
182 pub(super) store: S,
183 pub(super) peer_heights: Arc<RwLock<PeerHeights>>,
184 pub(super) checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
185 pub(super) metrics: Metrics,
186 pub(super) archive_readers: ArchiveReaderBalancer,
187}
188
189impl<S> UnstartedStateSync<S>
190where
191 S: WriteStore + Clone + Send + Sync + 'static,
192{
193 pub(super) fn build(self, network: anemo::Network) -> (StateSyncEventLoop<S>, Handle) {
194 let Self {
195 config,
196 handle,
197 mailbox,
198 download_limit_layer,
199 store,
200 peer_heights,
201 checkpoint_event_sender,
202 metrics,
203 archive_readers,
204 } = self;
205
206 (
207 StateSyncEventLoop {
208 config,
209 mailbox,
210 weak_sender: handle.sender.downgrade(),
211 tasks: JoinSet::new(),
212 sync_checkpoint_summaries_task: None,
213 sync_checkpoint_contents_task: None,
214 download_limit_layer,
215 store,
216 peer_heights,
217 checkpoint_event_sender,
218 network,
219 metrics,
220 archive_readers,
221 sync_checkpoint_from_archive_task: None,
222 },
223 handle,
224 )
225 }
226
227 pub fn start(self, network: anemo::Network) -> Handle {
228 let (event_loop, handle) = self.build(network);
229 tokio::spawn(event_loop.start());
230
231 handle
232 }
233}