1use std::sync::Arc;
6
7use async_trait::async_trait;
8use iota_types::{
9 base_types::AuthorityName,
10 error::IotaResult,
11 message_envelope::Message,
12 messages_checkpoint::{
13 CertifiedCheckpointSummary, CheckpointContents, CheckpointSignatureMessage,
14 CheckpointSummary, SignedCheckpointSummary, VerifiedCheckpoint,
15 },
16 messages_consensus::ConsensusTransaction,
17};
18use tracing::{debug, info, instrument, trace};
19
20use super::{CheckpointMetrics, CheckpointStore};
21use crate::{
22 authority::{StableSyncAuthoritySigner, authority_per_epoch_store::AuthorityPerEpochStore},
23 consensus_adapter::SubmitToConsensus,
24 epoch::reconfiguration::ReconfigurationInitiator,
25};
26
27#[async_trait]
28pub trait CheckpointOutput: Sync + Send + 'static {
29 async fn checkpoint_created(
30 &self,
31 summary: &CheckpointSummary,
32 contents: &CheckpointContents,
33 epoch_store: &Arc<AuthorityPerEpochStore>,
34 checkpoint_store: &Arc<CheckpointStore>,
35 ) -> IotaResult;
36}
37
38#[async_trait]
39pub trait CertifiedCheckpointOutput: Sync + Send + 'static {
40 async fn certified_checkpoint_created(
41 &self,
42 summary: &CertifiedCheckpointSummary,
43 ) -> IotaResult;
44}
45
46pub struct SubmitCheckpointToConsensus<T> {
47 pub sender: T,
48 pub signer: StableSyncAuthoritySigner,
49 pub authority: AuthorityName,
50 pub next_reconfiguration_timestamp_ms: u64,
51 pub metrics: Arc<CheckpointMetrics>,
52}
53
54pub struct LogCheckpointOutput;
55
56impl LogCheckpointOutput {
57 pub fn boxed() -> Box<dyn CheckpointOutput> {
58 Box::new(Self)
59 }
60
61 pub fn boxed_certified() -> Box<dyn CertifiedCheckpointOutput> {
62 Box::new(Self)
63 }
64}
65
66#[async_trait]
67impl<T: SubmitToConsensus + ReconfigurationInitiator> CheckpointOutput
68 for SubmitCheckpointToConsensus<T>
69{
70 #[instrument(level = "debug", skip_all)]
71 async fn checkpoint_created(
72 &self,
73 summary: &CheckpointSummary,
74 contents: &CheckpointContents,
75 epoch_store: &Arc<AuthorityPerEpochStore>,
76 checkpoint_store: &Arc<CheckpointStore>,
77 ) -> IotaResult {
78 LogCheckpointOutput
79 .checkpoint_created(summary, contents, epoch_store, checkpoint_store)
80 .await?;
81
82 let checkpoint_timestamp = summary.timestamp_ms;
83 let checkpoint_seq = summary.sequence_number;
84 self.metrics.checkpoint_creation_latency.observe(
85 summary
86 .timestamp()
87 .elapsed()
88 .unwrap_or_default()
89 .as_secs_f64(),
90 );
91
92 let highest_verified_checkpoint = checkpoint_store
93 .get_highest_verified_checkpoint()?
94 .map(|x| *x.sequence_number());
95
96 if Some(checkpoint_seq) > highest_verified_checkpoint {
97 debug!(
98 "Sending checkpoint signature at sequence {checkpoint_seq} to consensus, timestamp {checkpoint_timestamp}.
99 {}ms left till end of epoch at timestamp {}",
100 self.next_reconfiguration_timestamp_ms.saturating_sub(checkpoint_timestamp), self.next_reconfiguration_timestamp_ms
101 );
102
103 let summary = SignedCheckpointSummary::new(
104 epoch_store.epoch(),
105 summary.clone(),
106 &*self.signer,
107 self.authority,
108 );
109
110 let message = CheckpointSignatureMessage { summary };
111 let transaction = ConsensusTransaction::new_checkpoint_signature_message(message);
112 self.sender
113 .submit_to_consensus(&[transaction], epoch_store)?;
114 self.metrics
115 .last_sent_checkpoint_signature
116 .set(checkpoint_seq as i64);
117 } else {
118 debug!(
119 "Checkpoint at sequence {checkpoint_seq} is already certified, skipping signature submission to consensus",
120 );
121 self.metrics
122 .last_skipped_checkpoint_signature_submission
123 .set(checkpoint_seq as i64);
124 }
125
126 if epoch_store.protocol_config().calculate_validator_scores() {
133 let misbehavior_report = epoch_store.scorer.current_local_metrics_count.to_report();
134 let transaction = ConsensusTransaction::new_misbehavior_report(
135 epoch_store.name,
136 &misbehavior_report,
137 checkpoint_seq,
138 );
139 info!(?transaction, "submitting misbehavior report to consensus");
140 self.sender
141 .submit_to_consensus(&[transaction], epoch_store)?;
142 }
143
144 if checkpoint_timestamp >= self.next_reconfiguration_timestamp_ms {
145 self.sender.close_epoch(epoch_store);
147 }
148 Ok(())
149 }
150}
151
152#[async_trait]
153impl CheckpointOutput for LogCheckpointOutput {
154 async fn checkpoint_created(
155 &self,
156 summary: &CheckpointSummary,
157 contents: &CheckpointContents,
158 _epoch_store: &Arc<AuthorityPerEpochStore>,
159 _checkpoint_store: &Arc<CheckpointStore>,
160 ) -> IotaResult {
161 trace!(
162 "Including following transactions in checkpoint {}: {:?}",
163 summary.sequence_number, contents
164 );
165 info!(
166 "Creating checkpoint {:?} at epoch {}, sequence {}, previous digest {:?}, transactions count {}, content digest {:?}, end_of_epoch_data {:?}",
167 summary.digest(),
168 summary.epoch,
169 summary.sequence_number,
170 summary.previous_digest,
171 contents.size(),
172 summary.content_digest,
173 summary.end_of_epoch_data,
174 );
175
176 Ok(())
177 }
178}
179
180#[async_trait]
181impl CertifiedCheckpointOutput for LogCheckpointOutput {
182 async fn certified_checkpoint_created(
183 &self,
184 summary: &CertifiedCheckpointSummary,
185 ) -> IotaResult {
186 debug!(
187 "Certified checkpoint with sequence {} and digest {}",
188 summary.sequence_number,
189 summary.digest()
190 );
191 Ok(())
192 }
193}
194
195pub struct SendCheckpointToStateSync {
196 handle: iota_network::state_sync::Handle,
197}
198
199impl SendCheckpointToStateSync {
200 pub fn new(handle: iota_network::state_sync::Handle) -> Self {
201 Self { handle }
202 }
203}
204
205#[async_trait]
206impl CertifiedCheckpointOutput for SendCheckpointToStateSync {
207 #[instrument(level = "trace", name = "checkpoint_created_from_consensus", skip_all)]
208 async fn certified_checkpoint_created(
209 &self,
210 summary: &CertifiedCheckpointSummary,
211 ) -> IotaResult {
212 debug!(
213 "Certified checkpoint with sequence {} and digest {}",
214 summary.sequence_number,
215 summary.digest()
216 );
217 self.handle
218 .send_checkpoint(VerifiedCheckpoint::new_unchecked(summary.to_owned()))
219 .await;
220
221 Ok(())
222 }
223}