iota_network/state_sync/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! Peer-to-peer data synchronization of checkpoints.
//!
//! This StateSync module is responsible for the synchronization and
//! dissemination of checkpoints, along with the transactions and effects they
//! contain. This module is *not* responsible for the execution of the
//! transactions included in a checkpoint; that process is left to another
//! component in the system.
//!
//! # High-level Overview of StateSync
//!
//! StateSync discovers new checkpoints via a few different sources:
//! 1. If this node is a Validator, checkpoints will be produced via consensus,
//!    at which point consensus can notify state-sync of the new checkpoint via
//!    [Handle::send_checkpoint].
//! 2. A peer notifies us of the latest checkpoint that they have synchronized.
//!    State-Sync will also periodically query its peers to discover what their
//!    latest checkpoint is.
//!
//! We keep track of two different watermarks:
//! * highest_verified_checkpoint - This is the highest checkpoint header that
//!   we've locally verified. This indicates that we have in our persistent
//!   store (and have verified) all checkpoint headers up to and including this
//!   value.
//! * highest_synced_checkpoint - This is the highest checkpoint that we've
//!   fully synchronized, meaning we've downloaded and have in our persistent
//!   stores all of the transactions, and their effects (but not the objects),
//!   for all checkpoints up to and including this point. This is the watermark
//!   that is shared with other peers, either via notification or when they
//!   query for our latest checkpoint, and is intended to be used as a guarantee
//!   of data availability.
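//!
//! For example, a node may have verified checkpoint headers up to 1000
//! (highest_verified_checkpoint = 1000) while the transactions and effects
//! have only been fully downloaded up to checkpoint 950
//! (highest_synced_checkpoint = 950).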
//!
//! The `PeerHeights` struct is used to track the highest_synced_checkpoint
//! watermark for all of our peers.
//!
//! When a new checkpoint is discovered, and we've determined that it is higher
//! than our highest_verified_checkpoint, then StateSync will kick off a task to
//! synchronize and verify all checkpoints between our
//! highest_verified_checkpoint and the newly discovered checkpoint. This
//! process is done by querying one of our peers for the checkpoints we're
//! missing (using the `PeerHeights` struct as a way to intelligently select
//! which peers have the data available for us to query) at which point we will
//! locally verify the signatures on the checkpoint header with the appropriate
//! committee (based on the epoch). As checkpoints are verified, the
//! highest_verified_checkpoint watermark will be ratcheted up.
//!
//! Once we've ratcheted up our highest_verified_checkpoint, and if it is higher
//! than highest_synced_checkpoint, StateSync will then kick off a task to
//! synchronize the contents of all of the checkpoints from
//! highest_synced_checkpoint..=highest_verified_checkpoint. After the
//! contents of each checkpoint are fully downloaded, StateSync will update our
//! highest_synced_checkpoint watermark and send out a notification on a
//! broadcast channel indicating that a new checkpoint has been fully
//! downloaded. Notifications on this broadcast channel will always be made in
//! order. StateSync will also send out a notification to its peers of the newly
//! synchronized checkpoint so that it can help other peers synchronize.
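//!
//! # Example
//!
//! A minimal sketch (not taken from the real node wiring) of how a consumer
//! might follow the synced-checkpoint stream exposed by [Handle]:
//!
//! ```ignore
//! async fn follow_synced_checkpoints(handle: &Handle) {
//!     let mut receiver = handle.subscribe_to_synced_checkpoints();
//!     // Notifications arrive in order, one per fully synced checkpoint.
//!     while let Ok(checkpoint) = receiver.recv().await {
//!         println!("synced checkpoint {}", checkpoint.sequence_number());
//!     }
//! }
//! ```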

use std::{
    collections::{HashMap, VecDeque},
    sync::{
        Arc, RwLock,
        atomic::{AtomicU64, Ordering},
    },
    time::Duration,
};

use anemo::{PeerId, Request, Response, Result, types::PeerEvent};
use futures::{FutureExt, StreamExt, stream::FuturesOrdered};
use iota_config::p2p::StateSyncConfig;
use iota_types::{
    committee::Committee,
    digests::CheckpointDigest,
    messages_checkpoint::{
        CertifiedCheckpointSummary as Checkpoint, CheckpointSequenceNumber, EndOfEpochData,
        FullCheckpointContents, VerifiedCheckpoint, VerifiedCheckpointContents,
    },
    storage::WriteStore,
};
use rand::Rng;
use tap::{Pipe, TapFallible, TapOptional};
use tokio::{
    sync::{broadcast, mpsc, oneshot, watch},
    task::{AbortHandle, JoinSet},
};
use tracing::{debug, info, instrument, trace, warn};

mod generated {
    include!(concat!(env!("OUT_DIR"), "/iota.StateSync.rs"));
}
mod builder;
mod metrics;
mod server;
#[cfg(test)]
mod tests;

pub use builder::{Builder, UnstartedStateSync};
pub use generated::{
    state_sync_client::StateSyncClient,
    state_sync_server::{StateSync, StateSyncServer},
};
use iota_archival::reader::ArchiveReaderBalancer;
use iota_storage::verify_checkpoint;
pub use server::{GetCheckpointAvailabilityResponse, GetCheckpointSummaryRequest};

use self::{metrics::Metrics, server::CheckpointContentsDownloadLimitLayer};

/// A handle to the StateSync subsystem.
///
/// This handle can be cloned and shared. Once all copies of a StateSync
/// system's Handle have been dropped, the StateSync system will be gracefully
/// shut down.
#[derive(Clone, Debug)]
pub struct Handle {
    sender: mpsc::Sender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
}

impl Handle {
    /// Send a newly minted checkpoint from Consensus to StateSync so that it
    /// can be disseminated to other nodes on the network.
    ///
    /// # Invariant
    ///
    /// Consensus must only notify StateSync of new checkpoints that have been
    /// fully committed to persistent storage. This includes
    /// CheckpointContents and all Transactions and TransactionEffects
    /// included therein.
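    ///
    /// A hedged sketch of the intended call site (the surrounding consensus
    /// code is assumed, not shown in this module):
    ///
    /// ```ignore
    /// // CheckpointContents, Transactions and TransactionEffects must already
    /// // be durable before this call.
    /// handle.send_checkpoint(verified_checkpoint).await;
    /// ```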
    pub async fn send_checkpoint(&self, checkpoint: VerifiedCheckpoint) {
        self.sender
            .send(StateSyncMessage::VerifiedCheckpoint(Box::new(checkpoint)))
            .await
            .unwrap()
    }

    /// Subscribe to the stream of checkpoints that have been fully synchronized
    /// and downloaded.
    pub fn subscribe_to_synced_checkpoints(&self) -> broadcast::Receiver<VerifiedCheckpoint> {
        self.checkpoint_event_sender.subscribe()
    }
}

struct PeerHeights {
    /// Table used to track the highest checkpoint for each of our peers.
    peers: HashMap<PeerId, PeerStateSyncInfo>,
    unprocessed_checkpoints: HashMap<CheckpointDigest, Checkpoint>,
    sequence_number_to_digest: HashMap<CheckpointSequenceNumber, CheckpointDigest>,

    // The amount of time to wait before retrying if there are no peers to sync content from.
    wait_interval_when_no_peer_to_sync_content: Duration,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
struct PeerStateSyncInfo {
    /// The digest of the Peer's genesis checkpoint.
    genesis_checkpoint_digest: CheckpointDigest,
    /// Indicates if this Peer is on the same chain as us.
    on_same_chain_as_us: bool,
    /// Highest checkpoint sequence number we know of for this Peer.
    height: CheckpointSequenceNumber,
    /// Lowest available checkpoint watermark for this Peer.
    /// This defaults to 0 for now.
    lowest: CheckpointSequenceNumber,
}

impl PeerHeights {
    pub fn highest_known_checkpoint(&self) -> Option<&Checkpoint> {
        self.highest_known_checkpoint_sequence_number()
            .and_then(|s| self.sequence_number_to_digest.get(&s))
            .and_then(|digest| self.unprocessed_checkpoints.get(digest))
    }

    pub fn highest_known_checkpoint_sequence_number(&self) -> Option<CheckpointSequenceNumber> {
        self.peers
            .values()
            .filter_map(|info| info.on_same_chain_as_us.then_some(info.height))
            .max()
    }

    pub fn peers_on_same_chain(&self) -> impl Iterator<Item = (&PeerId, &PeerStateSyncInfo)> {
        self.peers
            .iter()
            .filter(|(_peer_id, info)| info.on_same_chain_as_us)
    }

    // Returns a bool that indicates if the update was done successfully.
    //
    // This will return false if the given peer doesn't have an entry or is not
    // on the same chain as us.
    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, checkpoint=?checkpoint.sequence_number()))]
    pub fn update_peer_info(
        &mut self,
        peer_id: PeerId,
        checkpoint: Checkpoint,
        low_watermark: Option<CheckpointSequenceNumber>,
    ) -> bool {
        debug!("Update peer info");

        let info = match self.peers.get_mut(&peer_id) {
            Some(info) if info.on_same_chain_as_us => info,
            _ => return false,
        };

        info.height = std::cmp::max(*checkpoint.sequence_number(), info.height);
        if let Some(low_watermark) = low_watermark {
            info.lowest = low_watermark;
        }
        self.insert_checkpoint(checkpoint);

        true
    }

    #[instrument(level = "debug", skip_all, fields(peer_id=?peer_id, lowest = ?info.lowest, height = ?info.height))]
    pub fn insert_peer_info(&mut self, peer_id: PeerId, info: PeerStateSyncInfo) {
        use std::collections::hash_map::Entry;
        debug!("Insert peer info");

        match self.peers.entry(peer_id) {
            Entry::Occupied(mut entry) => {
                // If there's already an entry and the genesis checkpoint digests
                // match, then update the maximum height. Otherwise, replace the
                // entry with the more recent info.
                let entry = entry.get_mut();
                if entry.genesis_checkpoint_digest == info.genesis_checkpoint_digest {
                    entry.height = std::cmp::max(entry.height, info.height);
                } else {
                    *entry = info;
                }
            }
            Entry::Vacant(entry) => {
                entry.insert(info);
            }
        }
    }

    pub fn mark_peer_as_not_on_same_chain(&mut self, peer_id: PeerId) {
        if let Some(info) = self.peers.get_mut(&peer_id) {
            info.on_same_chain_as_us = false;
        }
    }

    pub fn cleanup_old_checkpoints(&mut self, sequence_number: CheckpointSequenceNumber) {
        self.unprocessed_checkpoints
            .retain(|_digest, checkpoint| *checkpoint.sequence_number() > sequence_number);
        self.sequence_number_to_digest
            .retain(|&s, _digest| s > sequence_number);
    }

    // TODO: also record who gives this checkpoint info for peer quality
    // measurement?
    pub fn insert_checkpoint(&mut self, checkpoint: Checkpoint) {
        let digest = *checkpoint.digest();
        let sequence_number = *checkpoint.sequence_number();
        self.unprocessed_checkpoints.insert(digest, checkpoint);
        self.sequence_number_to_digest
            .insert(sequence_number, digest);
    }

    pub fn remove_checkpoint(&mut self, digest: &CheckpointDigest) {
        if let Some(checkpoint) = self.unprocessed_checkpoints.remove(digest) {
            self.sequence_number_to_digest
                .remove(checkpoint.sequence_number());
        }
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Option<&Checkpoint> {
        self.sequence_number_to_digest
            .get(&sequence_number)
            .and_then(|digest| self.get_checkpoint_by_digest(digest))
    }

    pub fn get_checkpoint_by_digest(&self, digest: &CheckpointDigest) -> Option<&Checkpoint> {
        self.unprocessed_checkpoints.get(digest)
    }

    #[cfg(test)]
    pub fn set_wait_interval_when_no_peer_to_sync_content(&mut self, duration: Duration) {
        self.wait_interval_when_no_peer_to_sync_content = duration;
    }

    pub fn wait_interval_when_no_peer_to_sync_content(&self) -> Duration {
        self.wait_interval_when_no_peer_to_sync_content
    }
}

// PeerBalancer is an Iterator that selects peers based on RTT with some added
// randomness.
#[derive(Clone)]
struct PeerBalancer {
    peers: VecDeque<(anemo::Peer, PeerStateSyncInfo)>,
    requested_checkpoint: Option<CheckpointSequenceNumber>,
    request_type: PeerCheckpointRequestType,
}

#[derive(Clone)]
enum PeerCheckpointRequestType {
    Summary,
    Content,
}

impl PeerBalancer {
    pub fn new(
        network: &anemo::Network,
        peer_heights: Arc<RwLock<PeerHeights>>,
        request_type: PeerCheckpointRequestType,
    ) -> Self {
        let mut peers: Vec<_> = peer_heights
            .read()
            .unwrap()
            .peers_on_same_chain()
            // Filter out any peers who we aren't connected with.
            .filter_map(|(peer_id, info)| {
                network
                    .peer(*peer_id)
                    .map(|peer| (peer.connection_rtt(), peer, *info))
            })
            .collect();
        peers.sort_by(|(rtt_a, _, _), (rtt_b, _, _)| rtt_a.cmp(rtt_b));
        Self {
            peers: peers
                .into_iter()
                .map(|(_, peer, info)| (peer, info))
                .collect(),
            requested_checkpoint: None,
            request_type,
        }
    }

    pub fn with_checkpoint(mut self, checkpoint: CheckpointSequenceNumber) -> Self {
        self.requested_checkpoint = Some(checkpoint);
        self
    }
}
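
// Typical use, as in `sync_to_checkpoint` below: build the balancer once,
// clone it per request, and iterate to try peers in RTT-biased order. A
// hedged sketch (variable names assumed):
//
//   let peers = PeerBalancer::new(&network, peer_heights, PeerCheckpointRequestType::Summary);
//   for mut client in peers.clone().with_checkpoint(seq) { /* try the request */ }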

impl Iterator for PeerBalancer {
    type Item = StateSyncClient<anemo::Peer>;

    fn next(&mut self) -> Option<Self::Item> {
        while !self.peers.is_empty() {
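            // Peers are sorted by ascending RTT; sampling uniformly from the
            // first SELECTION_WINDOW entries biases selection toward the
            // fastest peers while still spreading load across them.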
            const SELECTION_WINDOW: usize = 2;
            let idx =
                rand::thread_rng().gen_range(0..std::cmp::min(SELECTION_WINDOW, self.peers.len()));
            let (peer, info) = self.peers.remove(idx).unwrap();
            let requested_checkpoint = self.requested_checkpoint.unwrap_or(0);
            match &self.request_type {
                // Summary will never be pruned
                PeerCheckpointRequestType::Summary if info.height >= requested_checkpoint => {
                    return Some(StateSyncClient::new(peer));
                }
                PeerCheckpointRequestType::Content
                    if info.height >= requested_checkpoint
                        && info.lowest <= requested_checkpoint =>
                {
                    return Some(StateSyncClient::new(peer));
                }
                _ => {}
            }
        }
        None
    }
}

#[derive(Clone, Debug)]
enum StateSyncMessage {
    /// Node will send this to StateSyncEventLoop in order to kick off the state
    /// sync process.
    StartSyncJob,
    /// Validators will send this to the StateSyncEventLoop in order to kick off
    /// notifying our peers of the new checkpoint.
    VerifiedCheckpoint(Box<VerifiedCheckpoint>),
    /// Notification that the checkpoint content sync task will send to the
    /// event loop in the event it was able to successfully sync a checkpoint's
    /// contents. If multiple checkpoints were synced at the same time, only the
    /// highest checkpoint is sent.
    SyncedCheckpoint(Box<VerifiedCheckpoint>),
}

struct StateSyncEventLoop<S> {
    config: StateSyncConfig,

    mailbox: mpsc::Receiver<StateSyncMessage>,
    /// Weak reference to our own mailbox
    weak_sender: mpsc::WeakSender<StateSyncMessage>,

    /// A set of all spawned tasks.
    tasks: JoinSet<()>,
    sync_checkpoint_summaries_task: Option<AbortHandle>,
    sync_checkpoint_contents_task: Option<AbortHandle>,
    download_limit_layer: Option<CheckpointContentsDownloadLimitLayer>,

    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    network: anemo::Network,
    metrics: Metrics,

    archive_readers: ArchiveReaderBalancer,
    sync_checkpoint_from_archive_task: Option<AbortHandle>,
}

impl<S> StateSyncEventLoop<S>
where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    // Note: A great deal of care is taken to ensure that all event handlers are
    // non-asynchronous and that the only "await" points are from the select
    // macro picking which event to handle. This ensures that the event loop is
    // able to process events at a high speed and reduces the chance of building
    // up a backlog of events to process.
    pub async fn start(mut self) {
        info!("State-Synchronizer started");

        self.config.pinned_checkpoints.sort();

        let mut interval = tokio::time::interval(self.config.interval_period());
        let mut peer_events = {
            let (subscriber, peers) = self.network.subscribe().unwrap();
            for peer_id in peers {
                self.spawn_get_latest_from_peer(peer_id);
            }
            subscriber
        };
        let (
            target_checkpoint_contents_sequence_sender,
            target_checkpoint_contents_sequence_receiver,
        ) = watch::channel(0);

        // Spawn tokio task to update metrics periodically in the background
        let (_sender, receiver) = oneshot::channel();
        tokio::spawn(update_checkpoint_watermark_metrics(
            receiver,
            self.store.clone(),
            self.metrics.clone(),
        ));

        // Start checkpoint contents sync loop.
        let task = sync_checkpoint_contents(
            self.network.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.checkpoint_event_sender.clone(),
            self.config.checkpoint_content_download_concurrency(),
            self.config.checkpoint_content_download_tx_concurrency(),
            self.config.checkpoint_content_timeout(),
            target_checkpoint_contents_sequence_receiver,
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_contents_task = Some(task_handle);

        // Start archive based checkpoint content sync loop.
        // TODO: Consider switching to sync from archive only on startup.
        // Right now because the peer set is fixed at startup, a node may eventually
        // end up with peers who have all purged their local state. In such a scenario
        // it will be stuck until restart when it ends up with a different set
        // of peers. Once the discovery mechanism can dynamically identify and
        // connect to other peers on the network, we will rely on sync from
        // archive as a fallback.
        let task = sync_checkpoint_contents_from_archive(
            self.network.clone(),
            self.archive_readers.clone(),
            self.store.clone(),
            self.peer_heights.clone(),
        );
        let task_handle = self.tasks.spawn(task);
        self.sync_checkpoint_from_archive_task = Some(task_handle);

        // Start main loop.
        loop {
            tokio::select! {
                now = interval.tick() => {
                    // Query the latest checkpoint of connected peers that are on the
                    // same chain as us, and check whether download_limit_layer needs
                    // to be pruned.
                    self.handle_tick(now.into_std());
                },
                maybe_message = self.mailbox.recv() => {
                    // Handle StateSyncMessage.
                    // Once all handles to our mailbox have been dropped this
                    // will yield `None` and we can terminate the event loop.
                    if let Some(message) = maybe_message {
                        self.handle_message(message);
                    } else {
                        break;
                    }
                },
                peer_event = peer_events.recv() => {
                    // Handle new and closed peer connections.
                    self.handle_peer_event(peer_event);
                },
                // Resolve the spawned tasks.
                Some(task_result) = self.tasks.join_next() => {
                    match task_result {
                        Ok(()) => {},
                        Err(e) => {
                            if e.is_cancelled() {
                                // Avoid crashing on ungraceful shutdown
                            } else if e.is_panic() {
                                // Propagate panics.
                                std::panic::resume_unwind(e.into_panic());
                            } else {
                                panic!("task failed: {e}");
                            }
                        },
                    };

                    // The sync_checkpoint_contents task is expected to run indefinitely.
                    if matches!(&self.sync_checkpoint_contents_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_contents task unexpectedly terminated")
                    }

                    // Clean up sync_checkpoint_summaries_task if it's finished.
                    if matches!(&self.sync_checkpoint_summaries_task, Some(t) if t.is_finished()) {
                        self.sync_checkpoint_summaries_task = None;
                    }

                    // The sync_checkpoint_from_archive_task task is expected to run indefinitely.
                    if matches!(&self.sync_checkpoint_from_archive_task, Some(t) if t.is_finished()) {
                        panic!("sync_checkpoint_from_archive task unexpectedly terminated")
                    }
                },
            }

            self.maybe_start_checkpoint_summary_sync_task();
            self.maybe_trigger_checkpoint_contents_sync_task(
                &target_checkpoint_contents_sequence_sender,
            );
        }

        info!("State-Synchronizer ended");
    }

    fn handle_message(&mut self, message: StateSyncMessage) {
        debug!("Received message: {:?}", message);
        match message {
            StateSyncMessage::StartSyncJob => self.maybe_start_checkpoint_summary_sync_task(),
            StateSyncMessage::VerifiedCheckpoint(checkpoint) => {
                self.handle_checkpoint_from_consensus(checkpoint)
            }
            // After we've successfully synced a checkpoint we can notify our peers
            StateSyncMessage::SyncedCheckpoint(checkpoint) => {
                self.spawn_notify_peers_of_checkpoint(*checkpoint)
            }
        }
    }

    // Handle a checkpoint that we received from consensus
    #[instrument(level = "debug", skip_all)]
    fn handle_checkpoint_from_consensus(&mut self, checkpoint: Box<VerifiedCheckpoint>) {
        // Always check previous_digest matches in case there is a gap between
        // state sync and consensus.
        let prev_digest = *self.store.get_checkpoint_by_sequence_number(checkpoint.sequence_number() - 1)
            .expect("store operation should not fail")
            .unwrap_or_else(|| panic!("Got checkpoint {} from consensus but cannot find checkpoint {} in certified_checkpoints", checkpoint.sequence_number(), checkpoint.sequence_number() - 1))
            .digest();
        if checkpoint.previous_digest != Some(prev_digest) {
            panic!(
                "Checkpoint {} from consensus has mismatched previous_digest, expected: {:?}, actual: {:?}",
                checkpoint.sequence_number(),
                Some(prev_digest),
                checkpoint.previous_digest
            );
        }

        let latest_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        // If this is an older checkpoint, just ignore it
        if latest_checkpoint.sequence_number() >= checkpoint.sequence_number() {
            return;
        }

        let checkpoint = *checkpoint;
        let next_sequence_number = latest_checkpoint.sequence_number().checked_add(1).unwrap();
        if *checkpoint.sequence_number() > next_sequence_number {
            debug!(
                "consensus sent too new of a checkpoint, expecting: {}, got: {}",
                next_sequence_number,
                checkpoint.sequence_number()
            );
        }

        // Because checkpoints from consensus are sent in order, when we have
        // checkpoint n we must already have all checkpoints before n, from
        // either state sync or consensus.
        #[cfg(debug_assertions)]
        {
            let _ = (next_sequence_number..=*checkpoint.sequence_number())
                .map(|n| {
                    let checkpoint = self
                        .store
                        .get_checkpoint_by_sequence_number(n)
                        .expect("store operation should not fail")
                        .unwrap_or_else(|| panic!("store should contain checkpoint {n}"));
                    self.store
                        .get_full_checkpoint_contents(&checkpoint.content_digest)
                        .expect("store operation should not fail")
                        .unwrap_or_else(|| {
                            panic!(
                                "store should contain checkpoint contents for {:?}",
                                checkpoint.content_digest
                            )
                        });
                })
                .collect::<Vec<_>>();
        }

        // If this is the last checkpoint of an epoch, we need to make sure the
        // new committee is in the store before we verify newer checkpoints in
        // the next epoch. This could happen before this validator's
        // reconfiguration finishes, because state sync does not reconfig.
        // TODO: make CheckpointAggregator use WriteStore so we don't need to do this
        // committee insertion in two places (only in `insert_checkpoint`).
        if let Some(EndOfEpochData {
            next_epoch_committee,
            ..
        }) = checkpoint.end_of_epoch_data.as_ref()
        {
            let next_committee = next_epoch_committee.iter().cloned().collect();
            let committee =
                Committee::new(checkpoint.epoch().checked_add(1).unwrap(), next_committee);
            self.store
                .insert_committee(committee)
                .expect("store operation should not fail");
        }

        self.store
            .update_highest_verified_checkpoint(&checkpoint)
            .expect("store operation should not fail");
        self.store
            .update_highest_synced_checkpoint(&checkpoint)
            .expect("store operation should not fail");

        // We don't care if no one is listening as this is a broadcast channel
        let _ = self.checkpoint_event_sender.send(checkpoint.clone());

        // Notify connected peers of the new checkpoint
        self.spawn_notify_peers_of_checkpoint(checkpoint);
    }

    fn handle_peer_event(
        &mut self,
        peer_event: Result<PeerEvent, tokio::sync::broadcast::error::RecvError>,
    ) {
        use tokio::sync::broadcast::error::RecvError;

        match peer_event {
            Ok(PeerEvent::NewPeer(peer_id)) => {
                self.spawn_get_latest_from_peer(peer_id);
            }
            Ok(PeerEvent::LostPeer(peer_id, _)) => {
                self.peer_heights.write().unwrap().peers.remove(&peer_id);
            }

            Err(RecvError::Closed) => {
                panic!("PeerEvent channel shouldn't be able to be closed");
            }

            Err(RecvError::Lagged(_)) => {
                trace!("State-Sync fell behind processing PeerEvents");
            }
        }
    }

    fn spawn_get_latest_from_peer(&mut self, peer_id: PeerId) {
        if let Some(peer) = self.network.peer(peer_id) {
            let genesis_checkpoint_digest = *self
                .store
                .get_checkpoint_by_sequence_number(0)
                .expect("store operation should not fail")
                .expect("store should contain genesis checkpoint")
                .digest();
            let task = get_latest_from_peer(
                genesis_checkpoint_digest,
                peer,
                self.peer_heights.clone(),
                self.config.timeout(),
            );
            self.tasks.spawn(task);
        }
    }

    fn handle_tick(&mut self, _now: std::time::Instant) {
        let task = query_peers_for_their_latest_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            self.weak_sender.clone(),
            self.config.timeout(),
        );
        self.tasks.spawn(task);

        if let Some(layer) = self.download_limit_layer.as_ref() {
            layer.maybe_prune_map();
        }
    }

    /// Starts syncing checkpoint summaries if there are peers that have a
    /// higher known checkpoint than us. Only one sync task is allowed to
    /// run at a time.
    fn maybe_start_checkpoint_summary_sync_task(&mut self) {
        // Only run one sync task at a time
        if self.sync_checkpoint_summaries_task.is_some() {
            return;
        }

        let highest_processed_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");

        let highest_known_checkpoint = self
            .peer_heights
            .read()
            .unwrap()
            .highest_known_checkpoint()
            .cloned();

        if Some(highest_processed_checkpoint.sequence_number())
            < highest_known_checkpoint
                .as_ref()
                .map(|x| x.sequence_number())
        {
            // Start sync job
            let task = sync_to_checkpoint(
                self.network.clone(),
                self.store.clone(),
                self.peer_heights.clone(),
                self.metrics.clone(),
                self.config.pinned_checkpoints.clone(),
                self.config.checkpoint_header_download_concurrency(),
                self.config.timeout(),
                // The if condition should ensure that this is Some
                highest_known_checkpoint.unwrap(),
            )
            .map(|result| match result {
                Ok(()) => {}
                Err(e) => {
                    debug!("error syncing checkpoint {e}");
                }
            });
            let task_handle = self.tasks.spawn(task);
            self.sync_checkpoint_summaries_task = Some(task_handle);
        }
    }

    /// Triggers the checkpoint contents sync task if
    /// highest_verified_checkpoint > highest_synced_checkpoint and there
    /// are peers that have highest_known_checkpoint >
    /// highest_synced_checkpoint.
    fn maybe_trigger_checkpoint_contents_sync_task(
        &mut self,
        target_sequence_channel: &watch::Sender<CheckpointSequenceNumber>,
    ) {
        let highest_verified_checkpoint = self
            .store
            .get_highest_verified_checkpoint()
            .expect("store operation should not fail");
        let highest_synced_checkpoint = self
            .store
            .get_highest_synced_checkpoint()
            .expect("store operation should not fail");

        if highest_verified_checkpoint.sequence_number()
            > highest_synced_checkpoint.sequence_number()
            // Skip if we aren't connected to any peers that can help
            && self
                .peer_heights
                .read()
                .unwrap()
                .highest_known_checkpoint_sequence_number()
                > Some(*highest_synced_checkpoint.sequence_number())
        {
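            // Only nudge the contents-sync task when the target actually
            // advances; send_if_modified suppresses redundant wakeups.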
            let _ = target_sequence_channel.send_if_modified(|num| {
                let new_num = *highest_verified_checkpoint.sequence_number();
                if *num == new_num {
                    return false;
                }
                *num = new_num;
                true
            });
        }
    }

    fn spawn_notify_peers_of_checkpoint(&mut self, checkpoint: VerifiedCheckpoint) {
        let task = notify_peers_of_checkpoint(
            self.network.clone(),
            self.peer_heights.clone(),
            checkpoint,
            self.config.timeout(),
        );
        self.tasks.spawn(task);
    }
}

/// Sends a notification of the new checkpoint to all connected peers that are
/// on the same chain as us.
async fn notify_peers_of_checkpoint(
    network: anemo::Network,
    peer_heights: Arc<RwLock<PeerHeights>>,
    checkpoint: VerifiedCheckpoint,
    timeout: Duration,
) {
    let futs = peer_heights
        .read()
        .unwrap()
        // Filter out any peers who are not on the same chain as us
        .peers_on_same_chain()
        // Filter out any peers who we know already have a checkpoint higher than this one
        .filter_map(|(peer_id, info)| {
            (*checkpoint.sequence_number() > info.height).then_some(peer_id)
        })
        // Filter out any peers who we aren't connected with
        .flat_map(|peer_id| network.peer(*peer_id))
        .map(StateSyncClient::new)
        .map(|mut client| {
            let request = Request::new(checkpoint.inner().clone()).with_timeout(timeout);
            async move { client.push_checkpoint_summary(request).await }
        })
        .collect::<Vec<_>>();
    futures::future::join_all(futs).await;
}

/// Queries a peer for their latest PeerStateSyncInfo and
/// keeps the updated info in PeerHeights.
async fn get_latest_from_peer(
    our_genesis_checkpoint_digest: CheckpointDigest,
    peer: anemo::Peer,
    peer_heights: Arc<RwLock<PeerHeights>>,
    timeout: Duration,
) {
    let peer_id = peer.peer_id();
    let mut client = StateSyncClient::new(peer);

    let info = {
        let maybe_info = peer_heights.read().unwrap().peers.get(&peer_id).copied();

        if let Some(info) = maybe_info {
            info
        } else {
            // TODO do we want to create a new API just for querying a node's chainid?
            //
            // We need to query this node's genesis checkpoint to see if they're on the same
            // chain as us
            let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(0))
                .with_timeout(timeout);
            let response = client
                .get_checkpoint_summary(request)
                .await
                .map(Response::into_inner);

            let info = match response {
                Ok(Some(checkpoint)) => {
                    let digest = *checkpoint.digest();
                    PeerStateSyncInfo {
                        genesis_checkpoint_digest: digest,
                        on_same_chain_as_us: our_genesis_checkpoint_digest == digest,
                        height: *checkpoint.sequence_number(),
                        lowest: CheckpointSequenceNumber::default(),
                    }
                }
                Ok(None) => PeerStateSyncInfo {
                    genesis_checkpoint_digest: CheckpointDigest::default(),
                    on_same_chain_as_us: false,
                    height: CheckpointSequenceNumber::default(),
                    lowest: CheckpointSequenceNumber::default(),
                },
                Err(status) => {
                    trace!("get_latest_checkpoint_summary request failed: {status:?}");
                    return;
                }
            };
            peer_heights
                .write()
                .unwrap()
                .insert_peer_info(peer_id, info);
            info
        }
    };

    // Bail early if this node isn't on the same chain as us
    if !info.on_same_chain_as_us {
        trace!(?info, "Peer {peer_id} not on same chain as us");
        return;
    }
    let Some((highest_checkpoint, low_watermark)) =
        query_peer_for_latest_info(&mut client, timeout).await
    else {
        return;
    };
    peer_heights
        .write()
        .unwrap()
        .update_peer_info(peer_id, highest_checkpoint, low_watermark);
}

/// Queries a peer for their highest_synced_checkpoint and low checkpoint
/// watermark.
async fn query_peer_for_latest_info(
    client: &mut StateSyncClient<anemo::Peer>,
    timeout: Duration,
) -> Option<(Checkpoint, Option<CheckpointSequenceNumber>)> {
    let request = Request::new(()).with_timeout(timeout);
    let response = client
        .get_checkpoint_availability(request)
        .await
        .map(Response::into_inner);
    match response {
        Ok(GetCheckpointAvailabilityResponse {
            highest_synced_checkpoint,
            lowest_available_checkpoint,
        }) => {
            return Some((highest_synced_checkpoint, Some(lowest_available_checkpoint)));
        }
        Err(status) => {
            // If the peer hasn't upgraded they would return a 404 NotFound error
            if status.status() != anemo::types::response::StatusCode::NotFound {
                trace!("get_checkpoint_availability request failed: {status:?}");
                return None;
            }
        }
    };

    // Fall back to the old query
    // TODO: Remove this once the new feature stabilizes
    let request = Request::new(GetCheckpointSummaryRequest::Latest).with_timeout(timeout);
    let response = client
        .get_checkpoint_summary(request)
        .await
        .map(Response::into_inner);
    match response {
        Ok(Some(checkpoint)) => Some((checkpoint, None)),
        Ok(None) => None,
        Err(status) => {
            trace!("get_checkpoint_summary (latest) request failed: {status:?}");
            None
        }
    }
}

/// Queries and updates the latest checkpoint of connected peers that are on
/// the same chain as us. If the received highest checkpoint of any peer is
/// higher than the current one, we will start syncing via
/// StateSyncMessage::StartSyncJob.
#[instrument(level = "debug", skip_all)]
async fn query_peers_for_their_latest_checkpoint(
    network: anemo::Network,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    timeout: Duration,
) {
    let peer_heights = &peer_heights;
    let futs = peer_heights
        .read()
        .unwrap()
        .peers_on_same_chain()
        // Filter out any peers who we aren't connected with
        .flat_map(|(peer_id, _info)| network.peer(*peer_id))
        .map(|peer| {
            let peer_id = peer.peer_id();
            let mut client = StateSyncClient::new(peer);

            async move {
                let response = query_peer_for_latest_info(&mut client, timeout).await;
                match response {
                    Some((highest_checkpoint, low_watermark)) => peer_heights
                        .write()
                        .unwrap()
                        .update_peer_info(peer_id, highest_checkpoint.clone(), low_watermark)
                        .then_some(highest_checkpoint),
                    None => None,
                }
            }
        })
        .collect::<Vec<_>>();

    debug!("Query {} peers for latest checkpoint", futs.len());

    let checkpoints = futures::future::join_all(futs).await.into_iter().flatten();

    let highest_checkpoint = checkpoints.max_by_key(|checkpoint| *checkpoint.sequence_number());

    let our_highest_checkpoint = peer_heights
        .read()
        .unwrap()
        .highest_known_checkpoint()
        .cloned();

    debug!(
        "Our highest checkpoint {:?}, peers highest checkpoint {:?}",
        our_highest_checkpoint.as_ref().map(|c| c.sequence_number()),
        highest_checkpoint.as_ref().map(|c| c.sequence_number())
    );

    let _new_checkpoint = match (highest_checkpoint, our_highest_checkpoint) {
        (Some(theirs), None) => theirs,
        (Some(theirs), Some(ours)) if theirs.sequence_number() > ours.sequence_number() => theirs,
        _ => return,
    };

    if let Some(sender) = sender.upgrade() {
        let _ = sender.send(StateSyncMessage::StartSyncJob).await;
    }
}

/// Queries connected peers for checkpoints from sequence
/// current+1 to the target. The received checkpoints will be verified and
/// stored in the store. Checkpoints in temporary store (peer_heights) will be
/// cleaned up after syncing.
async fn sync_to_checkpoint<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    metrics: Metrics,
    pinned_checkpoints: Vec<(CheckpointSequenceNumber, CheckpointDigest)>,
    checkpoint_header_download_concurrency: usize,
    timeout: Duration,
    checkpoint: Checkpoint,
) -> Result<()>
where
    S: WriteStore,
{
    metrics.set_highest_known_checkpoint(*checkpoint.sequence_number());

    let mut current = store
        .get_highest_verified_checkpoint()
        .expect("store operation should not fail");
    if current.sequence_number() >= checkpoint.sequence_number() {
        return Err(anyhow::anyhow!(
            "target checkpoint {} is older than highest verified checkpoint {}",
            checkpoint.sequence_number(),
            current.sequence_number(),
        ));
    }

    let peer_balancer = PeerBalancer::new(
        &network,
        peer_heights.clone(),
        PeerCheckpointRequestType::Summary,
    );
    // Range of the next sequence_numbers to fetch
    let mut request_stream = (current.sequence_number().checked_add(1).unwrap()
        ..=*checkpoint.sequence_number())
        .map(|next| {
            let peers = peer_balancer.clone().with_checkpoint(next);
            let peer_heights = peer_heights.clone();
            let pinned_checkpoints = &pinned_checkpoints;
            async move {
                if let Some(checkpoint) = peer_heights
                    .read()
                    .unwrap()
                    .get_checkpoint_by_sequence_number(next)
                {
                    return (Some(checkpoint.to_owned()), next, None);
                }

                // Iterate through peers trying each one in turn until we're able to
                // successfully get the target checkpoint
                for mut peer in peers {
                    let request = Request::new(GetCheckpointSummaryRequest::BySequenceNumber(next))
                        .with_timeout(timeout);
                    if let Some(checkpoint) = peer
                        .get_checkpoint_summary(request)
                        .await
                        .tap_err(|e| trace!("{e:?}"))
                        .ok()
                        .and_then(Response::into_inner)
                        .tap_none(|| trace!("peer unable to help sync"))
                    {
                        // Peer didn't give us a checkpoint with the height that we requested
                        if *checkpoint.sequence_number() != next {
                            tracing::debug!(
                                "peer returned checkpoint with wrong sequence number: expected {next}, got {}",
                                checkpoint.sequence_number()
                            );
                            continue;
                        }

                        // Peer gave us a checkpoint whose digest does not match pinned digest
                        let checkpoint_digest = checkpoint.digest();
                        if let Ok(pinned_digest_index) = pinned_checkpoints.binary_search_by_key(
                            checkpoint.sequence_number(),
                            |(seq_num, _digest)| *seq_num
                        ) {
                            if pinned_checkpoints[pinned_digest_index].1 != *checkpoint_digest {
                                tracing::debug!(
                                    "peer returned checkpoint with digest that does not match pinned digest: expected {:?}, got {:?}",
                                    pinned_checkpoints[pinned_digest_index].1,
                                    checkpoint_digest
                                );
                                continue;
                            }
                        }

                        // Insert in our store in the event that things fail and we need to retry
                        peer_heights
                            .write()
                            .unwrap()
                            .insert_checkpoint(checkpoint.clone());
                        return (Some(checkpoint), next, Some(peer.inner().peer_id()));
                    }
                }
                (None, next, None)
            }
        })
        .pipe(futures::stream::iter)
        .buffered(checkpoint_header_download_concurrency);

    while let Some((maybe_checkpoint, next, maybe_peer_id)) = request_stream.next().await {
        assert_eq!(
            current
                .sequence_number()
                .checked_add(1)
                .expect("exhausted u64"),
            next
        );

        // Verify the checkpoint
        let checkpoint = 'cp: {
            let checkpoint = maybe_checkpoint.ok_or_else(|| {
                anyhow::anyhow!("no peers were able to help sync checkpoint {next}")
            })?;
            // Skip verification for manually pinned checkpoints.
            if pinned_checkpoints
                .binary_search_by_key(checkpoint.sequence_number(), |(seq_num, _digest)| *seq_num)
                .is_ok()
            {
                break 'cp VerifiedCheckpoint::new_unchecked(checkpoint);
            }
            match verify_checkpoint(&current, &store, checkpoint) {
                Ok(verified_checkpoint) => verified_checkpoint,
                Err(checkpoint) => {
                    let mut peer_heights = peer_heights.write().unwrap();
                    // Remove the checkpoint from our temporary store so that we can try querying
                    // another peer for a different one
                    peer_heights.remove_checkpoint(checkpoint.digest());

                    // Mark peer as not on the same chain as us
                    if let Some(peer_id) = maybe_peer_id {
                        peer_heights.mark_peer_as_not_on_same_chain(peer_id);
                    }

                    return Err(anyhow::anyhow!(
                        "unable to verify checkpoint {checkpoint:?}"
                    ));
                }
            }
        };

        debug!(checkpoint_seq = ?checkpoint.sequence_number(), "verified checkpoint summary");
        if let Some(checkpoint_summary_age_metric) = metrics.checkpoint_summary_age_metric() {
            checkpoint.report_checkpoint_age_ms(checkpoint_summary_age_metric);
        }

        current = checkpoint.clone();
        // Insert the newly verified checkpoint into our store, which will bump our
        // highest verified checkpoint watermark as well.
        store
            .insert_checkpoint(&checkpoint)
            .expect("store operation should not fail");
    }

    peer_heights
        .write()
        .unwrap()
        .cleanup_old_checkpoints(*checkpoint.sequence_number());

    Ok(())
}

/// Syncs checkpoint contents from one of the archive_readers if the
/// highest_synced_checkpoint < lowest_checkpoint among peers. The requesting
/// checkpoint range is from highest_synced_checkpoint+1 to lowest_checkpoint.
async fn sync_checkpoint_contents_from_archive<S>(
    network: anemo::Network,
    archive_readers: ArchiveReaderBalancer,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
) where
    S: WriteStore + Clone + Send + Sync + 'static,
{
    loop {
        // Get connected peers that are on the same chain as us
        let peers: Vec<_> = peer_heights
            .read()
            .unwrap()
            .peers_on_same_chain()
            // Filter out any peers who we aren't connected with.
            .filter_map(|(peer_id, info)| network.peer(*peer_id).map(|peer| (peer, *info)))
            .collect();
        let lowest_checkpoint_on_peers = peers
            .iter()
            .map(|(_p, state_sync_info)| state_sync_info.lowest)
            .min();
        let highest_synced = store
            .get_highest_synced_checkpoint()
            .expect("store operation should not fail")
            .sequence_number;
        let sync_from_archive = if let Some(lowest_checkpoint_on_peers) = lowest_checkpoint_on_peers
        {
            highest_synced < lowest_checkpoint_on_peers
        } else {
            false
        };
        debug!(
            "Syncing checkpoint contents from archive: {sync_from_archive}, highest_synced: {highest_synced}, lowest_checkpoint_on_peers: {}",
            lowest_checkpoint_on_peers.map_or_else(|| "None".to_string(), |l| l.to_string())
        );
        if sync_from_archive {
            let start = highest_synced
                .checked_add(1)
                .expect("Checkpoint seq num overflow");
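            // Half-open range: `lowest_checkpoint_on_peers` itself is still
            // served by peers, so the archive only needs everything below it.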
            let checkpoint_range = start..lowest_checkpoint_on_peers.unwrap();
            if let Some(archive_reader) = archive_readers
                .pick_one_random(checkpoint_range.clone())
                .await
            {
                let txn_counter = Arc::new(AtomicU64::new(0));
                let checkpoint_counter = Arc::new(AtomicU64::new(0));
                if let Err(err) = archive_reader
                    .read(
                        store.clone(),
                        checkpoint_range,
                        txn_counter.clone(),
                        checkpoint_counter.clone(),
                        true,
                    )
                    .await
                {
                    warn!("State sync from archive failed with error: {:?}", err);
                } else {
                    info!(
                        "State sync from archive is complete. Checkpoints downloaded = {:?}, Txns downloaded = {:?}",
                        checkpoint_counter.load(Ordering::Relaxed),
                        txn_counter.load(Ordering::Relaxed)
                    );
                }
            } else {
                warn!("Failed to find an archive reader to complete the state sync request");
            }
        }
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}

/// Syncs checkpoint contents from peers if the target sequence cursor, which is
/// changed via target_sequence_channel, is greater than the current one. The
/// requesting checkpoint range is from current_sequence+1 to
/// target_sequence_cursor. It will also periodically notify the peers of our
/// latest synced checkpoint.
async fn sync_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    sender: mpsc::WeakSender<StateSyncMessage>,
    checkpoint_event_sender: broadcast::Sender<VerifiedCheckpoint>,
    checkpoint_content_download_concurrency: usize,
    checkpoint_content_download_tx_concurrency: u64,
    timeout: Duration,
    mut target_sequence_channel: watch::Receiver<CheckpointSequenceNumber>,
) where
    S: WriteStore + Clone,
{
    let mut highest_synced = store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail");

    let mut current_sequence = highest_synced.sequence_number().checked_add(1).unwrap();
    let mut target_sequence_cursor = 0;
    let mut highest_started_network_total_transactions = highest_synced.network_total_transactions;
    let mut checkpoint_contents_tasks = FuturesOrdered::new();

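    // Transaction-count budget shared by all in-flight content downloads; it is
    // replenished in the Ok branch below as checkpoints finish syncing.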
1277    let mut tx_concurrency_remaining = checkpoint_content_download_tx_concurrency;
1278
1279    loop {
1280        tokio::select! {
1281            result = target_sequence_channel.changed() => {
1282                match result {
1283                    Ok(()) => {
1284                        target_sequence_cursor = (*target_sequence_channel.borrow_and_update()).checked_add(1).unwrap();
1285                    }
1286                    Err(_) => {
1287                        // Watch channel is closed, exit loop.
1288                        return
1289                    }
1290                }
1291            },
            Some(maybe_checkpoint) = checkpoint_contents_tasks.next() => {
                match maybe_checkpoint {
                    Ok(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint; // type hint
                        store
                            .update_highest_synced_checkpoint(&checkpoint)
                            .expect("store operation should not fail");
                        // We don't care if no one is listening, as this is a broadcast channel.
                        let _ = checkpoint_event_sender.send(checkpoint.clone());
                        // Refill the transaction budget with the transactions that just
                        // finished downloading.
                        tx_concurrency_remaining += checkpoint.network_total_transactions
                            - highest_synced.network_total_transactions;
                        highest_synced = checkpoint;
                    }
                    Err(checkpoint) => {
                        let _: &VerifiedCheckpoint = &checkpoint; // type hint
                        if let Some(lowest_peer_checkpoint) = peer_heights
                            .read()
                            .ok()
                            .and_then(|heights| {
                                heights
                                    .peers
                                    .values()
                                    .map(|state_sync_info| state_sync_info.lowest)
                                    .min()
                            })
                        {
                            if checkpoint.sequence_number() >= &lowest_peer_checkpoint {
                                info!(
                                    "unable to sync contents of checkpoint {} through state sync; lowest peer checkpoint: {}",
                                    checkpoint.sequence_number(),
                                    lowest_peer_checkpoint,
                                );
                            }
                        } else {
                            info!(
                                "unable to sync contents of checkpoint {} through state sync",
                                checkpoint.sequence_number(),
                            );
                        }
                        // Retry contents sync on failure.
                        checkpoint_contents_tasks.push_front(sync_one_checkpoint_contents(
                            network.clone(),
                            &store,
                            peer_heights.clone(),
                            timeout,
                            checkpoint,
                        ));
                    }
                }
            },
        }

        // Start syncing tasks up to configured concurrency limits.
        while current_sequence < target_sequence_cursor
            && checkpoint_contents_tasks.len() < checkpoint_content_download_concurrency
        {
            let next_checkpoint = store
                .get_checkpoint_by_sequence_number(current_sequence)
                .expect("store operation should not fail")
                .expect(
                    "BUG: store should have all checkpoints older than highest_verified_checkpoint",
                );

            // Enforce the transaction-count concurrency limit: if starting this
            // checkpoint would push the number of in-flight transactions over
            // the budget, stop scheduling until completed downloads refill it
            // (see the `Ok(checkpoint)` arm above).
            let tx_count = next_checkpoint.network_total_transactions
                - highest_started_network_total_transactions;
            if tx_count > tx_concurrency_remaining {
                break;
            }
            tx_concurrency_remaining -= tx_count;

            highest_started_network_total_transactions = next_checkpoint.network_total_transactions;
            current_sequence += 1;
            checkpoint_contents_tasks.push_back(sync_one_checkpoint_contents(
                network.clone(),
                &store,
                peer_heights.clone(),
                timeout,
                next_checkpoint,
            ));
        }

        if highest_synced.sequence_number() % checkpoint_content_download_concurrency as u64 == 0
            || checkpoint_contents_tasks.is_empty()
        {
            // Periodically ask the event loop to notify our peers that we've
            // synced to a new checkpoint height.
            if let Some(sender) = sender.upgrade() {
                let message = StateSyncMessage::SyncedCheckpoint(Box::new(highest_synced.clone()));
                let _ = sender.send(message).await;
            }
        }
    }
}

#[instrument(level = "debug", skip_all, fields(sequence_number = ?checkpoint.sequence_number()))]
/// Requests the contents of a single checkpoint from peers if the store does
/// not already have them.
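///
/// On failure the checkpoint is handed back in the `Err` variant so the caller
/// can re-queue exactly the same download. A sketch of that contract, with an
/// illustrative `tasks` queue (a `FuturesOrdered`, as in the caller above):
///
/// ```ignore
/// match sync_one_checkpoint_contents(network.clone(), &store, peer_heights.clone(), timeout, checkpoint).await {
///     // Contents are now in the store; the synced watermark can be ratcheted up.
///     Ok(checkpoint) => store
///         .update_highest_synced_checkpoint(&checkpoint)
///         .expect("store operation should not fail"),
///     // Re-queue the failed checkpoint at the front so it is retried first.
///     Err(checkpoint) => tasks.push_front(sync_one_checkpoint_contents(
///         network.clone(), &store, peer_heights.clone(), timeout, checkpoint,
///     )),
/// }
/// ```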
async fn sync_one_checkpoint_contents<S>(
    network: anemo::Network,
    store: S,
    peer_heights: Arc<RwLock<PeerHeights>>,
    timeout: Duration,
    checkpoint: VerifiedCheckpoint,
) -> Result<VerifiedCheckpoint, VerifiedCheckpoint>
where
    S: WriteStore + Clone,
{
    debug!("syncing checkpoint contents");

    // Check if we have already produced this checkpoint locally. If so, there
    // is no need to get it from peers.
    if store
        .get_highest_synced_checkpoint()
        .expect("store operation should not fail")
        .sequence_number()
        >= checkpoint.sequence_number()
    {
        debug!("checkpoint was already created via consensus output");
        return Ok(checkpoint);
    }

    // Request checkpoint contents from peers.
    let peers = PeerBalancer::new(
        &network,
        peer_heights.clone(),
        PeerCheckpointRequestType::Content,
    )
    .with_checkpoint(*checkpoint.sequence_number());
    let now = tokio::time::Instant::now();
    let Some(_contents) = get_full_checkpoint_contents(peers, &store, &checkpoint, timeout).await
    else {
        // Delay completion in case of error so we don't hammer the network
        // with retries.
        let duration = peer_heights
            .read()
            .unwrap()
            .wait_interval_when_no_peer_to_sync_content();
        if now.elapsed() < duration {
            let duration = duration - now.elapsed();
            info!("retrying checkpoint sync after {:?}", duration);
            tokio::time::sleep(duration).await;
        }
        return Err(checkpoint);
    };
    debug!("completed checkpoint contents sync");
    Ok(checkpoint)
}

#[instrument(level = "debug", skip_all)]
/// Requests the full checkpoint contents from peers if the store does not
/// already have them. Requests are sent to peers one at a time until the
/// contents are successfully retrieved.
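///
/// A sketch of the call site, mirroring `sync_one_checkpoint_contents` above
/// (the names are taken from that caller):
///
/// ```ignore
/// let peers = PeerBalancer::new(&network, peer_heights.clone(), PeerCheckpointRequestType::Content)
///     .with_checkpoint(*checkpoint.sequence_number());
/// if get_full_checkpoint_contents(peers, &store, &checkpoint, timeout).await.is_none() {
///     // No peer could serve the contents; the caller backs off and retries.
/// }
/// ```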
async fn get_full_checkpoint_contents<S>(
    peers: PeerBalancer,
    store: S,
    checkpoint: &VerifiedCheckpoint,
    timeout: Duration,
) -> Option<FullCheckpointContents>
where
    S: WriteStore,
{
    let digest = checkpoint.content_digest;
    if let Some(contents) = store
        .get_full_checkpoint_contents_by_sequence_number(*checkpoint.sequence_number())
        .expect("store operation should not fail")
        .or_else(|| {
            store
                .get_full_checkpoint_contents(&digest)
                .expect("store operation should not fail")
        })
    {
        debug!("store already contains checkpoint contents");
        return Some(contents);
    }

    // Iterate through the selected peers, trying each one in turn, until we're
    // able to successfully get the target checkpoint contents.
    for mut peer in peers {
        debug!(
            ?timeout,
            "requesting checkpoint contents from {}",
            peer.inner().peer_id(),
        );
        let request = Request::new(digest).with_timeout(timeout);
        if let Some(contents) = peer
            .get_checkpoint_contents(request)
            .await
            .tap_err(|e| trace!("{e:?}"))
            .ok()
            .and_then(Response::into_inner)
            .tap_none(|| trace!("peer unable to help sync"))
        {
            // Only accept contents whose digests match the verified checkpoint.
            if contents.verify_digests(digest).is_ok() {
                let verified_contents = VerifiedCheckpointContents::new_unchecked(contents.clone());
                store
                    .insert_checkpoint_contents(checkpoint, verified_contents)
                    .expect("store operation should not fail");
                return Some(contents);
            }
        }
    }
    debug!("no peers had checkpoint contents");
    None
}

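/// Periodically reports the highest verified and highest synced checkpoint
/// watermarks to the metrics registry, until a shutdown signal arrives on
/// `recv`.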
async fn update_checkpoint_watermark_metrics<S>(
    mut recv: oneshot::Receiver<()>,
    store: S,
    metrics: Metrics,
) -> Result<()>
where
    S: WriteStore + Clone + Send + Sync,
{
    let mut interval = tokio::time::interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            _now = interval.tick() => {
                let highest_verified_checkpoint = store.get_highest_verified_checkpoint()
                    .expect("store operation should not fail");
                metrics.set_highest_verified_checkpoint(highest_verified_checkpoint.sequence_number);
                let highest_synced_checkpoint = store.get_highest_synced_checkpoint()
                    .expect("store operation should not fail");
                metrics.set_highest_synced_checkpoint(highest_synced_checkpoint.sequence_number);
            },
            _ = &mut recv => break,
        }
    }
    Ok(())
}