iota_core/checkpoints/
checkpoint_output.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use iota_types::{
9    base_types::AuthorityName,
10    error::IotaResult,
11    message_envelope::Message,
12    messages_checkpoint::{
13        CertifiedCheckpointSummary, CheckpointContents, CheckpointSignatureMessage,
14        CheckpointSummary, SignedCheckpointSummary, VerifiedCheckpoint,
15    },
16    messages_consensus::ConsensusTransaction,
17};
18use tracing::{debug, info, instrument, trace};
19
20use super::{CheckpointMetrics, CheckpointStore};
21use crate::{
22    authority::{StableSyncAuthoritySigner, authority_per_epoch_store::AuthorityPerEpochStore},
23    consensus_adapter::SubmitToConsensus,
24    epoch::reconfiguration::ReconfigurationInitiator,
25};
26
27#[async_trait]
28pub trait CheckpointOutput: Sync + Send + 'static {
29    async fn checkpoint_created(
30        &self,
31        summary: &CheckpointSummary,
32        contents: &CheckpointContents,
33        epoch_store: &Arc<AuthorityPerEpochStore>,
34        checkpoint_store: &Arc<CheckpointStore>,
35    ) -> IotaResult;
36}
37
38#[async_trait]
39pub trait CertifiedCheckpointOutput: Sync + Send + 'static {
40    async fn certified_checkpoint_created(
41        &self,
42        summary: &CertifiedCheckpointSummary,
43    ) -> IotaResult;
44}
45
46pub struct SubmitCheckpointToConsensus<T> {
47    pub sender: T,
48    pub signer: StableSyncAuthoritySigner,
49    pub authority: AuthorityName,
50    pub next_reconfiguration_timestamp_ms: u64,
51    pub metrics: Arc<CheckpointMetrics>,
52}
53
54pub struct LogCheckpointOutput;
55
56impl LogCheckpointOutput {
57    pub fn boxed() -> Box<dyn CheckpointOutput> {
58        Box::new(Self)
59    }
60
61    pub fn boxed_certified() -> Box<dyn CertifiedCheckpointOutput> {
62        Box::new(Self)
63    }
64}
65
66#[async_trait]
67impl<T: SubmitToConsensus + ReconfigurationInitiator> CheckpointOutput
68    for SubmitCheckpointToConsensus<T>
69{
70    #[instrument(level = "debug", skip_all)]
71    async fn checkpoint_created(
72        &self,
73        summary: &CheckpointSummary,
74        contents: &CheckpointContents,
75        epoch_store: &Arc<AuthorityPerEpochStore>,
76        checkpoint_store: &Arc<CheckpointStore>,
77    ) -> IotaResult {
78        LogCheckpointOutput
79            .checkpoint_created(summary, contents, epoch_store, checkpoint_store)
80            .await?;
81
82        let checkpoint_timestamp = summary.timestamp_ms;
83        let checkpoint_seq = summary.sequence_number;
84        self.metrics.checkpoint_creation_latency.observe(
85            summary
86                .timestamp()
87                .elapsed()
88                .unwrap_or_default()
89                .as_secs_f64(),
90        );
91
92        let highest_verified_checkpoint = checkpoint_store
93            .get_highest_verified_checkpoint()?
94            .map(|x| *x.sequence_number());
95
96        if Some(checkpoint_seq) > highest_verified_checkpoint {
97            debug!(
98                "Sending checkpoint signature at sequence {checkpoint_seq} to consensus, timestamp {checkpoint_timestamp}.
99                {}ms left till end of epoch at timestamp {}",
100                self.next_reconfiguration_timestamp_ms.saturating_sub(checkpoint_timestamp), self.next_reconfiguration_timestamp_ms
101            );
102
103            let summary = SignedCheckpointSummary::new(
104                epoch_store.epoch(),
105                summary.clone(),
106                &*self.signer,
107                self.authority,
108            );
109
110            let message = CheckpointSignatureMessage { summary };
111            let transaction = ConsensusTransaction::new_checkpoint_signature_message(message);
112            self.sender
113                .submit_to_consensus(&[transaction], epoch_store)?;
114            self.metrics
115                .last_sent_checkpoint_signature
116                .set(checkpoint_seq as i64);
117        } else {
118            debug!(
119                "Checkpoint at sequence {checkpoint_seq} is already certified, skipping signature submission to consensus",
120            );
121            self.metrics
122                .last_skipped_checkpoint_signature_submission
123                .set(checkpoint_seq as i64);
124        }
125
126        // If scoring is enabled in protocol config, we also send misbehavior reports to
127        // consensus at this point. Misbehavior reports containing proofs of
128        // misbehaviour can be send whenever the misbehavior is detected, but we
129        // choose to send the ones that include only unprovable counts at this
130        // point, due to periodicity reasons and to ensure a (approximate)
131        // synchronization with the score updates.
132        if epoch_store.protocol_config().calculate_validator_scores() {
133            let misbehavior_report = epoch_store.scorer.current_local_metrics_count.to_report();
134            let transaction = ConsensusTransaction::new_misbehavior_report(
135                epoch_store.name,
136                &misbehavior_report,
137                checkpoint_seq,
138            );
139            info!(?transaction, "submitting misbehavior report to consensus");
140            self.sender
141                .submit_to_consensus(&[transaction], epoch_store)?;
142        }
143
144        if checkpoint_timestamp >= self.next_reconfiguration_timestamp_ms {
145            // close_epoch is ok if called multiple times
146            self.sender.close_epoch(epoch_store);
147        }
148        Ok(())
149    }
150}
151
152#[async_trait]
153impl CheckpointOutput for LogCheckpointOutput {
154    async fn checkpoint_created(
155        &self,
156        summary: &CheckpointSummary,
157        contents: &CheckpointContents,
158        _epoch_store: &Arc<AuthorityPerEpochStore>,
159        _checkpoint_store: &Arc<CheckpointStore>,
160    ) -> IotaResult {
161        trace!(
162            "Including following transactions in checkpoint {}: {:?}",
163            summary.sequence_number, contents
164        );
165        info!(
166            "Creating checkpoint {:?} at epoch {}, sequence {}, previous digest {:?}, transactions count {}, content digest {:?}, end_of_epoch_data {:?}",
167            summary.digest(),
168            summary.epoch,
169            summary.sequence_number,
170            summary.previous_digest,
171            contents.size(),
172            summary.content_digest,
173            summary.end_of_epoch_data,
174        );
175
176        Ok(())
177    }
178}
179
180#[async_trait]
181impl CertifiedCheckpointOutput for LogCheckpointOutput {
182    async fn certified_checkpoint_created(
183        &self,
184        summary: &CertifiedCheckpointSummary,
185    ) -> IotaResult {
186        debug!(
187            "Certified checkpoint with sequence {} and digest {}",
188            summary.sequence_number,
189            summary.digest()
190        );
191        Ok(())
192    }
193}
194
195pub struct SendCheckpointToStateSync {
196    handle: iota_network::state_sync::Handle,
197}
198
199impl SendCheckpointToStateSync {
200    pub fn new(handle: iota_network::state_sync::Handle) -> Self {
201        Self { handle }
202    }
203}
204
205#[async_trait]
206impl CertifiedCheckpointOutput for SendCheckpointToStateSync {
207    #[instrument(level = "trace", name = "checkpoint_created_from_consensus", skip_all)]
208    async fn certified_checkpoint_created(
209        &self,
210        summary: &CertifiedCheckpointSummary,
211    ) -> IotaResult {
212        debug!(
213            "Certified checkpoint with sequence {} and digest {}",
214            summary.sequence_number,
215            summary.digest()
216        );
217        self.handle
218            .send_checkpoint(VerifiedCheckpoint::new_unchecked(summary.to_owned()))
219            .await;
220
221        Ok(())
222    }
223}