mod causal_order;
pub mod checkpoint_executor;
mod checkpoint_output;
mod metrics;

use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    fs::File,
    io::Write,
    path::Path,
    sync::{Arc, Weak},
    time::Duration,
};

use chrono::Utc;
use diffy::create_patch;
use iota_common::{debug_fatal, fatal};
use iota_macros::fail_point;
use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
use iota_network::default_iota_network_config;
use iota_protocol_config::ProtocolVersion;
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
    committee::StakeUnit,
    crypto::AuthorityStrongQuorumSignInfo,
    digests::{CheckpointContentsDigest, CheckpointDigest},
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::{IotaError, IotaResult},
    event::SystemEpochInfoEvent,
    executable_transaction::VerifiedExecutableTransaction,
    gas::GasCostSummary,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::EpochStartSystemStateTrait,
    },
    message_envelope::Message,
    messages_checkpoint::{
        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
        VerifiedCheckpointContents,
    },
    messages_consensus::ConsensusTransactionKey,
    signature::GenericSignature,
    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
};
use itertools::Itertools;
use nonempty::NonEmpty;
use parking_lot::Mutex;
use rand::{rngs::OsRng, seq::SliceRandom};
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{Notify, watch},
    task::JoinSet,
    time::timeout,
};
use tracing::{debug, error, info, instrument, trace, warn};
use typed_store::{
    DBMapUtils, Map, TypedStoreError,
    rocks::{DBMap, MetricConf},
    traits::{TableSummary, TypedStoreDebug},
};

pub use crate::checkpoints::{
    checkpoint_output::{
        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
    },
    metrics::CheckpointMetrics,
};
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
    checkpoints::{
        causal_order::CausalOrder,
        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
    },
    consensus_handler::SequencedConsensusTransactionKey,
    execution_cache::TransactionCacheRead,
    stake_aggregator::{InsertResult, MultiStakeAggregator},
    state_accumulator::StateAccumulator,
};

pub type CheckpointHeight = u64;

pub struct EpochStats {
    pub checkpoint_count: u64,
    pub transaction_count: u64,
    pub total_gas_reward: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
    pub timestamp_ms: CheckpointTimestamp,
    pub last_of_epoch: bool,
    pub checkpoint_height: CheckpointHeight,
}

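/// A checkpoint that has not been built yet, identified by the keys of its
/// root transactions plus bookkeeping details (timestamp, commit height, and
/// whether it is the last checkpoint of the epoch). Versioned so the on-disk
/// format can evolve.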
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PendingCheckpoint {
    V1(PendingCheckpointContentsV1),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointContentsV1 {
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}

impl PendingCheckpoint {
    pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
        match self {
            PendingCheckpoint::V1(contents) => contents,
        }
    }

    pub fn into_v1(self) -> PendingCheckpointContentsV1 {
        match self {
            PendingCheckpoint::V1(contents) => contents,
        }
    }

    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.as_v1().roots
    }

    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.as_v1().details
    }

    pub fn height(&self) -> CheckpointHeight {
        self.details().checkpoint_height
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    pub checkpoint_height: Option<CheckpointHeight>,
    pub position_in_commit: usize,
}

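/// RocksDB-backed store for checkpoint data: contents, certified and locally
/// built summaries, the last checkpoint of each epoch, and progress
/// watermarks.
///
/// A minimal read sketch (the path is illustrative and error handling is
/// omitted; not taken from this crate's tests):
/// ```ignore
/// use std::path::Path;
///
/// let store = CheckpointStore::new(Path::new("/tmp/checkpoint-db"));
/// if let Some(checkpoint) = store.get_highest_executed_checkpoint()? {
///     println!("highest executed checkpoint: {}", checkpoint.sequence_number());
/// }
/// ```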
#[derive(DBMapUtils)]
pub struct CheckpointStore {
    /// Maps checkpoint contents digest to checkpoint contents.
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to the sequence number of the
    /// checkpoint that includes those contents.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Full checkpoint contents (transactions and user signatures), indexed by
    /// sequence number.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Certified checkpoints, indexed by sequence number and by digest.
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Checkpoint summaries built locally by this validator, kept so that
    /// forks against certified checkpoints can be detected.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// Maps epoch id to the sequence number of the last checkpoint in that
    /// epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks recording the highest verified, synced, executed, and pruned
    /// checkpoints, each stored with its digest.
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}

impl CheckpointStore {
    pub fn new(path: &Path) -> Arc<Self> {
        Arc::new(Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new("checkpoint"),
            None,
            None,
        ))
    }

    pub fn open_readonly(path: &Path) -> CheckpointStoreReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }

    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        if self
            .get_checkpoint_by_digest(checkpoint.digest())
            .unwrap()
            .is_none()
        {
            if epoch_store.epoch() == checkpoint.epoch {
                epoch_store
                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                    .unwrap();
            } else {
                debug!(
                    validator_epoch =% epoch_store.epoch(),
                    genesis_epoch =% checkpoint.epoch(),
                    "Not inserting checkpoint builder data for genesis checkpoint",
                );
            }
            self.insert_checkpoint_contents(contents).unwrap();
            self.insert_verified_checkpoint(&checkpoint).unwrap();
            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
        }
    }

    pub fn get_checkpoint_by_digest(
        &self,
        digest: &CheckpointDigest,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.checkpoint_by_digest
            .get(digest)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.certified_checkpoints
            .get(&sequence_number)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.locally_computed_checkpoints.get(&sequence_number)
    }

    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.checkpoint_sequence_by_contents_digest.get(digest)
    }

    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.checkpoint_sequence_by_contents_digest.remove(digest)
    }

    pub fn get_latest_certified_checkpoint(&self) -> Option<VerifiedCheckpoint> {
        self.certified_checkpoints
            .unbounded_iter()
            .skip_to_last()
            .next()
            .map(|(_, v)| v.into())
    }

    pub fn get_latest_locally_computed_checkpoint(&self) -> Option<CheckpointSummary> {
        self.locally_computed_checkpoints
            .unbounded_iter()
            .skip_to_last()
            .next()
            .map(|(_, v)| v)
    }

    pub fn multi_get_checkpoint_by_sequence_number(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
        let checkpoints = self
            .certified_checkpoints
            .multi_get(sequence_numbers)?
            .into_iter()
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
            .collect();

        Ok(checkpoints)
    }

    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.checkpoint_content.multi_get(contents_digest)
    }

    pub fn get_highest_verified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_verified = if let Some(highest_verified) =
            self.watermarks.get(&CheckpointWatermark::HighestVerified)?
        {
            highest_verified
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_verified.1)
    }

    pub fn get_highest_synced_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_synced = if let Some(highest_synced) =
            self.watermarks.get(&CheckpointWatermark::HighestSynced)?
        {
            highest_synced
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_synced.1)
    }

    pub fn get_highest_executed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_executed) =
            self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
        {
            Ok(Some(highest_executed.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_executed = if let Some(highest_executed) =
            self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
        {
            highest_executed
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_executed.1)
    }

    pub fn get_highest_pruned_checkpoint_seq_number(
        &self,
    ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
        Ok(self
            .watermarks
            .get(&CheckpointWatermark::HighestPruned)?
            .unwrap_or_default()
            .0)
    }

    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.checkpoint_content.get(digest)
    }

    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
        self.full_checkpoint_content.get(&seq)
    }

    fn prune_local_summaries(&self) -> IotaResult {
        if let Some((last_local_summary, _)) = self
            .locally_computed_checkpoints
            .unbounded_iter()
            .skip_to_last()
            .next()
        {
            let mut batch = self.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }

    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );
            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }

    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        if let Some(local_checkpoint) = self
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }

    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }

    pub fn update_highest_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if Some(*checkpoint.sequence_number())
            > self
                .get_highest_verified_checkpoint()?
                .map(|x| *x.sequence_number())
        {
            debug!(
                checkpoint_seq = checkpoint.sequence_number(),
                "Updating highest verified checkpoint",
            );
            self.watermarks.insert(
                &CheckpointWatermark::HighestVerified,
                &(*checkpoint.sequence_number(), *checkpoint.digest()),
            )?;
        }

        Ok(())
    }

    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Updating highest synced checkpoint",
        );
        self.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Updating highest executed checkpoint",
        );
        self.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn insert_checkpoint_contents(
        &self,
        contents: CheckpointContents,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = ?contents.digest(),
            "Inserting checkpoint contents",
        );
        self.checkpoint_content.insert(contents.digest(), &contents)
    }

    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.full_checkpoint_content.batch();
        batch.insert_batch(
            &self.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.full_checkpoint_content,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(&self.checkpoint_content, [(contents.digest(), &contents)])?;

        batch.write()
    }

    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.full_checkpoint_content.remove(&seq)
    }

    pub fn get_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
    ) -> IotaResult<Option<VerifiedCheckpoint>> {
        let seq = self.epoch_last_checkpoint_map.get(&epoch_id)?;
        let checkpoint = match seq {
            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
            None => None,
        };
        Ok(checkpoint)
    }

    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> IotaResult {
        self.epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }

    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }

    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }

    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
        self.checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }

    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
        let mut wb = self.watermarks.batch();
        wb.delete_batch(
            &self.watermarks,
            std::iter::once(CheckpointWatermark::HighestExecuted),
        )?;
        wb.write()?;
        Ok(())
    }

    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
        self.delete_highest_executed_checkpoint_test_only()?;
        self.watermarks.rocksdb.flush()?;
        Ok(())
    }

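    /// Re-executes the transactions of every locally computed checkpoint above
    /// the highest executed watermark, waiting until their effects are
    /// available again; used for crash recovery.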
    #[instrument(level = "debug", skip_all)]
    pub async fn reexecute_local_checkpoints(
        &self,
        state: &AuthorityState,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        info!("re-executing locally computed checkpoints for crash recovery");
        let epoch = epoch_store.epoch();
        let highest_executed = self
            .get_highest_executed_checkpoint_seq_number()
            .expect("get_highest_executed_checkpoint_seq_number should not fail")
            .unwrap_or(0);

        let Some(highest_built) = self.get_latest_locally_computed_checkpoint() else {
            info!("no locally built checkpoints to verify");
            return;
        };

        for seq in highest_executed + 1..=*highest_built.sequence_number() {
            info!(?seq, "Re-executing locally computed checkpoint");
            let Some(checkpoint) = self
                .get_locally_computed_checkpoint(seq)
                .expect("get_locally_computed_checkpoint should not fail")
            else {
                panic!("locally computed checkpoint {seq:?} not found");
            };

            let Some(contents) = self
                .get_checkpoint_contents(&checkpoint.content_digest)
                .expect("get_checkpoint_contents should not fail")
            else {
                panic!(
                    "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
                    seq, checkpoint.content_digest
                );
            };

            let cache = state.get_transaction_cache_reader();

            let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
            let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
            let txns = cache
                .multi_get_transaction_blocks(&tx_digests)
                .expect("multi_get_transaction_blocks should not fail");
            for (tx, digest) in txns.iter().zip(tx_digests.iter()) {
                if tx.is_none() {
                    panic!("transaction {digest:?} not found");
                }
            }

            let txns: Vec<_> = txns
                .into_iter()
                .map(|tx| tx.unwrap())
                .zip(fx_digests.into_iter())
                .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
                .map(|(tx, fx)| {
                    (
                        VerifiedExecutableTransaction::new_from_checkpoint(
                            (*tx).clone(),
                            epoch,
                            seq,
                        ),
                        fx,
                    )
                })
                .collect();

            let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();

            info!(
                ?seq,
                ?tx_digests,
                "Re-executing transactions for locally built checkpoint"
            );
            state.enqueue_with_expected_effects_digest(txns, epoch_store);

            let waiting_logger = tokio::task::spawn(async move {
                let mut interval = tokio::time::interval(Duration::from_secs(1));
                loop {
                    interval.tick().await;
                    warn!(?seq, "Still waiting for re-execution to complete");
                }
            });

            cache
                .notify_read_executed_effects_digests(&tx_digests)
                .await
                .expect("notify_read_executed_effects_digests should not fail");

            waiting_logger.abort();
            waiting_logger.await.ok();
            info!(?seq, "Re-execution completed for locally built checkpoint");
        }

        info!("Re-execution of locally built checkpoints completed");
    }
}

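/// Keys of the `watermarks` table. Each watermark maps to the sequence number
/// and digest of the highest checkpoint that has reached the corresponding
/// stage (verified, synced, executed, or pruned).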
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
}

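/// Builds checkpoints from pending checkpoints: resolves the root transactions
/// into a causally ordered list of effects, splits it into size-bounded
/// chunks, writes the resulting summaries and contents to the store, and
/// notifies the aggregator.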
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    accumulator: Weak<StateAccumulator>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}

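/// Aggregates checkpoint signature messages from validators into certified
/// checkpoints once a quorum of stake has signed the locally built summary.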
pub struct CheckpointAggregator {
    tables: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

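/// Signature aggregation state for a single checkpoint sequence number,
/// tracking stake per digest so that forks and split-brain conditions can be
/// detected while signatures are collected.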
pub struct CheckpointSignatureAggregator {
    next_index: u64,
    summary: CheckpointSummary,
    digest: CheckpointDigest,
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    tables: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

impl CheckpointBuilder {
    fn new(
        state: Arc<AuthorityState>,
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        accumulator: Weak<StateAccumulator>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            tables,
            epoch_store,
            notify,
            effects_store,
            accumulator,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }

    async fn run(mut self) {
        info!("Starting CheckpointBuilder");
        loop {
            self.maybe_build_checkpoints().await;

            self.notify.notified().await;
        }
    }

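    /// Drains pending checkpoints from the epoch store and builds one
    /// checkpoint per group; a group is closed once the minimum checkpoint
    /// interval has elapsed since the previous checkpoint or the end of the
    /// epoch is reached.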
    async fn maybe_build_checkpoints(&mut self) {
        let _scope = monitored_scope("BuildCheckpoints");

        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                Some(last_timestamp) => {
                    current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                }
                None => true,
            } || checkpoints_iter
                .peek()
                .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height = height,
                "Making checkpoint at commit height"
            );

            match self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await
            {
                Ok(seq) => {
                    self.last_built.send_if_modified(|cur| {
                        if seq > *cur {
                            *cur = seq;
                            true
                        } else {
                            false
                        }
                    });
                }
                Err(e) => {
                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    return;
                }
            }
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );
    }

    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &self,
        pendings: Vec<PendingCheckpoint>,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        let last_details = pendings.last().unwrap().details().clone();

        let mut effects_in_current_checkpoint = BTreeSet::new();

        let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
        for pending_checkpoint in pendings.into_iter() {
            let pending = pending_checkpoint.into_v1();
            let txn_in_checkpoint = self
                .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
                .await?;
            sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
        }
        let new_checkpoints = self
            .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        Ok(highest_sequence)
    }

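    /// Waits for the root transactions of a pending checkpoint to be executed,
    /// expands them with their not-yet-checkpointed dependencies, and returns
    /// the effects in causal order, with the consensus commit prologue (if
    /// present) placed first.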
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        roots: Vec<TransactionKey>,
        effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        self.metrics
            .checkpoint_roots_count
            .inc_by(roots.len() as u64);

        let root_digests = self
            .epoch_store
            .notify_read_executed_digests(&roots)
            .in_monitored_scope("CheckpointNotifyDigests")
            .await?;
        let root_effects = self
            .effects_store
            .notify_read_executed_effects(&root_digests)
            .in_monitored_scope("CheckpointNotifyRead")
            .await?;

        let _scope = monitored_scope("CheckpointBuilder");

        let consensus_commit_prologue = {
            let consensus_commit_prologue = self
                .extract_consensus_commit_prologue(&root_digests, &root_effects)
                .await?;

            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    effects_in_current_checkpoint,
                )?;

                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }
            consensus_commit_prologue
        };

        let unsorted =
            self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;

        let _scope = monitored_scope("CheckpointBuilder::causal_sort");
        let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
        if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
            #[cfg(debug_assertions)]
            {
                for tx in unsorted.iter() {
                    assert!(tx.transaction_digest() != &_ccp_digest);
                }
            }
            sorted.push(ccp_effects);
        }
        sorted.extend(CausalOrder::causal_sort(unsorted));

        #[cfg(msim)]
        {
            self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
        }

        Ok(sorted)
    }

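    /// Checks whether the first root transaction is a
    /// `ConsensusCommitPrologueV1` and, if so, returns its digest and effects
    /// so it can be placed first in the checkpoint.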
    async fn extract_consensus_commit_prologue(
        &self,
        root_digests: &[TransactionDigest],
        root_effects: &[TransactionEffects],
    ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
        if root_digests.is_empty() {
            return Ok(None);
        }

        let first_tx = self
            .state
            .get_transaction_cache_reader()
            .get_transaction_block(&root_digests[0])?
            .expect("Transaction block must exist");

        Ok(match first_tx.transaction_data().kind() {
            TransactionKind::ConsensusCommitPrologueV1(_) => {
                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
                Some((*first_tx.digest(), root_effects[0].clone()))
            }
            _ => None,
        })
    }

    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> IotaResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );
            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.tables)
                .await?;

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        self.state
            .get_cache_commit()
            .persist_transactions(&all_tx_digests)
            .await?;

        batch.write()?;

        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.tables
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_pending_checkpoint(height, new_checkpoints)?;
        Ok(())
    }

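    /// Splits the effects (paired with their user signatures) into chunks so
    /// that each chunk stays within `max_transactions_per_checkpoint` and
    /// `max_checkpoint_size_bytes`. A single oversized transaction still forms
    /// its own chunk, and an empty input yields one empty chunk so that a
    /// checkpoint is always produced. For example, with a limit of 10
    /// transactions per checkpoint and sizes well under the byte cap, 25
    /// transactions are split into chunks of 10, 10, and 5.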
    #[expect(clippy::type_complexity)]
    fn split_checkpoint_chunks(
        &self,
        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
        signatures: Vec<Vec<GenericSignature>>,
    ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
        let mut chunks = Vec::new();
        let mut chunk = Vec::new();
        let mut chunk_size: usize = 0;
        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
            .into_iter()
            .zip(signatures.into_iter())
        {
            let size = transaction_size
                + bcs::serialized_size(&effects)?
                + bcs::serialized_size(&signatures)?;
            if chunk.len() == self.max_transactions_per_checkpoint
                || (chunk_size + size) > self.max_checkpoint_size_bytes
            {
                if chunk.is_empty() {
                    warn!(
                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
                        self.max_checkpoint_size_bytes
                    );
                } else {
                    chunks.push(chunk);
                    chunk = Vec::new();
                    chunk_size = 0;
                }
            }

            chunk.push((effects, signatures));
            chunk_size += size;
        }

        if !chunk.is_empty() || chunks.is_empty() {
            chunks.push(chunk);
        }
        Ok(chunks)
    }

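    /// Builds one or more `CheckpointSummary`/`CheckpointContents` pairs from
    /// the ordered effects: waits for the corresponding certificates to be
    /// processed by consensus, attaches user signatures, splits the effects
    /// into chunks, and, for the last checkpoint of the epoch, executes the
    /// advance-epoch transaction and fills in `EndOfEpochData`.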
    #[instrument(level = "debug", skip_all)]
    async fn create_checkpoints(
        &self,
        all_effects: Vec<TransactionEffects>,
        details: &PendingCheckpointInfo,
    ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
        let total = all_effects.len();
        let mut last_checkpoint = self.epoch_store.last_built_checkpoint_summary()?;
        if last_checkpoint.is_none() {
            let epoch = self.epoch_store.epoch();
            if epoch > 0 {
                let previous_epoch = epoch - 1;
                let last_verified = self.tables.get_epoch_last_checkpoint(previous_epoch)?;
                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
                if let Some((ref seq, _)) = last_checkpoint {
                    debug!(
                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
                    );
                } else {
                    panic!("Cannot find last checkpoint for previous epoch {previous_epoch}");
                }
            }
        }
        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
        debug!(
            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
            checkpoint_timestamp = details.timestamp_ms,
            "Creating checkpoint(s) for {} transactions",
            all_effects.len(),
        );

        let all_digests: Vec<_> = all_effects
            .iter()
            .map(|effect| *effect.transaction_digest())
            .collect();
        let transactions_and_sizes = self
            .state
            .get_transaction_cache_reader()
            .get_transactions_and_serialized_sizes(&all_digests)?;
        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
        let mut transactions = Vec::with_capacity(all_effects.len());
        let mut transaction_keys = Vec::with_capacity(all_effects.len());
        let mut randomness_rounds = BTreeMap::new();
        {
            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
            debug!(
                ?last_checkpoint_seq,
                "Waiting for {:?} certificates to appear in consensus",
                all_effects.len()
            );

            for (effects, transaction_and_size) in all_effects
                .into_iter()
                .zip(transactions_and_sizes.into_iter())
            {
                let (transaction, size) = transaction_and_size
                    .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
                match transaction.inner().transaction_data().kind() {
                    TransactionKind::ConsensusCommitPrologueV1(_)
                    | TransactionKind::AuthenticatorStateUpdateV1(_) => {
                    }
                    TransactionKind::RandomnessStateUpdate(rsu) => {
                        randomness_rounds
                            .insert(*effects.transaction_digest(), rsu.randomness_round);
                    }
                    _ => {
                        transaction_keys.push(SequencedConsensusTransactionKey::External(
                            ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
                        ));
                    }
                }
                transactions.push(transaction);
                all_effects_and_transaction_sizes.push((effects, size));
            }

            self.epoch_store
                .consensus_messages_processed_notify(transaction_keys)
                .await?;
        }

        let signatures = self
            .epoch_store
            .user_signatures_for_checkpoint(&transactions, &all_digests)?;
        debug!(
            ?last_checkpoint_seq,
            "Received {} checkpoint user signatures from consensus",
            signatures.len()
        );

        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
        let chunks_count = chunks.len();

        let mut checkpoints = Vec::with_capacity(chunks_count);
        debug!(
            ?last_checkpoint_seq,
            "Creating {} checkpoints with {} transactions", chunks_count, total,
        );

        let epoch = self.epoch_store.epoch();
        for (index, transactions) in chunks.into_iter().enumerate() {
            let first_checkpoint_of_epoch = index == 0
                && last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.epoch != epoch)
                    .unwrap_or(true);
            if first_checkpoint_of_epoch {
                self.epoch_store
                    .record_epoch_first_checkpoint_creation_time_metric();
            }
            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;

            let sequence_number = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.sequence_number + 1)
                .unwrap_or_default();
            let timestamp_ms = details.timestamp_ms;
            if let Some((_, last_checkpoint)) = &last_checkpoint {
                if last_checkpoint.timestamp_ms > timestamp_ms {
                    error!(
                        "Unexpected decrease of checkpoint timestamp, sequence: {}, previous: {}, current: {}",
                        sequence_number, last_checkpoint.timestamp_ms, timestamp_ms
                    );
                }
            }

            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
            let epoch_rolling_gas_cost_summary =
                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

            let end_of_epoch_data = if last_checkpoint_of_epoch {
                let (system_state_obj, system_epoch_info_event) = self
                    .augment_epoch_last_checkpoint(
                        &epoch_rolling_gas_cost_summary,
                        timestamp_ms,
                        &mut effects,
                        &mut signatures,
                        sequence_number,
                    )
                    .await?;

                let epoch_supply_change =
                    system_epoch_info_event.map_or(0, |event| event.supply_change());

                let committee = system_state_obj
                    .get_current_epoch_committee()
                    .committee()
                    .clone();

                let root_state_digest = {
                    let state_acc = self
                        .accumulator
                        .upgrade()
                        .expect("No checkpoints should be getting built after local configuration");
                    let acc = state_acc.accumulate_checkpoint(
                        effects.clone(),
                        sequence_number,
                        &self.epoch_store,
                    )?;
                    state_acc
                        .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
                        .await?;
                    state_acc
                        .digest_epoch(self.epoch_store.clone(), sequence_number)
                        .await?
                };
                self.metrics.highest_accumulated_epoch.set(epoch as i64);
                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

                let epoch_commitments = vec![root_state_digest.into()];

                Some(EndOfEpochData {
                    next_epoch_committee: committee.voting_rights,
                    next_epoch_protocol_version: ProtocolVersion::new(
                        system_state_obj.protocol_version(),
                    ),
                    epoch_commitments,
                    epoch_supply_change,
                })
            } else {
                None
            };
            let contents = CheckpointContents::new_with_digests_and_signatures(
                effects.iter().map(TransactionEffects::execution_digests),
                signatures,
            );

            let num_txns = contents.size() as u64;

            let network_total_transactions = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.network_total_transactions + num_txns)
                .unwrap_or(num_txns);

            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());

            let matching_randomness_rounds: Vec<_> = effects
                .iter()
                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
                .copied()
                .collect();

            let summary = CheckpointSummary::new(
                self.epoch_store.protocol_config(),
                epoch,
                sequence_number,
                network_total_transactions,
                &contents,
                previous_digest,
                epoch_rolling_gas_cost_summary,
                end_of_epoch_data,
                timestamp_ms,
                matching_randomness_rounds,
            );
            summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
            if last_checkpoint_of_epoch {
                info!(
                    checkpoint_seq = sequence_number,
                    "creating last checkpoint of epoch {}", epoch
                );
                if let Some(stats) = self.tables.get_epoch_stats(epoch, &summary) {
                    self.epoch_store
                        .report_epoch_metrics_at_last_checkpoint(stats);
                }
            }
            last_checkpoint = Some((sequence_number, summary.clone()));
            checkpoints.push((summary, contents));
        }

        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
    }

    fn get_epoch_total_gas_cost(
        &self,
        last_checkpoint: Option<&CheckpointSummary>,
        cur_checkpoint_effects: &[TransactionEffects],
    ) -> GasCostSummary {
        let (previous_epoch, previous_gas_costs) = last_checkpoint
            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
            .unwrap_or_default();
        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
        if previous_epoch == self.epoch_store.epoch() {
            GasCostSummary::new(
                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
                previous_gas_costs.computation_cost_burned
                    + current_gas_costs.computation_cost_burned,
                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
                previous_gas_costs.non_refundable_storage_fee
                    + current_gas_costs.non_refundable_storage_fee,
            )
        } else {
            current_gas_costs
        }
    }

    #[instrument(level = "error", skip_all)]
    async fn augment_epoch_last_checkpoint(
        &self,
        epoch_total_gas_cost: &GasCostSummary,
        epoch_start_timestamp_ms: CheckpointTimestamp,
        checkpoint_effects: &mut Vec<TransactionEffects>,
        signatures: &mut Vec<Vec<GenericSignature>>,
        checkpoint: CheckpointSequenceNumber,
    ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
        let (system_state, system_epoch_info_event, effects) = self
            .state
            .create_and_execute_advance_epoch_tx(
                &self.epoch_store,
                epoch_total_gas_cost,
                checkpoint,
                epoch_start_timestamp_ms,
            )
            .await?;
        checkpoint_effects.push(effects);
        signatures.push(vec![]);
        Ok((system_state, system_epoch_info_event))
    }

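    /// Expands the given root effects with all dependencies that were executed
    /// in the current epoch but are not yet part of any checkpoint, skipping
    /// anything already included in this or an earlier checkpoint; returns the
    /// combined, unsorted set of effects.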
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        let mut results = vec![];
        let mut seen = HashSet::new();
        loop {
            let mut pending = HashSet::new();

            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                seen.insert(*digest);

                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    if !effects_signature_exists {
                        continue;
                    }
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending)?;
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Cannot find effects for transaction {digest:?}, even though a transaction that depends on it was already executed"
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }

    #[cfg(msim)]
    fn expensive_consensus_commit_prologue_invariants_check(
        &self,
        root_digests: &[TransactionDigest],
        sorted: &[TransactionEffects],
    ) {
        let root_txs = self
            .state
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(root_digests)
            .unwrap();
        let ccps = root_txs
            .iter()
            .filter_map(|tx| {
                tx.as_ref().filter(|tx| {
                    matches!(
                        tx.transaction_data().kind(),
                        TransactionKind::ConsensusCommitPrologueV1(_)
                    )
                })
            })
            .collect::<Vec<_>>();

        assert!(ccps.len() <= 1);

        let txs = self
            .state
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(
                &sorted
                    .iter()
                    .map(|tx| *tx.transaction_digest())
                    .collect::<Vec<_>>(),
            )
            .unwrap();

        if ccps.is_empty() {
            for tx in txs.iter().flatten() {
                assert!(!matches!(
                    tx.transaction_data().kind(),
                    TransactionKind::ConsensusCommitPrologueV1(_)
                ));
            }
        } else {
            assert!(matches!(
                txs[0].as_ref().unwrap().transaction_data().kind(),
                TransactionKind::ConsensusCommitPrologueV1(_)
            ));

            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());

            for tx in txs.iter().skip(1).flatten() {
                assert!(!matches!(
                    tx.transaction_data().kind(),
                    TransactionKind::ConsensusCommitPrologueV1(_)
                ));
            }
        }
    }
}

impl CheckpointAggregator {
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        let current = None;
        Self {
            tables,
            epoch_store,
            notify,
            current,
            output,
            state,
            metrics,
        }
    }

    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
        }
    }

    async fn run_and_notify(&mut self) -> IotaResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify();
            let current = if let Some(current) = &mut self.current {
                if current.summary.sequence_number < next_to_certify {
                    self.current = None;
                    continue;
                }
                current
            } else {
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    next_index: 0,
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    tables: self.tables.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let epoch_tables = self
                .epoch_store
                .tables()
                .expect("should not run past end of epoch");
            let iter = epoch_tables.get_pending_checkpoint_signatures_iter(
                current.summary.sequence_number,
                current.next_index,
            )?;
            for ((seq, index), data) in iter {
                if seq != current.summary.sequence_number {
                    trace!(
                        checkpoint_seq =? current.summary.sequence_number,
                        "Not enough checkpoint signatures",
                    );
                    return Ok(result);
                }
                trace!(
                    checkpoint_seq = current.summary.sequence_number,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = current.summary.sequence_number,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.tables.insert_certified_checkpoint(&summary)?;
                    self.metrics
                        .last_certified_checkpoint
                        .set(current.summary.sequence_number as i64);
                    current
                        .summary
                        .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
                    result.push(summary.into_inner());
                    self.current = None;
                    continue 'outer;
                } else {
                    current.next_index = index + 1;
                }
            }
            break;
        }
        Ok(result)
    }

    fn next_checkpoint_to_certify(&self) -> CheckpointSequenceNumber {
        self.tables
            .certified_checkpoints
            .unbounded_iter()
            .skip_to_last()
            .next()
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default()
    }
}

impl CheckpointSignatureAggregator {
    #[expect(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            InsertResult::Failed {
                error:
                    IotaError::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
            } => Err(()),
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                if their_digest != self.digest {
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }

    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            let digests_by_stake_messages = self
                .signatures_by_digest
                .get_all_unique_values()
                .into_iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{digest:?} (total stake: {total_stake})")
                })
                .collect::<Vec<String>>();
            error!(
                checkpoint_seq = self.summary.sequence_number,
                "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages,
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.tables.clone();

            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
}

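/// Runs diagnostics for a detected split-brain: for every checkpoint digest
/// that disagrees with the local one, fetches the pending checkpoint and its
/// contents from one validator signing that digest and diffs the summaries,
/// transactions, and effects against the local checkpoint.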
2025async fn diagnose_split_brain(
2031 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2032 local_summary: CheckpointSummary,
2033 state: Arc<AuthorityState>,
2034 tables: Arc<CheckpointStore>,
2035) {
2036 debug!(
2037 checkpoint_seq = local_summary.sequence_number,
2038 "Running split brain diagnostics..."
2039 );
2040 let time = Utc::now();
2041 let digest_to_validator = all_unique_values
2043 .iter()
2044 .filter_map(|(digest, (validators, _))| {
2045 if *digest != local_summary.digest() {
2046 let random_validator = validators.choose(&mut OsRng).unwrap();
2047 Some((*digest, *random_validator))
2048 } else {
2049 None
2050 }
2051 })
2052 .collect::<HashMap<_, _>>();
2053 if digest_to_validator.is_empty() {
2054 panic!(
2055 "Given split brain condition, there should be at \
2056 least one validator that disagrees with local signature"
2057 );
2058 }
2059
2060 let epoch_store = state.load_epoch_store_one_call_per_task();
2061 let committee = epoch_store
2062 .epoch_start_state()
2063 .get_iota_committee_with_network_metadata();
2064 let network_config = default_iota_network_config();
2065 let network_clients =
2066 make_network_authority_clients_with_network_config(&committee, &network_config);
2067
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequest {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint(request)
        })
        .collect::<Vec<_>>();

    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponse {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponse {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {time}",
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .into_path()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{fork_logs_text}").unwrap();
    debug!("{}", fork_logs_text);

    fail_point!("split_brain_reached");
}

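/// Notification interface used to feed checkpoint signatures and new pending
/// checkpoints into the checkpoint service.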
pub trait CheckpointServiceNotify {
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    fn notify_checkpoint(&self) -> IotaResult;
}

enum CheckpointServiceState {
    Unstarted(Box<(CheckpointBuilder, CheckpointAggregator)>),
    Started,
}

impl CheckpointServiceState {
    fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
        let mut state = CheckpointServiceState::Started;
        std::mem::swap(self, &mut state);

        match state {
            CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1),
            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
        }
    }
}

pub struct CheckpointService {
    tables: Arc<CheckpointStore>,
    notify_builder: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_signature_index: Mutex<u64>,
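    // Broadcasts the sequence number of the highest checkpoint built so far.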
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
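    // Highest checkpoint sequence number that had already been built before
    // this service was constructed; used to detect when rebuilding has caught up.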
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    state: Mutex<CheckpointServiceState>,
}

impl CheckpointService {
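    /// Constructs a new checkpoint service in an un-started state; call
    /// [`Self::spawn`] to start the builder and aggregator tasks.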
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        accumulator: Weak<StateAccumulator>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        let highest_previously_built_seq = epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number)
            .unwrap_or(0);

        let (highest_currently_built_seq_tx, _) = watch::channel(highest_previously_built_seq);

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            accumulator,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        let last_signature_index = epoch_store
            .get_last_checkpoint_signature_index()
            .expect("should not cross end of epoch");
        let last_signature_index = Mutex::new(last_signature_index);

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            notify_aggregator,
            last_signature_index,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
                builder, aggregator,
            )))),
        })
    }

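    /// Spawns the checkpoint builder and aggregator tasks and waits (bounded
    /// by a timeout) for checkpoints that were built before a restart to be
    /// rebuilt.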
    pub async fn spawn(&self) -> JoinSet<()> {
        let mut tasks = JoinSet::new();

        let (builder, aggregator) = self.state.lock().take_unstarted();
        tasks.spawn(monitored_future!(builder.run()));
        tasks.spawn(monitored_future!(aggregator.run()));

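        // Give the builder a bounded amount of time to catch up with previously
        // built checkpoints; a timeout is reported via `debug_fatal!` rather than
        // blocking startup indefinitely.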
        if tokio::time::timeout(
            Duration::from_secs(120),
            self.wait_for_rebuilt_checkpoints(),
        )
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }

        tasks
    }
}

impl CheckpointService {
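    /// Waits until the builder has re-built checkpoints up to the sequence
    /// number that had already been built before the last restart.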
    pub async fn wait_for_rebuilt_checkpoints(&self) {
        let highest_previously_built_seq = self.highest_previously_built_seq;
        let mut rx = self.highest_currently_built_seq_tx.subscribe();
        loop {
            let highest_currently_built_seq = *rx.borrow_and_update();
            if highest_currently_built_seq >= highest_previously_built_seq {
                break;
            }
            rx.changed().await.unwrap();
        }
    }

    #[cfg(test)]
    fn write_and_notify_checkpoint_for_testing(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: PendingCheckpoint,
    ) -> IotaResult {
        use crate::authority::authority_per_epoch_store::ConsensusCommitOutput;

        let mut output = ConsensusCommitOutput::new();
        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
        let mut batch = epoch_store.db_batch_for_test();
        output.write_to_batch(epoch_store, &mut batch)?;
        batch.write()?;
        self.notify_checkpoint()?;
        Ok(())
    }
}

impl CheckpointServiceNotify for CheckpointService {
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult {
        let sequence = info.summary.sequence_number;
        let signer = info.summary.auth_sig().authority.concise();

        if let Some(highest_verified_checkpoint) = self
            .tables
            .get_highest_verified_checkpoint()?
            .map(|x| *x.sequence_number())
        {
            if sequence <= highest_verified_checkpoint {
                trace!(
                    checkpoint_seq = sequence,
                    "Ignore checkpoint signature from {} - already certified", signer,
                );
                self.metrics
                    .last_ignored_checkpoint_signature_received
                    .set(sequence as i64);
                return Ok(());
            }
        }
        trace!(
            checkpoint_seq = sequence,
            "Received checkpoint signature, digest {} from {}",
            info.summary.digest(),
            signer,
        );
        self.metrics
            .last_received_checkpoint_signatures
            .with_label_values(&[&signer.to_string()])
            .set(sequence as i64);
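        // Hold the index lock across the increment and the insertion so that
        // signature indices are assigned and persisted in a consistent order.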
        let mut index = self.last_signature_index.lock();
        *index += 1;
        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
        self.notify_aggregator.notify_one();
        Ok(())
    }

    fn notify_checkpoint(&self) -> IotaResult {
        self.notify_builder.notify_one();
        Ok(())
    }
}

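/// A `CheckpointServiceNotify` implementation that ignores all notifications;
/// useful in tests and contexts without a running checkpoint service.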
pub struct CheckpointServiceNoop {}

impl CheckpointServiceNotify for CheckpointServiceNoop {
    fn notify_checkpoint_signature(
        &self,
        _: &AuthorityPerEpochStore,
        _: &CheckpointSignatureMessage,
    ) -> IotaResult {
        Ok(())
    }

    fn notify_checkpoint(&self) -> IotaResult {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, HashMap},
        ops::Deref,
    };

    use futures::{FutureExt as _, future::BoxFuture};
    use iota_macros::sim_test;
    use iota_protocol_config::{Chain, ProtocolConfig};
    use iota_types::{
        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
        crypto::Signature,
        digests::TransactionEventsDigest,
        effects::{TransactionEffects, TransactionEvents},
        messages_checkpoint::SignedCheckpointSummary,
        move_package::MovePackage,
        object,
        transaction::{GenesisObject, VerifiedTransaction},
    };
    use tokio::sync::mpsc;

    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;

    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
            vec![GenesisObject::RawObject {
                data: object::Data::Package(
                    MovePackage::new(
                        ObjectID::random(),
                        SequenceNumber::new(),
                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
                        100_000,
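                        // remaining `MovePackage::new` arguments are left empty for this
                        // oversized dummy package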
                        Vec::new(),
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        let _tasks = checkpoint_service.spawn().await;

        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

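        // Pending checkpoint 2 contains four transactions, but the service was
        // built with max_transactions_per_checkpoint = 3, so it is split into
        // two checkpoints.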
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

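        // Pending checkpoint 3 contains three oversized transactions that together
        // exceed max_checkpoint_size_bytes = 100_000, so they are split into two
        // checkpoints.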
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

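        // Pending checkpoints 4 and 5 each contain a single transaction; as the
        // assertions below show, their transactions end up combined into a single
        // checkpoint because pending 4 falls within the configured minimum
        // checkpoint interval of the previous checkpoint.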
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| {
                    self.get(d)
                        .map(|fx| fx.digest())
                        .expect("effects not found")
                })
                .collect()))
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
        }

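        // The remaining TransactionCacheRead methods are not exercised by this test.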
        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            unimplemented!()
        }

        fn multi_get_events(
            &self,
            _: &[TransactionEventsDigest],
        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> IotaResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> IotaResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
            roots: t
                .into_iter()
                .map(|t| TransactionKey::Digest(d(t)))
                .collect(),
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
            },
        })
    }

    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store
            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
            .expect("Inserting cert fx and sigs should not fail");
    }
}
2852}