1use std::{
62 collections::{HashMap, VecDeque},
63 sync::{
64 Arc, RwLock,
65 atomic::{AtomicU64, Ordering},
66 },
67 time::Duration,
68};
69
70use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
71use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
72use iota_config::p2p::StateSyncConfig;
73use iota_types::{
74 committee::Committee,
75 digests::CheckpointDigest,
76 messages_checkpoint::{
77 CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
78 FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
79 },
80 storage::WriteStore,
81};
82use rand::Rng;
83use tap::{Pipe, TapFallible, TapOptional};
84use tokio::{
85 sync::{broadcast, mpsc, oneshot, watch},
86 task::{AbortHandle, JoinSet},
87};
88use tracing::{debug, info, instrument, trace, warn};
89
90mod generated {
91 include!(concat!(env!("OUT_DIR"), "/iota.StateSync.rs"));
92}
93mod builder;
94mod metrics;
95mod server;
96#[cfg(test)]
97mod tests;
98
99pub use builder::{Builder, UnstartedStateSync};
100pub use generated::{
101 state_sync_client::StateSyncClient,
102 state_sync_server::{StateSync, StateSyncServer},
103};
104use iota_archival::reader::ArchiveReaderBalancer;
105use iota_storage::verify_checkpoint;
106pub use server::{GetCheckpointAvailabilityResponse, GetCheckpointSummaryRequest};
107
108use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};
109
110#[derive(Clone, Debug)]
116pub struct Handle {
117 sender: mpsc::Sender<StateSyncMessage>,
118 checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
119}
120
121impl Handle {
122 pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
132 self.sender
133 .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
134 .await
135 .unwrap()
136 }
137
138 pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
141 self.checkpoint_event_sender.subscribe()
142 }
143}
144
145struct PeerHeights {
146 peers: HashMap<PeerId, PeerStateSyncInfo>,
148 unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
149 sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,
150
151 wait_interval_when_no_peer_to_sync_content: Duration,
153}
154
155#[derive(Copy, Clone, Debug, PartialEq, Eq)]
156struct PeerStateSyncInfo {
157 genesis_checkpoint_digest: CheckpointDigest,
159 on_same_chain_as_us: bool,
161 height: CheckpointSequenceNumber,
163 lowest: CheckpointSequenceNumber,
166}
167
168impl PeerHeights {
169 pub fn highest_known_checkpoint(&self) -> Option<&Checkpoint> {
170 self.highest_known_checkpoint_sequence_number()
171 .and_then(|s| self.sequence_number_to_digest.get(&s))
172 .and_then(|digest| self.unprocessed_checkpoints.get(digest))
173 }
174
175 pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
176 self.peers
177 .values()
178 .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
179 .max()
180 }
181
182 pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
183 self.peers
184 .iter()
185 .filter(|(_peer_id, info)| info.on_same_chain_as_us)
186 }
187
188 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
193 pub fn update_peer_info(
194 &mut self,
195 peer_id: PeerId,
196 checkpoint: Checkpoint,
197 low_watermark: Option<CheckpointSequenceNumber>,
198 ) -> bool {
199 debug!("Update peer info");
200
201 let info = match self.peers.get_mut(&peer_id) {
202 Some(info) if info.on_same_chain_as_us => info,
203 _ => return false,
204 };
205
206 info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
207 if let Some(low_watermark) = low_watermark {
208 info.lowest = low_watermark;
209 }
210 self.insert_checkpoint(checkpoint);
211
212 true
213 }
214
215 #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
216 pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
217 use std::collections::hash_map::Entry;
218 debug!("Insert peer info");
219
220 match self.peers.entry(peer_id) {
221 Entry::Occupied(mut entry) => {
222 let entry = entry.get_mut();
226 if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
227 entry.height = std::cmp::max(entry.height, info.height);
228 } else {
229 *entry = info;
230 }
231 }
232 Entry::Vacant(entry) => {
233 entry.insert(info);
234 }
235 }
236 }
237
238 pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
239 if let Some(info) = self.peers.get_mut(&peer_id) {
240 info.on_same_chain_as_us = false;
241 }
242 }
243
244 pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
245 self.unprocessed_checkpoints
246 .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
247 self.sequence_number_to_digest
248 .retain(|&s, _digest| s > sequence_number);
249 }
250
251 pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
254 let digest = *checkpoint.digest();
255 let sequence_number = *checkpoint.sequence_number();
256 self.unprocessed_checkpoints.insert(digest, checkpoint);
257 self.sequence_number_to_digest
258 .insert(sequence_number, digest);
259 }
260
261 pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
262 if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
263 self.sequence_number_to_digest
264 .remove(checkpoint.sequence_number());
265 }
266 }
267
268 pub fn get_checkpoint_by_sequence_number(
269 &self,
270 sequence_number: CheckpointSequenceNumber,
271 ) -> Option<&Checkpoint> {
272 self.sequence_number_to_digest
273 .get(&sequence_number)
274 .and_then(|digest| self.get_checkpoint_by_digest(digest))
275 }
276
277 pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
278 self.unprocessed_checkpoints.get(digest)
279 }
280
281 #[cfg(test)]
282 pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
283 self.wait_interval_when_no_peer_to_sync_content = duration;
284 }
285
286 pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
287 self.wait_interval_when_no_peer_to_sync_content
288 }
289}
290
291#[derive(Clone)]
294struct PeerBalancer {
295 peers: VecDeque<(anemo::Peer, PeerStateSyncInfo)>,
296 requested_checkpoint: Option<CheckpointSequenceNumber>,
297 request_type: PeerCheckpointRequestType,
298}
299
/// Kind of checkpoint data a [`PeerBalancer`] is selecting peers for.
#[derive(Clone)]
enum PeerCheckpointRequestType {
    /// Summary request: the peer only needs a height at or above the
    /// requested checkpoint.
    Summary,
    /// Content request: the requested checkpoint must additionally be at or
    /// above the peer's low watermark.
    Content,
}
305
306impl PeerBalancer {
307 pub fn new(
308 network: &anemo::Network,
309 peer_heights: Arc<RwLock<PeerHeights>>,
310 request_type: PeerCheckpointRequestType,
311 ) -> Self {
312 let mut peers: Vec<_> = peer_heights
313 .read()
314 .unwrap()
315 .peers_on_same_chain()
316 .filter_map(|(peer_id, info)| {
318 network
319 .peer(*peer_id)
320 .map(|peer| (peer.connection_rtt(), peer, *info))
321 })
322 .collect();
323 peers.sort_by(|(rtt_a, _, _), (rtt_b, _, _)| rtt_a.cmp(rtt_b));
324 Self {
325 peers: peers
326 .into_iter()
327 .map(|(_, peer, info)| (peer, info))
328 .collect(),
329 requested_checkpoint: None,
330 request_type,
331 }
332 }
333
334 pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
335 self.requested_checkpoint = Some(checkpoint);
336 self
337 }
338}
339
340impl Iterator for PeerBalancer {
341 type Item = StateSyncClient<anemo::Peer>;
342
343 fn next(&mut self) -> Option<Self::Item> {
344 while !self.peers.is_empty() {
345 const SELECTION_WINDOW: usize = 2;
346 let idx =
347 rand::thread_rng().gen_range(0..std::cmp::min(SELECTION_WINDOW, self.peers.len()));
348 let (peer, info) = self.peers.remove(idx).unwrap();
349 let requested_checkpoint = self.requested_checkpoint.unwrap_or(0);
350 match &self.request_type {
351 PeerCheckpointRequestType::Summary if info.height >= requested_checkpoint => {
353 return Some(StateSyncClient::new(peer));
354 }
355 PeerCheckpointRequestType::Content
356 if info.height >= requested_checkpoint
357 && info.lowest <= requested_checkpoint =>
358 {
359 return Some(StateSyncClient::new(peer));
360 }
361 _ => {}
362 }
363 }
364 None
365 }
366}
367
368#[derive(Clone, Debug)]
369enum StateSyncMessage {
370 StartSyncJob,
373 VerifiedCheckpoint(Box<VerifiedCheckpoint>),
376 SyncedCheckpoint(Box<VerifiedCheckpoint>),
380}
381
382struct StateSyncEventLoop<S> {
383 config: StateSyncConfig,
384
385 mailbox: mpsc::Receiver<StateSyncMessage>,
386 weak_sender: mpsc::WeakSender<StateSyncMessage>,
388
389 tasks: JoinSet<()>,
391 sync_checkpoint_summaries_task: Option<AbortHandle>,
392 sync_checkpoint_contents_task: Option<AbortHandle>,
393 download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,
394
395 store: S,
396 peer_heights: Arc<RwLock<PeerHeights>>,
397 checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
398 network: anemo::Network,
399 metrics: Metrics,
400
401 archive_readers: ArchiveReaderBalancer,
402 sync_checkpoint_from_archive_task: Option<AbortHandle>,
403}
404
405impl<S> StateSyncEventLoop<S>
406where
407 S: WriteStore + Clone + Send + Sync + 'static,
408{
409 pub async fn start(mut self) {
415 info!("State-Synchronizer started");
416
417 self.config.pinned_checkpoints.sort();
418
419 let mut interval = tokio::time::interval(self.config.interval_period());
420 let mut peer_events = {
421 let (subscriber, peers) = self.network.subscribe().unwrap();
422 for peer_id in peers {
423 self.spawn_get_latest_from_peer(peer_id);
424 }
425 subscriber
426 };
427 let (
428 target_checkpoint_contents_sequence_sender,
429 target_checkpoint_contents_sequence_receiver,
430 ) = watch::channel(0);
431
432 let (_sender, receiver) = oneshot::channel();
434 tokio::spawn(update_checkpoint_watermark_metrics(
435 receiver,
436 self.store.clone(),
437 self.metrics.clone(),
438 ));
439
440 let task = sync_checkpoint_contents(
442 self.network.clone(),
443 self.store.clone(),
444 self.peer_heights.clone(),
445 self.weak_sender.clone(),
446 self.checkpoint_event_sender.clone(),
447 self.config.checkpoint_content_download_concurrency(),
448 self.config.checkpoint_content_download_tx_concurrency(),
449 self.config.checkpoint_content_timeout(),
450 target_checkpoint_contents_sequence_receiver,
451 );
452 let task_handle = self.tasks.spawn(task);
453 self.sync_checkpoint_contents_task = Some(task_handle);
454
455 let task = sync_checkpoint_contents_from_archive(
464 self.network.clone(),
465 self.archive_readers.clone(),
466 self.store.clone(),
467 self.peer_heights.clone(),
468 );
469 let task_handle = self.tasks.spawn(task);
470 self.sync_checkpoint_from_archive_task = Some(task_handle);
471
472 loop {
474 tokio::select! {
475 now = interval.tick() => {
476 self.handle_tick(now.into_std());
479 },
480 maybe_message = self.mailbox.recv() => {
481 if let Some(message) = maybe_message {
485 self.handle_message(message);
486 } else {
487 break;
488 }
489 },
490 peer_event = peer_events.recv() => {
491 self.handle_peer_event(peer_event);
493 },
494 Some(task_result) = self.tasks.join_next() => {
496 match task_result {
497 Ok(()) => {},
498 Err(e) => {
499 if e.is_cancelled() {
500 } else if e.is_panic() {
502 std::panic::resume_unwind(e.into_panic());
504 } else {
505 panic!("task failed: {e}");
506 }
507 },
508 };
509
510 if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
512 panic!("sync_checkpoint_contents task unexpectedly terminated")
513 }
514
515 if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
517 self.sync_checkpoint_summaries_task = None;
518 }
519
520 if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
522 panic!("sync_checkpoint_from_archive task unexpectedly terminated")
523 }
524 },
525 }
526
527 self.maybe_start_checkpoint_summary_sync_task();
528 self.maybe_trigger_checkpoint_contents_sync_task(
529 &target_checkpoint_contents_sequence_sender,
530 );
531 }
532
533 info!("State-Synchronizer ended");
534 }
535
536 fn handle_message(&mut self, message: StateSyncMessage) {
537 debug!("Received message: {:?}", message);
538 match message {
539 StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
540 StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
541 self.handle_checkpoint_from_consensus(checkpoint)
542 }
543 StateSyncMessage::SyncedCheckpoint(checkpoint) => {
545 self.spawn_notify_peers_of_checkpoint(*checkpoint)
546 }
547 }
548 }
549
550 #[instrument(level = "debug", skip_all)]
552 fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
553 let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
556 .expect("store operation should not fail")
557 .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
558 .digest();
559 if checkpoint.previous_digest != Some(prev_digest) {
560 panic!(
561 "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
562 checkpoint.sequence_number(),
563 Some(prev_digest),
564 checkpoint.previous_digest
565 );
566 }
567
568 let latest_checkpoint = self
569 .store
570 .get_highest_verified_checkpoint()
571 .expect("store operation should not fail");
572
573 if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
575 return;
576 }
577
578 let checkpoint = *checkpoint;
579 let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
580 if *checkpoint.sequence_number() > next_sequence_number {
581 debug!(
582 "consensus sent too new of a checkpoint, expecting: {}, got: {}",
583 next_sequence_number,
584 checkpoint.sequence_number()
585 );
586 }
587
588 #[cfg(debug_assertions)]
592 {
593 let _ = (next_sequence_number..=*checkpoint.sequence_number())
594 .map(|n| {
595 let checkpoint = self
596 .store
597 .get_checkpoint_by_sequence_number(n)
598 .expect("store operation should not fail")
599 .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
600 self.store
601 .get_full_checkpoint_contents(&checkpoint.content_digest)
602 .expect("store operation should not fail")
603 .unwrap_or_else(|| {
604 panic!(
605 "store should contain checkpoint contents for {:?}",
606 checkpoint.content_digest
607 )
608 });
609 })
610 .collect::<Vec<_>>();
611 }
612
613 if let Some(EndOfEpochData {
620 next_epoch_committee,
621 ..
622 }) = checkpoint.end_of_epoch_data.as_ref()
623 {
624 let next_committee = next_epoch_committee.iter().cloned().collect();
625 let committee =
626 Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
627 self.store
628 .insert_committee(committee)
629 .expect("store operation should not fail");
630 }
631
632 self.store
633 .update_highest_verified_checkpoint(&checkpoint)
634 .expect("store operation should not fail");
635 self.store
636 .update_highest_synced_checkpoint(&checkpoint)
637 .expect("store operation should not fail");
638
639 let _ = self.checkpoint_event_sender.send(checkpoint.clone());
641
642 self.spawn_notify_peers_of_checkpoint(checkpoint);
644 }
645
646 fn handle_peer_event(
647 &mut self,
648 peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
649 ) {
650 use tokio::sync::broadcast::error::RecvError;
651
652 match peer_event {
653 Ok(PeerEvent::NewPeer(peer_id)) => {
654 self.spawn_get_latest_from_peer(peer_id);
655 }
656 Ok(PeerEvent::LostPeer(peer_id, _)) => {
657 self.peer_heights.write().unwrap().peers.remove(&peer_id);
658 }
659
660 Err(RecvError::Closed) => {
661 panic!("PeerEvent channel shouldn't be able to be closed");
662 }
663
664 Err(RecvError::Lagged(_)) => {
665 trace!("State-Sync fell behind processing PeerEvents");
666 }
667 }
668 }
669
670 fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
671 if let Some(peer) = self.network.peer(peer_id) {
672 let genesis_checkpoint_digest = *self
673 .store
674 .get_checkpoint_by_sequence_number(0)
675 .expect("store operation should not fail")
676 .expect("store should contain genesis checkpoint")
677 .digest();
678 let task = get_latest_from_peer(
679 genesis_checkpoint_digest,
680 peer,
681 self.peer_heights.clone(),
682 self.config.timeout(),
683 );
684 self.tasks.spawn(task);
685 }
686 }
687
688 fn handle_tick(&mut self, _now: std::time::Instant) {
689 let task = query_peers_for_their_latest_checkpoint(
690 self.network.clone(),
691 self.peer_heights.clone(),
692 self.weak_sender.clone(),
693 self.config.timeout(),
694 );
695 self.tasks.spawn(task);
696
697 if let Some(layer) = self.download_limit_layer.as_ref() {
698 layer.maybe_prune_map();
699 }
700 }
701
702 fn maybe_start_checkpoint_summary_sync_task(&mut self) {
706 if self.sync_checkpoint_summaries_task.is_some() {
708 return;
709 }
710
711 let highest_processed_checkpoint = self
712 .store
713 .get_highest_verified_checkpoint()
714 .expect("store operation should not fail");
715
716 let highest_known_checkpoint = self
717 .peer_heights
718 .read()
719 .unwrap()
720 .highest_known_checkpoint()
721 .cloned();
722
723 if Some(highest_processed_checkpoint.sequence_number())
724 < highest_known_checkpoint
725 .as_ref()
726 .map(|x| x.sequence_number())
727 {
728 let task = sync_to_checkpoint(
730 self.network.clone(),
731 self.store.clone(),
732 self.peer_heights.clone(),
733 self.metrics.clone(),
734 self.config.pinned_checkpoints.clone(),
735 self.config.checkpoint_header_download_concurrency(),
736 self.config.timeout(),
737 highest_known_checkpoint.unwrap(),
739 )
740 .map(|result| match result {
741 Ok(()) => {}
742 Err(e) => {
743 debug!("error syncing checkpoint {e}");
744 }
745 });
746 let task_handle = self.tasks.spawn(task);
747 self.sync_checkpoint_summaries_task = Some(task_handle);
748 }
749 }
750
751 fn maybe_trigger_checkpoint_contents_sync_task(
756 &mut self,
757 target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
758 ) {
759 let highest_verified_checkpoint = self
760 .store
761 .get_highest_verified_checkpoint()
762 .expect("store operation should not fail");
763 let highest_synced_checkpoint = self
764 .store
765 .get_highest_synced_checkpoint()
766 .expect("store operation should not fail");
767
768 if highest_verified_checkpoint.sequence_number()
769 > highest_synced_checkpoint.sequence_number()
770 && self
772 .peer_heights
773 .read()
774 .unwrap()
775 .highest_known_checkpoint_sequence_number()
776 > Some(*highest_synced_checkpoint.sequence_number())
777 {
778 let _ = target_sequence_channel.send_if_modified(|num| {
779 let new_num = *highest_verified_checkpoint.sequence_number();
780 if *num == new_num {
781 return false;
782 }
783 *num = new_num;
784 true
785 });
786 }
787 }
788
789 fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
790 let task = notify_peers_of_checkpoint(
791 self.network.clone(),
792 self.peer_heights.clone(),
793 checkpoint,
794 self.config.timeout(),
795 );
796 self.tasks.spawn(task);
797 }
798}
799
800async fn notify_peers_of_checkpoint(
803 network: anemo::Network,
804 peer_heights: Arc<RwLock<PeerHeights>>,
805 checkpoint: VerifiedCheckpoint,
806 timeout: Duration,
807) {
808 let futs = peer_heights
809 .read()
810 .unwrap()
811 .peers_on_same_chain()
813 .filter_map(|(peer_id, info)| {
815 (*checkpoint.sequence_number() > info.height).then_some(peer_id)
816 })
817 .flat_map(|peer_id| network.peer(*peer_id))
819 .map(StateSyncClient::new)
820 .map(|mut client| {
821 let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
822 async move { client.push_checkpoint_summary(request).await }
823 })
824 .collect::<Vec<_>>();
825 futures::future::join_all(futs).await;
826}
827
828async fn get_latest_from_peer(
831 our_genesis_checkpoint_digest: CheckpointDigest,
832 peer: anemo::Peer,
833 peer_heights: Arc<RwLock<PeerHeights>>,
834 timeout: Duration,
835) {
836 let peer_id = peer.peer_id();
837 let mut client = StateSyncClient::new(peer);
838
839 let info = {
840 let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();
841
842 if let Some(info) = maybe_info {
843 info
844 } else {
845 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
850 .with_timeout(timeout);
851 let response = client
852 .get_checkpoint_summary(request)
853 .await
854 .map(Response::into_inner);
855
856 let info = match response {
857 Ok(Some(checkpoint)) => {
858 let digest = *checkpoint.digest();
859 PeerStateSyncInfo {
860 genesis_checkpoint_digest: digest,
861 on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
862 height: *checkpoint.sequence_number(),
863 lowest: CheckpointSequenceNumber::default(),
864 }
865 }
866 Ok(None) => PeerStateSyncInfo {
867 genesis_checkpoint_digest: CheckpointDigest::default(),
868 on_same_chain_as_us: false,
869 height: CheckpointSequenceNumber::default(),
870 lowest: CheckpointSequenceNumber::default(),
871 },
872 Err(status) => {
873 trace!("get_latest_checkpoint_summary request failed: {status:?}");
874 return;
875 }
876 };
877 peer_heights
878 .write()
879 .unwrap()
880 .insert_peer_info(peer_id, info);
881 info
882 }
883 };
884
885 if !info.on_same_chain_as_us {
887 trace!(?info, "Peer {peer_id} not on same chain as us");
888 return;
889 }
890 let Some((highest_checkpoint, low_watermark)) =
891 query_peer_for_latest_info(&mut client, timeout).await
892 else {
893 return;
894 };
895 peer_heights
896 .write()
897 .unwrap()
898 .update_peer_info(peer_id, highest_checkpoint, low_watermark);
899}
900
901async fn query_peer_for_latest_info(
904 client: &mut StateSyncClient<anemo::Peer>,
905 timeout: Duration,
906) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
907 let request = Request::new(()).with_timeout(timeout);
908 let response = client
909 .get_checkpoint_availability(request)
910 .await
911 .map(Response::into_inner);
912 match response {
913 Ok(GetCheckpointAvailabilityResponse {
914 highest_synced_checkpoint,
915 lowest_available_checkpoint,
916 }) => {
917 return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
918 }
919 Err(status) => {
920 if status.status() != anemo::types::response::StatusCode::NotFound {
922 trace!("get_checkpoint_availability request failed: {status:?}");
923 return None;
924 }
925 }
926 };
927
928 let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
931 let response = client
932 .get_checkpoint_summary(request)
933 .await
934 .map(Response::into_inner);
935 match response {
936 Ok(Some(checkpoint)) => Some((checkpoint, None)),
937 Ok(None) => None,
938 Err(status) => {
939 trace!("get_checkpoint_summary (latest) request failed: {status:?}");
940 None
941 }
942 }
943}
944
945#[instrument(level = "debug", skip_all)]
950async fn query_peers_for_their_latest_checkpoint(
951 network: anemo::Network,
952 peer_heights: Arc<RwLock<PeerHeights>>,
953 sender: mpsc::WeakSender<StateSyncMessage>,
954 timeout: Duration,
955) {
956 let peer_heights = &peer_heights;
957 let futs = peer_heights
958 .read()
959 .unwrap()
960 .peers_on_same_chain()
961 .flat_map(|(peer_id, _info)| network.peer(*peer_id))
963 .map(|peer| {
964 let peer_id = peer.peer_id();
965 let mut client = StateSyncClient::new(peer);
966
967 async move {
968 let response = query_peer_for_latest_info(&mut client, timeout).await;
969 match response {
970 Some((highest_checkpoint, low_watermark)) => peer_heights
971 .write()
972 .unwrap()
973 .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
974 .then_some(highest_checkpoint),
975 None => None,
976 }
977 }
978 })
979 .collect::<Vec<_>>();
980
981 debug!("Query {} peers for latest checkpoint", futs.len());
982
983 let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();
984
985 let highest_checkpoint = checkpoints.max_by_key(|checkpoint| *checkpoint.sequence_number());
986
987 let our_highest_checkpoint = peer_heights
988 .read()
989 .unwrap()
990 .highest_known_checkpoint()
991 .cloned();
992
993 debug!(
994 "Our highest checkpoint {:?}, peers highest checkpoint {:?}",
995 our_highest_checkpoint.as_ref().map(|c| c.sequence_number()),
996 highest_checkpoint.as_ref().map(|c| c.sequence_number())
997 );
998
999 let _new_checkpoint = match (highest_checkpoint, our_highest_checkpoint) {
1000 (Some(theirs), None) => theirs,
1001 (Some(theirs), Some(ours)) if theirs.sequence_number() > ours.sequence_number() => theirs,
1002 _ => return,
1003 };
1004
1005 if let Some(sender) = sender.upgrade() {
1006 let _ = sender.send(StateSyncMessage::StartSyncJob).await;
1007 }
1008}
1009
1010async fn sync_to_checkpoint<S>(
1015 network: anemo::Network,
1016 store: S,
1017 peer_heights: Arc<RwLock<PeerHeights>>,
1018 metrics: Metrics,
1019 pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
1020 checkpoint_header_download_concurrency: usize,
1021 timeout: Duration,
1022 checkpoint: Checkpoint,
1023) -> Result<()>
1024where
1025 S: WriteStore,
1026{
1027 metrics.set_highest_known_checkpoint(*checkpoint.sequence_number());
1028
1029 let mut current = store
1030 .get_highest_verified_checkpoint()
1031 .expect("store operation should not fail");
1032 if current.sequence_number() >= checkpoint.sequence_number() {
1033 return Err(anyhow::anyhow!(
1034 "target checkpoint {} is older than highest verified checkpoint {}",
1035 checkpoint.sequence_number(),
1036 current.sequence_number(),
1037 ));
1038 }
1039
1040 let peer_balancer = PeerBalancer::new(
1041 &network,
1042 peer_heights.clone(),
1043 PeerCheckpointRequestType::Summary,
1044 );
1045 let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
1047 ..=*checkpoint.sequence_number())
1048 .map(|next| {
1049 let peers = peer_balancer.clone().with_checkpoint(next);
1050 let peer_heights = peer_heights.clone();
1051 let pinned_checkpoints = &pinned_checkpoints;
1052 async move {
1053 if let Some(checkpoint) = peer_heights
1054 .read()
1055 .unwrap()
1056 .get_checkpoint_by_sequence_number(next)
1057 {
1058 return (Some(checkpoint.to_owned()), next, None);
1059 }
1060
1061 for mut peer in peers {
1064 let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
1065 .with_timeout(timeout);
1066 if let Some(checkpoint) = peer
1067 .get_checkpoint_summary(request)
1068 .await
1069 .tap_err(|e| trace!("{e:?}"))
1070 .ok()
1071 .and_then(Response::into_inner)
1072 .tap_none(|| trace!("peer unable to help sync"))
1073 {
1074 if *checkpoint.sequence_number() != next {
1076 tracing::debug!(
1077 "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
1078 checkpoint.sequence_number()
1079 );
1080 continue;
1081 }
1082
1083 let checkpoint_digest = checkpoint.digest();
1085 if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
1086 checkpoint.sequence_number(),
1087 |(seq_num, _digest)| *seq_num
1088 ) {
1089 if pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest {
1090 tracing::debug!(
1091 "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
1092 pinned_checkpoints[pinned_digest_index].1,
1093 checkpoint_digest
1094 );
1095 continue;
1096 }
1097 }
1098
1099 peer_heights
1101 .write()
1102 .unwrap()
1103 .insert_checkpoint(checkpoint.clone());
1104 return (Some(checkpoint), next, Some(peer.inner().peer_id()));
1105 }
1106 }
1107 (None, next, None)
1108 }
1109 })
1110 .pipe(futures::stream::iter)
1111 .buffered(checkpoint_header_download_concurrency);
1112
1113 while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
1114 assert_eq!(
1115 current
1116 .sequence_number()
1117 .checked_add(1)
1118 .expect("exhausted u64"),
1119 next
1120 );
1121
1122 let checkpoint = 'cp: {
1124 let checkpoint = maybe_checkpoint.ok_or_else(|| {
1125 anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
1126 })?;
1127 if pinned_checkpoints
1129 .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
1130 .is_ok()
1131 {
1132 break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
1133 }
1134 match verify_checkpoint(¤t, &store, checkpoint) {
1135 Ok(verified_checkpoint) => verified_checkpoint,
1136 Err(checkpoint) => {
1137 let mut peer_heights = peer_heights.write().unwrap();
1138 peer_heights.remove_checkpoint(checkpoint.digest());
1141
1142 if let Some(peer_id) = maybe_peer_id {
1144 peer_heights.mark_peer_as_not_on_same_chain(peer_id);
1145 }
1146
1147 return Err(anyhow::anyhow!(
1148 "unable to verify checkpoint {checkpoint:?}"
1149 ));
1150 }
1151 }
1152 };
1153
1154 debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
1155 if let Some(checkpoint_summary_age_metric) = metrics.checkpoint_summary_age_metric() {
1156 checkpoint.report_checkpoint_age_ms(checkpoint_summary_age_metric);
1157 }
1158
1159 current = checkpoint.clone();
1160 store
1163 .insert_checkpoint(&checkpoint)
1164 .expect("store operation should not fail");
1165 }
1166
1167 peer_heights
1168 .write()
1169 .unwrap()
1170 .cleanup_old_checkpoints(*checkpoint.sequence_number());
1171
1172 Ok(())
1173}
1174
1175async fn sync_checkpoint_contents_from_archive<S>(
1179 network: anemo::Network,
1180 archive_readers: ArchiveReaderBalancer,
1181 store: S,
1182 peer_heights: Arc<RwLock<PeerHeights>>,
1183) where
1184 S: WriteStore + Clone + Send + Sync + 'static,
1185{
1186 loop {
1187 let peers: Vec<_> = peer_heights
1189 .read()
1190 .unwrap()
1191 .peers_on_same_chain()
1192 .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
1194 .collect();
1195 let lowest_checkpoint_on_peers = peers
1196 .iter()
1197 .map(|(_p, state_sync_info)| state_sync_info.lowest)
1198 .min();
1199 let highest_synced = store
1200 .get_highest_synced_checkpoint()
1201 .expect("store operation should not fail")
1202 .sequence_number;
1203 let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers
1204 {
1205 highest_synced < lowest_checkpoint_on_peers
1206 } else {
1207 false
1208 };
1209 debug!(
1210 "Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}",
1211 lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
1212 );
1213 if sync_from_archive {
1214 let start = highest_synced
1215 .checked_add(1)
1216 .expect("Checkpoint seq num overflow");
1217 let checkpoint_range = start..lowest_checkpoint_on_peers.unwrap();
1218 if let Some(archive_reader) = archive_readers
1219 .pick_one_random(checkpoint_range.clone())
1220 .await
1221 {
1222 let txn_counter = Arc::new(AtomicU64::new(0));
1223 let checkpoint_counter = Arc::new(AtomicU64::new(0));
1224 if let Err(err) = archive_reader
1225 .read(
1226 store.clone(),
1227 checkpoint_range,
1228 txn_counter.clone(),
1229 checkpoint_counter.clone(),
1230 true,
1231 )
1232 .await
1233 {
1234 warn!("State sync from archive failed with error: {:?}", err);
1235 } else {
1236 info!(
1237 "State sync from archive is complete. Checkpoints downloaded = {:?}, Txns downloaded = {:?}",
1238 checkpoint_counter.load(Ordering::Relaxed),
1239 txn_counter.load(Ordering::Relaxed)
1240 );
1241 }
1242 } else {
1243 warn!("Failed to find an archive reader to complete the state sync request");
1244 }
1245 }
1246 tokio::time::sleep(Duration::from_secs(5)).await;
1247 }
1248}
1249
1250async fn sync_checkpoint_contents<S>(
1256 network: anemo::Network,
1257 store: S,
1258 peer_heights: Arc<RwLock<PeerHeights>>,
1259 sender: mpsc::WeakSender<StateSyncMessage>,
1260 checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
1261 checkpoint_content_download_concurrency: usize,
1262 checkpoint_content_download_tx_concurrency: u64,
1263 timeout: Duration,
1264 mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
1265) where
1266 S: WriteStore + Clone,
1267{
1268 let mut highest_synced = store
1269 .get_highest_synced_checkpoint()
1270 .expect("store operation should not fail");
1271
1272 let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
1273 let mut target_sequence_cursor = 0;
1274 let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
1275 let mut checkpoint_contents_tasks = FuturesOrdered::new();
1276
1277 let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
1278
1279 loop {
1280 tokio::select! {
1281 result = target_sequence_channel.changed() => {
1282 match result {
1283 Ok(()) => {
1284 target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
1285 }
1286 Err(_) => {
1287 return
1289 }
1290 }
1291 },
1292 Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
1293 match maybe_checkpoint {
1294 Ok(checkpoint) => {
1295 let _: &VerifiedCheckpoint = &checkpoint; store
1298 .update_highest_synced_checkpoint(&checkpoint)
1299 .expect("store operation should not fail");
1300 let _ = checkpoint_event_sender.send(checkpoint.clone());
1302 tx_concurrency_remaining += checkpoint.network_total_transactions - highest_synced.network_total_transactions;
1303 highest_synced = checkpoint;
1304
1305 }
1306 Err(checkpoint) => {
1307 let _: &VerifiedCheckpoint = &checkpoint; if let Some(lowest_peer_checkpoint) =
1309 peer_heights.read().ok().and_then(|x| x.peers.values().map(|state_sync_info| state_sync_info.lowest).min()) {
1310 if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
1311 info!("unable to sync contents of checkpoint through state sync {} with lowest peer checkpoint: {}", checkpoint.sequence_number(), lowest_peer_checkpoint);
1312 }
1313 } else {
1314 info!("unable to sync contents of checkpoint through state sync {}", checkpoint.sequence_number());
1315
1316 }
1317 checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
1319 network.clone(),
1320 &store,
1321 peer_heights.clone(),
1322 timeout,
1323 checkpoint,
1324 ));
1325 }
1326 }
1327 },
1328 }
1329
1330 while current_sequence < target_sequence_cursor
1332 && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
1333 {
1334 let next_checkpoint = store
1335 .get_checkpoint_by_sequence_number(current_sequence)
1336 .expect("store operation should not fail")
1337 .expect(
1338 "BUG: store should have all checkpoints older than highest_verified_checkpoint",
1339 );
1340
1341 let tx_count = next_checkpoint.network_total_transactions
1343 - highest_started_network_total_transactions;
1344 if tx_count > tx_concurrency_remaining {
1345 break;
1346 }
1347 tx_concurrency_remaining -= tx_count;
1348
1349 highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
1350 current_sequence += 1;
1351 checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
1352 network.clone(),
1353 &store,
1354 peer_heights.clone(),
1355 timeout,
1356 next_checkpoint,
1357 ));
1358 }
1359
1360 if highest_synced.sequence_number() % checkpoint_content_download_concurrency as u64 == 0
1361 || checkpoint_contents_tasks.is_empty()
1362 {
1363 if let Some(sender) = sender.upgrade() {
1366 let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
1367 let _ = sender.send(message).await;
1368 }
1369 }
1370 }
1371}
1372
1373#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
1374async fn sync_one_checkpoint_contents<S>(
1377 network: anemo::Network,
1378 store: S,
1379 peer_heights: Arc<RwLock<PeerHeights>>,
1380 timeout: Duration,
1381 checkpoint: VerifiedCheckpoint,
1382) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
1383where
1384 S: WriteStore + Clone,
1385{
1386 debug!("syncing checkpoint contents");
1387
1388 if store
1391 .get_highest_synced_checkpoint()
1392 .expect("store operation should not fail")
1393 .sequence_number()
1394 >= checkpoint.sequence_number()
1395 {
1396 debug!("checkpoint was already created via consensus output");
1397 return Ok(checkpoint);
1398 }
1399
1400 let peers = PeerBalancer::new(
1402 &network,
1403 peer_heights.clone(),
1404 PeerCheckpointRequestType::Content,
1405 )
1406 .with_checkpoint(*checkpoint.sequence_number());
1407 let now = tokio::time::Instant::now();
1408 let Some(_contents) = get_full_checkpoint_contents(peers, &store, &checkpoint, timeout).await
1409 else {
1410 let duration = peer_heights
1413 .read()
1414 .unwrap()
1415 .wait_interval_when_no_peer_to_sync_content();
1416 if now.elapsed() < duration {
1417 let duration = duration - now.elapsed();
1418 info!("retrying checkpoint sync after {:?}", duration);
1419 tokio::time::sleep(duration).await;
1420 }
1421 return Err(checkpoint);
1422 };
1423 debug!("completed checkpoint contents sync");
1424 Ok(checkpoint)
1425}
1426
1427#[instrument(level = "debug", skip_all)]
1428async fn get_full_checkpoint_contents<S>(
1432 peers: PeerBalancer,
1433 store: S,
1434 checkpoint: &VerifiedCheckpoint,
1435 timeout: Duration,
1436) -> Option<FullCheckpointContents>
1437where
1438 S: WriteStore,
1439{
1440 let digest = checkpoint.content_digest;
1441 if let Some(contents) = store
1442 .get_full_checkpoint_contents_by_sequence_number(*checkpoint.sequence_number())
1443 .expect("store operation should not fail")
1444 .or_else(|| {
1445 store
1446 .get_full_checkpoint_contents(&digest)
1447 .expect("store operation should not fail")
1448 })
1449 {
1450 debug!("store already contains checkpoint contents");
1451 return Some(contents);
1452 }
1453
1454 for mut peer in peers {
1457 debug!(
1458 ?timeout,
1459 "requesting checkpoint contents from {}",
1460 peer.inner().peer_id(),
1461 );
1462 let request = Request::new(digest).with_timeout(timeout);
1463 if let Some(contents) = peer
1464 .get_checkpoint_contents(request)
1465 .await
1466 .tap_err(|e| trace!("{e:?}"))
1467 .ok()
1468 .and_then(Response::into_inner)
1469 .tap_none(|| trace!("peer unable to help sync"))
1470 {
1471 if contents.verify_digests(digest).is_ok() {
1472 let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
1473 store
1474 .insert_checkpoint_contents(checkpoint, verified_contents)
1475 .expect("store operation should not fail");
1476 return Some(contents);
1477 }
1478 }
1479 }
1480 debug!("no peers had checkpoint contents");
1481 None
1482}
1483
1484async fn update_checkpoint_watermark_metrics<S>(
1485 mut recv: oneshot::Receiver<()>,
1486 store: S,
1487 metrics: Metrics,
1488) -> Result<()>
1489where
1490 S: WriteStore + Clone + Send + Sync,
1491{
1492 let mut interval = tokio::time::interval(Duration::from_secs(5));
1493 loop {
1494 tokio::select! {
1495 _now = interval.tick() => {
1496 let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
1497 .expect("store operation should not fail");
1498 metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
1499 let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
1500 .expect("store operation should not fail");
1501 metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
1502 },
1503 _ = &mut recv => break,
1504 }
1505 }
1506 Ok(())
1507}