mod causal_order;
pub mod checkpoint_executor;
mod checkpoint_output;
mod metrics;

use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    fs::File,
    io::Write,
    path::Path,
    sync::{Arc, Weak},
    time::Duration,
};

use chrono::Utc;
use diffy::create_patch;
use iota_common::{debug_fatal, fatal};
use iota_macros::fail_point;
use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
use iota_network::default_iota_network_config;
use iota_protocol_config::ProtocolVersion;
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
    committee::StakeUnit,
    crypto::AuthorityStrongQuorumSignInfo,
    digests::{CheckpointContentsDigest, CheckpointDigest},
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::{IotaError, IotaResult},
    event::SystemEpochInfoEvent,
    executable_transaction::VerifiedExecutableTransaction,
    gas::GasCostSummary,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::EpochStartSystemStateTrait,
    },
    message_envelope::Message,
    messages_checkpoint::{
        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
        VerifiedCheckpointContents,
    },
    messages_consensus::ConsensusTransactionKey,
    signature::GenericSignature,
    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
};
use itertools::Itertools;
use nonempty::NonEmpty;
use parking_lot::Mutex;
use rand::{rngs::OsRng, seq::SliceRandom};
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{Notify, watch},
    task::JoinSet,
    time::timeout,
};
use tracing::{debug, error, info, instrument, warn};
use typed_store::{
    DBMapUtils, Map, TypedStoreError,
    rocks::{DBMap, MetricConf},
    traits::{TableSummary, TypedStoreDebug},
};

pub use crate::checkpoints::{
    checkpoint_output::{
        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
    },
    metrics::CheckpointMetrics,
};
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
    checkpoints::{
        causal_order::CausalOrder,
        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
    },
    consensus_handler::SequencedConsensusTransactionKey,
    execution_cache::TransactionCacheRead,
    stake_aggregator::{InsertResult, MultiStakeAggregator},
    state_accumulator::StateAccumulator,
};
87
88pub type CheckpointHeight = u64;
89
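/// Aggregate statistics for an epoch, derived from its last checkpoint; see
/// `CheckpointStore::get_epoch_stats`.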
90pub struct EpochStats {
91 pub checkpoint_count: u64,
92 pub transaction_count: u64,
93 pub total_gas_reward: u64,
94}
95
96#[derive(Clone, Debug, Serialize, Deserialize)]
97pub struct PendingCheckpointInfo {
98 pub timestamp_ms: CheckpointTimestamp,
99 pub last_of_epoch: bool,
100 pub checkpoint_height: CheckpointHeight,
101}
102
103#[derive(Clone, Debug, Serialize, Deserialize)]
104pub enum PendingCheckpoint {
105 V1(PendingCheckpointContentsV1),
107}
108
109#[derive(Clone, Debug, Serialize, Deserialize)]
110pub struct PendingCheckpointContentsV1 {
111 pub roots: Vec<TransactionKey>,
112 pub details: PendingCheckpointInfo,
113}
114
115impl PendingCheckpoint {
116 pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
117 match self {
118 PendingCheckpoint::V1(contents) => contents,
119 }
120 }
121
122 pub fn into_v1(self) -> PendingCheckpointContentsV1 {
123 match self {
124 PendingCheckpoint::V1(contents) => contents,
125 }
126 }
127
128 pub fn roots(&self) -> &Vec<TransactionKey> {
129 &self.as_v1().roots
130 }
131
132 pub fn details(&self) -> &PendingCheckpointInfo {
133 &self.as_v1().details
134 }
135
136 pub fn height(&self) -> CheckpointHeight {
137 self.details().checkpoint_height
138 }
139}
140
141#[derive(Clone, Debug, Serialize, Deserialize)]
142pub struct BuilderCheckpointSummary {
143 pub summary: CheckpointSummary,
144 pub checkpoint_height: Option<CheckpointHeight>,
146 pub position_in_commit: usize,
147}
148
#[derive(DBMapUtils)]
pub struct CheckpointStore {
    /// Maps checkpoint contents digests to checkpoint contents.
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digests to the sequence number of the
    /// checkpoint that includes them.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Full checkpoint contents (including user signatures), indexed by
    /// sequence number.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Certified checkpoints indexed by sequence number.
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Certified checkpoints indexed by digest.
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Checkpoint summaries built locally by this validator, kept so that
    /// forks against certified checkpoints can be detected and diagnosed.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// Maps each epoch to the sequence number of its last checkpoint.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks recording the highest verified, synced, executed, and pruned
    /// checkpoints (sequence number and digest).
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}
181
182impl CheckpointStore {
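    /// Opens (or creates) the checkpoint store's RocksDB tables at `path` in
    /// read-write mode.
    ///
    /// A minimal usage sketch (the path below is hypothetical):
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// let store = CheckpointStore::new(Path::new("/tmp/iota-checkpoints"));
    /// // A freshly created store has no certified checkpoints yet.
    /// assert!(store.get_latest_certified_checkpoint().is_none());
    /// ```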
183 pub fn new(path: &Path) -> Arc<Self> {
184 Arc::new(Self::open_tables_read_write(
185 path.to_path_buf(),
186 MetricConf::new("checkpoint"),
187 None,
188 None,
189 ))
190 }
191
192 pub fn open_readonly(path: &Path) -> CheckpointStoreReadOnly {
193 Self::get_read_only_handle(
194 path.to_path_buf(),
195 None,
196 None,
197 MetricConf::new("checkpoint_readonly"),
198 )
199 }
200
201 #[instrument(level = "info", skip_all)]
202 pub fn insert_genesis_checkpoint(
203 &self,
204 checkpoint: VerifiedCheckpoint,
205 contents: CheckpointContents,
206 epoch_store: &AuthorityPerEpochStore,
207 ) {
208 assert_eq!(
209 checkpoint.epoch(),
210 0,
211 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
212 );
213 assert_eq!(
214 *checkpoint.sequence_number(),
215 0,
216 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
217 );
218
219 if self
222 .get_checkpoint_by_digest(checkpoint.digest())
223 .unwrap()
224 .is_none()
225 {
226 if epoch_store.epoch() == checkpoint.epoch {
227 epoch_store
228 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
229 .unwrap();
230 } else {
231 debug!(
232 validator_epoch =% epoch_store.epoch(),
233 genesis_epoch =% checkpoint.epoch(),
234 "Not inserting checkpoint builder data for genesis checkpoint",
235 );
236 }
237 self.insert_checkpoint_contents(contents).unwrap();
238 self.insert_verified_checkpoint(&checkpoint).unwrap();
239 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
240 }
241 }
242
243 pub fn get_checkpoint_by_digest(
244 &self,
245 digest: &CheckpointDigest,
246 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
247 self.checkpoint_by_digest
248 .get(digest)
249 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
250 }
251
252 pub fn get_checkpoint_by_sequence_number(
253 &self,
254 sequence_number: CheckpointSequenceNumber,
255 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
256 self.certified_checkpoints
257 .get(&sequence_number)
258 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
259 }
260
261 pub fn get_locally_computed_checkpoint(
262 &self,
263 sequence_number: CheckpointSequenceNumber,
264 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
265 self.locally_computed_checkpoints.get(&sequence_number)
266 }
267
268 pub fn get_sequence_number_by_contents_digest(
269 &self,
270 digest: &CheckpointContentsDigest,
271 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
272 self.checkpoint_sequence_by_contents_digest.get(digest)
273 }
274
275 pub fn delete_contents_digest_sequence_number_mapping(
276 &self,
277 digest: &CheckpointContentsDigest,
278 ) -> Result<(), TypedStoreError> {
279 self.checkpoint_sequence_by_contents_digest.remove(digest)
280 }
281
282 pub fn get_latest_certified_checkpoint(&self) -> Option<VerifiedCheckpoint> {
283 self.certified_checkpoints
284 .unbounded_iter()
285 .skip_to_last()
286 .next()
287 .map(|(_, v)| v.into())
288 }
289
290 pub fn get_latest_locally_computed_checkpoint(&self) -> Option<CheckpointSummary> {
291 self.locally_computed_checkpoints
292 .unbounded_iter()
293 .skip_to_last()
294 .next()
295 .map(|(_, v)| v)
296 }
297
298 pub fn multi_get_checkpoint_by_sequence_number(
299 &self,
300 sequence_numbers: &[CheckpointSequenceNumber],
301 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
302 let checkpoints = self
303 .certified_checkpoints
304 .multi_get(sequence_numbers)?
305 .into_iter()
306 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
307 .collect();
308
309 Ok(checkpoints)
310 }
311
312 pub fn multi_get_checkpoint_content(
313 &self,
314 contents_digest: &[CheckpointContentsDigest],
315 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
316 self.checkpoint_content.multi_get(contents_digest)
317 }
318
319 pub fn get_highest_verified_checkpoint(
320 &self,
321 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
322 let highest_verified = if let Some(highest_verified) =
323 self.watermarks.get(&CheckpointWatermark::HighestVerified)?
324 {
325 highest_verified
326 } else {
327 return Ok(None);
328 };
329 self.get_checkpoint_by_digest(&highest_verified.1)
330 }
331
332 pub fn get_highest_synced_checkpoint(
333 &self,
334 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
335 let highest_synced = if let Some(highest_synced) =
336 self.watermarks.get(&CheckpointWatermark::HighestSynced)?
337 {
338 highest_synced
339 } else {
340 return Ok(None);
341 };
342 self.get_checkpoint_by_digest(&highest_synced.1)
343 }
344
345 pub fn get_highest_executed_checkpoint_seq_number(
346 &self,
347 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
348 if let Some(highest_executed) =
349 self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
350 {
351 Ok(Some(highest_executed.0))
352 } else {
353 Ok(None)
354 }
355 }
356
357 pub fn get_highest_executed_checkpoint(
358 &self,
359 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
360 let highest_executed = if let Some(highest_executed) =
361 self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
362 {
363 highest_executed
364 } else {
365 return Ok(None);
366 };
367 self.get_checkpoint_by_digest(&highest_executed.1)
368 }
369
370 pub fn get_highest_pruned_checkpoint_seq_number(
371 &self,
372 ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
373 Ok(self
374 .watermarks
375 .get(&CheckpointWatermark::HighestPruned)?
376 .unwrap_or_default()
377 .0)
378 }
379
380 pub fn get_checkpoint_contents(
381 &self,
382 digest: &CheckpointContentsDigest,
383 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
384 self.checkpoint_content.get(digest)
385 }
386
387 pub fn get_full_checkpoint_contents_by_sequence_number(
388 &self,
389 seq: CheckpointSequenceNumber,
390 ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
391 self.full_checkpoint_content.get(&seq)
392 }
393
394 fn prune_local_summaries(&self) -> IotaResult {
395 if let Some((last_local_summary, _)) = self
396 .locally_computed_checkpoints
397 .unbounded_iter()
398 .skip_to_last()
399 .next()
400 {
401 let mut batch = self.locally_computed_checkpoints.batch();
402 batch.schedule_delete_range(
403 &self.locally_computed_checkpoints,
404 &0,
405 &last_local_summary,
406 )?;
407 batch.write()?;
408 info!("Pruned local summaries up to {:?}", last_local_summary);
409 }
410 Ok(())
411 }
412
413 fn check_for_checkpoint_fork(
414 &self,
415 local_checkpoint: &CheckpointSummary,
416 verified_checkpoint: &VerifiedCheckpoint,
417 ) {
418 if local_checkpoint != verified_checkpoint.data() {
419 let verified_contents = self
420 .get_checkpoint_contents(&verified_checkpoint.content_digest)
421 .map(|opt_contents| {
422 opt_contents
423 .map(|contents| format!("{:?}", contents))
424 .unwrap_or_else(|| {
425 format!(
426 "Verified checkpoint contents not found, digest: {:?}",
427 verified_checkpoint.content_digest,
428 )
429 })
430 })
431 .map_err(|e| {
432 format!(
433 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
434 verified_checkpoint.content_digest, e
435 )
436 })
437 .unwrap_or_else(|err_msg| err_msg);
438
439 let local_contents = self
440 .get_checkpoint_contents(&local_checkpoint.content_digest)
441 .map(|opt_contents| {
442 opt_contents
443 .map(|contents| format!("{:?}", contents))
444 .unwrap_or_else(|| {
445 format!(
446 "Local checkpoint contents not found, digest: {:?}",
447 local_checkpoint.content_digest
448 )
449 })
450 })
451 .map_err(|e| {
452 format!(
453 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
454 local_checkpoint.content_digest, e
455 )
456 })
457 .unwrap_or_else(|err_msg| err_msg);
458
459 error!(
461 verified_checkpoint = ?verified_checkpoint.data(),
462 ?verified_contents,
463 ?local_checkpoint,
464 ?local_contents,
465 "Local checkpoint fork detected!",
466 );
467 fatal!(
468 "Local checkpoint fork detected for sequence number: {}",
469 local_checkpoint.sequence_number()
470 );
471 }
472 }
473
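    /// Inserts a certified checkpoint, indexing it by both sequence number and
    /// digest, and records it as the last checkpoint of its epoch when it
    /// carries a next-epoch committee. If a locally computed summary exists
    /// for the same sequence number, it is compared against the certified one
    /// to detect forks.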
474 pub fn insert_certified_checkpoint(
480 &self,
481 checkpoint: &VerifiedCheckpoint,
482 ) -> Result<(), TypedStoreError> {
483 debug!(
484 checkpoint_seq = checkpoint.sequence_number(),
485 "Inserting certified checkpoint",
486 );
487 let mut batch = self.certified_checkpoints.batch();
488 batch
489 .insert_batch(
490 &self.certified_checkpoints,
491 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
492 )?
493 .insert_batch(
494 &self.checkpoint_by_digest,
495 [(checkpoint.digest(), checkpoint.serializable_ref())],
496 )?;
497 if checkpoint.next_epoch_committee().is_some() {
498 batch.insert_batch(
499 &self.epoch_last_checkpoint_map,
500 [(&checkpoint.epoch(), checkpoint.sequence_number())],
501 )?;
502 }
503 batch.write()?;
504
505 if let Some(local_checkpoint) = self
506 .locally_computed_checkpoints
507 .get(checkpoint.sequence_number())?
508 {
509 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
510 }
511
512 Ok(())
513 }
514
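    /// Inserts a certified checkpoint and advances the highest-verified
    /// watermark if this checkpoint is newer than the current one.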
515 #[instrument(level = "debug", skip_all)]
518 pub fn insert_verified_checkpoint(
519 &self,
520 checkpoint: &VerifiedCheckpoint,
521 ) -> Result<(), TypedStoreError> {
522 self.insert_certified_checkpoint(checkpoint)?;
523 self.update_highest_verified_checkpoint(checkpoint)
524 }
525
526 pub fn update_highest_verified_checkpoint(
527 &self,
528 checkpoint: &VerifiedCheckpoint,
529 ) -> Result<(), TypedStoreError> {
530 if Some(*checkpoint.sequence_number())
531 > self
532 .get_highest_verified_checkpoint()?
533 .map(|x| *x.sequence_number())
534 {
535 debug!(
536 checkpoint_seq = checkpoint.sequence_number(),
537 "Updating highest verified checkpoint",
538 );
539 self.watermarks.insert(
540 &CheckpointWatermark::HighestVerified,
541 &(*checkpoint.sequence_number(), *checkpoint.digest()),
542 )?;
543 }
544
545 Ok(())
546 }
547
548 pub fn update_highest_synced_checkpoint(
549 &self,
550 checkpoint: &VerifiedCheckpoint,
551 ) -> Result<(), TypedStoreError> {
552 debug!(
553 checkpoint_seq = checkpoint.sequence_number(),
554 "Updating highest synced checkpoint",
555 );
556 self.watermarks.insert(
557 &CheckpointWatermark::HighestSynced,
558 &(*checkpoint.sequence_number(), *checkpoint.digest()),
559 )
560 }
561
562 pub fn update_highest_executed_checkpoint(
563 &self,
564 checkpoint: &VerifiedCheckpoint,
565 ) -> Result<(), TypedStoreError> {
566 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
567 if seq_number >= *checkpoint.sequence_number() {
568 return Ok(());
569 }
570 assert_eq!(
571 seq_number + 1,
572 *checkpoint.sequence_number(),
573 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
574 checkpoint.sequence_number(),
575 seq_number
576 );
577 }
578 debug!(
579 checkpoint_seq = checkpoint.sequence_number(),
580 "Updating highest executed checkpoint",
581 );
582 self.watermarks.insert(
583 &CheckpointWatermark::HighestExecuted,
584 &(*checkpoint.sequence_number(), *checkpoint.digest()),
585 )
586 }
587
588 pub fn update_highest_pruned_checkpoint(
589 &self,
590 checkpoint: &VerifiedCheckpoint,
591 ) -> Result<(), TypedStoreError> {
592 self.watermarks.insert(
593 &CheckpointWatermark::HighestPruned,
594 &(*checkpoint.sequence_number(), *checkpoint.digest()),
595 )
596 }
597
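    /// Sets the highest-executed watermark directly, without the contiguity
    /// check performed by `update_highest_executed_checkpoint`; callers must
    /// be sure that skipping that check is safe.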
598 pub fn set_highest_executed_checkpoint_subtle(
604 &self,
605 checkpoint: &VerifiedCheckpoint,
606 ) -> Result<(), TypedStoreError> {
607 self.watermarks.insert(
608 &CheckpointWatermark::HighestExecuted,
609 &(*checkpoint.sequence_number(), *checkpoint.digest()),
610 )
611 }
612
613 pub fn insert_checkpoint_contents(
614 &self,
615 contents: CheckpointContents,
616 ) -> Result<(), TypedStoreError> {
        debug!(
            contents_digest = ?contents.digest(),
            "Inserting checkpoint contents",
        );
621 self.checkpoint_content.insert(contents.digest(), &contents)
622 }
623
624 pub fn insert_verified_checkpoint_contents(
625 &self,
626 checkpoint: &VerifiedCheckpoint,
627 full_contents: VerifiedCheckpointContents,
628 ) -> Result<(), TypedStoreError> {
629 let mut batch = self.full_checkpoint_content.batch();
630 batch.insert_batch(
631 &self.checkpoint_sequence_by_contents_digest,
632 [(&checkpoint.content_digest, checkpoint.sequence_number())],
633 )?;
634 let full_contents = full_contents.into_inner();
635 batch.insert_batch(
636 &self.full_checkpoint_content,
637 [(checkpoint.sequence_number(), &full_contents)],
638 )?;
639
640 let contents = full_contents.into_checkpoint_contents();
641 assert_eq!(&checkpoint.content_digest, contents.digest());
642
643 batch.insert_batch(&self.checkpoint_content, [(contents.digest(), &contents)])?;
644
645 batch.write()
646 }
647
648 pub fn delete_full_checkpoint_contents(
649 &self,
650 seq: CheckpointSequenceNumber,
651 ) -> Result<(), TypedStoreError> {
652 self.full_checkpoint_content.remove(&seq)
653 }
654
655 pub fn get_epoch_last_checkpoint(
656 &self,
657 epoch_id: EpochId,
658 ) -> IotaResult<Option<VerifiedCheckpoint>> {
659 let seq = self.epoch_last_checkpoint_map.get(&epoch_id)?;
660 let checkpoint = match seq {
661 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
662 None => None,
663 };
664 Ok(checkpoint)
665 }
666
667 pub fn insert_epoch_last_checkpoint(
668 &self,
669 epoch_id: EpochId,
670 checkpoint: &VerifiedCheckpoint,
671 ) -> IotaResult {
672 self.epoch_last_checkpoint_map
673 .insert(&epoch_id, checkpoint.sequence_number())?;
674 Ok(())
675 }
676
677 pub fn get_epoch_state_commitments(
678 &self,
679 epoch: EpochId,
680 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
681 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
682 checkpoint
683 .end_of_epoch_data
684 .as_ref()
685 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
686 .epoch_commitments
687 .clone()
688 });
689 Ok(commitments)
690 }
691
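    /// Derives per-epoch statistics from the epoch's last checkpoint, using
    /// the previous epoch's last checkpoint as the baseline. Returns `None`
    /// if that baseline cannot be found.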
692 pub fn get_epoch_stats(
695 &self,
696 epoch: EpochId,
697 last_checkpoint: &CheckpointSummary,
698 ) -> Option<EpochStats> {
699 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
700 (0, 0)
701 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
702 (
703 checkpoint.sequence_number + 1,
704 checkpoint.network_total_transactions,
705 )
706 } else {
707 return None;
708 };
709 Some(EpochStats {
710 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
711 transaction_count: last_checkpoint.network_total_transactions
712 - prev_epoch_network_transactions,
713 total_gas_reward: last_checkpoint
714 .epoch_rolling_gas_cost_summary
715 .computation_cost,
716 })
717 }
718
719 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
720 self.checkpoint_content
722 .checkpoint_db(path)
723 .map_err(Into::into)
724 }
725
726 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
727 let mut wb = self.watermarks.batch();
728 wb.delete_batch(
729 &self.watermarks,
730 std::iter::once(CheckpointWatermark::HighestExecuted),
731 )?;
732 wb.write()?;
733 Ok(())
734 }
735
736 pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
737 self.delete_highest_executed_checkpoint_test_only()?;
738 self.watermarks.rocksdb.flush()?;
739 Ok(())
740 }
741
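    /// Re-executes all locally built checkpoints above the highest executed
    /// checkpoint. This runs on startup so that crash recovery never leaves
    /// locally computed checkpoints whose transactions were not executed.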
742 #[instrument(level = "debug", skip_all)]
747 pub async fn reexecute_local_checkpoints(
748 &self,
749 state: &AuthorityState,
750 epoch_store: &AuthorityPerEpochStore,
751 ) {
        info!("Re-executing locally computed checkpoints for crash recovery");
753 let epoch = epoch_store.epoch();
754 let highest_executed = self
755 .get_highest_executed_checkpoint_seq_number()
756 .expect("get_highest_executed_checkpoint_seq_number should not fail")
757 .unwrap_or(0);
758
759 let Some(highest_built) = self.get_latest_locally_computed_checkpoint() else {
760 info!("no locally built checkpoints to verify");
761 return;
762 };
763
764 for seq in highest_executed + 1..=*highest_built.sequence_number() {
765 info!(?seq, "Re-executing locally computed checkpoint");
766 let Some(checkpoint) = self
767 .get_locally_computed_checkpoint(seq)
768 .expect("get_locally_computed_checkpoint should not fail")
769 else {
770 panic!("locally computed checkpoint {:?} not found", seq);
771 };
772
773 let Some(contents) = self
774 .get_checkpoint_contents(&checkpoint.content_digest)
775 .expect("get_checkpoint_contents should not fail")
776 else {
777 panic!(
778 "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
779 seq, checkpoint.content_digest
780 );
781 };
782
783 let cache = state.get_transaction_cache_reader();
784
785 let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
786 let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
787 let txns = cache
788 .multi_get_transaction_blocks(&tx_digests)
789 .expect("multi_get_transaction_blocks should not fail");
790 for (tx, digest) in txns.iter().zip(tx_digests.iter()) {
791 if tx.is_none() {
792 panic!("transaction {:?} not found", digest);
793 }
794 }
795
796 let txns: Vec<_> = txns
797 .into_iter()
798 .map(|tx| tx.unwrap())
799 .zip(fx_digests.into_iter())
800 .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
802 .map(|(tx, fx)| {
803 (
804 VerifiedExecutableTransaction::new_from_checkpoint(
805 (*tx).clone(),
806 epoch,
807 seq,
808 ),
809 fx,
810 )
811 })
812 .collect();
813
814 let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();
815
816 info!(
817 ?seq,
818 ?tx_digests,
819 "Re-executing transactions for locally built checkpoint"
820 );
821 state.enqueue_with_expected_effects_digest(txns, epoch_store);
824
825 let waiting_logger = tokio::task::spawn(async move {
829 let mut interval = tokio::time::interval(Duration::from_secs(1));
830 loop {
831 interval.tick().await;
832 warn!(?seq, "Still waiting for re-execution to complete");
833 }
834 });
835
836 cache
837 .notify_read_executed_effects_digests(&tx_digests)
838 .await
839 .expect("notify_read_executed_effects_digests should not fail");
840
841 waiting_logger.abort();
842 waiting_logger.await.ok();
843 info!(?seq, "Re-execution completed for locally built checkpoint");
844 }
845
846 info!("Re-execution of locally built checkpoints completed");
847 }
848}
849
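/// Keys of the `watermarks` table; each entry stores the sequence number and
/// digest of the highest checkpoint that has reached the corresponding stage.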
850#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
851pub enum CheckpointWatermark {
852 HighestVerified,
853 HighestSynced,
854 HighestExecuted,
855 HighestPruned,
856}
857
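/// Builds checkpoint summaries and contents from the pending checkpoints
/// produced by consensus handling, and notifies the aggregator once they are
/// written.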
858pub struct CheckpointBuilder {
859 state: Arc<AuthorityState>,
860 tables: Arc<CheckpointStore>,
861 epoch_store: Arc<AuthorityPerEpochStore>,
862 notify: Arc<Notify>,
863 notify_aggregator: Arc<Notify>,
864 last_built: watch::Sender<CheckpointSequenceNumber>,
865 effects_store: Arc<dyn TransactionCacheRead>,
866 accumulator: Weak<StateAccumulator>,
867 output: Box<dyn CheckpointOutput>,
868 metrics: Arc<CheckpointMetrics>,
869 max_transactions_per_checkpoint: usize,
870 max_checkpoint_size_bytes: usize,
871}
872
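/// Certifies locally built checkpoints by aggregating checkpoint signature
/// messages from the committee.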
873pub struct CheckpointAggregator {
874 tables: Arc<CheckpointStore>,
875 epoch_store: Arc<AuthorityPerEpochStore>,
876 notify: Arc<Notify>,
877 current: Option<CheckpointSignatureAggregator>,
878 output: Box<dyn CertifiedCheckpointOutput>,
879 state: Arc<AuthorityState>,
880 metrics: Arc<CheckpointMetrics>,
881}
882
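/// Signature aggregation state for a single checkpoint sequence number.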
883pub struct CheckpointSignatureAggregator {
885 next_index: u64,
886 summary: CheckpointSummary,
887 digest: CheckpointDigest,
888 signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
890 tables: Arc<CheckpointStore>,
891 state: Arc<AuthorityState>,
892 metrics: Arc<CheckpointMetrics>,
893}
894
895impl CheckpointBuilder {
896 fn new(
897 state: Arc<AuthorityState>,
898 tables: Arc<CheckpointStore>,
899 epoch_store: Arc<AuthorityPerEpochStore>,
900 notify: Arc<Notify>,
901 effects_store: Arc<dyn TransactionCacheRead>,
902 accumulator: Weak<StateAccumulator>,
903 output: Box<dyn CheckpointOutput>,
904 notify_aggregator: Arc<Notify>,
905 last_built: watch::Sender<CheckpointSequenceNumber>,
906 metrics: Arc<CheckpointMetrics>,
907 max_transactions_per_checkpoint: usize,
908 max_checkpoint_size_bytes: usize,
909 ) -> Self {
910 Self {
911 state,
912 tables,
913 epoch_store,
914 notify,
915 effects_store,
916 accumulator,
917 output,
918 notify_aggregator,
919 last_built,
920 metrics,
921 max_transactions_per_checkpoint,
922 max_checkpoint_size_bytes,
923 }
924 }
925
926 async fn run(mut self) {
929 info!("Starting CheckpointBuilder");
930 loop {
931 self.maybe_build_checkpoints().await;
932
933 self.notify.notified().await;
934 }
935 }
936
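    /// Drains pending checkpoints from the epoch store and builds checkpoints
    /// from them, grouping consecutive pending checkpoints until the minimum
    /// checkpoint interval has elapsed or the end of the epoch forces a build.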
937 async fn maybe_build_checkpoints(&mut self) {
938 let _scope = monitored_scope("BuildCheckpoints");
939
940 let summary = self
942 .epoch_store
943 .last_built_checkpoint_builder_summary()
944 .expect("epoch should not have ended");
945 let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
946 let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
947
948 let min_checkpoint_interval_ms = self
949 .epoch_store
950 .protocol_config()
951 .min_checkpoint_interval_ms_as_option()
952 .unwrap_or_default();
953 let mut grouped_pending_checkpoints = Vec::new();
954 let mut checkpoints_iter = self
955 .epoch_store
956 .get_pending_checkpoints(last_height)
957 .expect("unexpected epoch store error")
958 .into_iter()
959 .peekable();
960 while let Some((height, pending)) = checkpoints_iter.next() {
961 let current_timestamp = pending.details().timestamp_ms;
964 let can_build = match last_timestamp {
965 Some(last_timestamp) => {
966 current_timestamp >= last_timestamp + min_checkpoint_interval_ms
967 }
968 None => true,
969 } || checkpoints_iter
972 .peek()
973 .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
974 || pending.details().last_of_epoch;
976 grouped_pending_checkpoints.push(pending);
977 if !can_build {
978 debug!(
979 checkpoint_commit_height = height,
980 ?last_timestamp,
981 ?current_timestamp,
982 "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
983 );
984 continue;
985 }
986
987 last_height = Some(height);
989 last_timestamp = Some(current_timestamp);
990 debug!(
991 checkpoint_commit_height = height,
992 "Making checkpoint at commit height"
993 );
994
995 match self
996 .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
997 .await
998 {
999 Ok(seq) => {
1000 self.last_built.send_if_modified(|cur| {
1001 if seq > *cur {
1002 *cur = seq;
1003 true
1004 } else {
1005 false
1006 }
1007 });
1008 }
1009 Err(e) => {
1010 error!("Error while making checkpoint, will retry in 1s: {:?}", e);
1011 tokio::time::sleep(Duration::from_secs(1)).await;
1012 self.metrics.checkpoint_errors.inc();
1013 return;
1014 }
1015 }
1016 tokio::task::yield_now().await;
1019 }
1020 debug!(
1021 "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1022 grouped_pending_checkpoints.len(),
1023 );
1024 }
1025
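    /// Builds one or more checkpoints from a batch of pending checkpoints:
    /// resolves each pending checkpoint's roots into a causally ordered list
    /// of effects, creates the summaries and contents, and writes them out.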
1026 #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1027 async fn make_checkpoint(
1028 &self,
1029 pendings: Vec<PendingCheckpoint>,
1030 ) -> anyhow::Result<CheckpointSequenceNumber> {
1031 let last_details = pendings.last().unwrap().details().clone();
1032
1033 let mut effects_in_current_checkpoint = BTreeSet::new();
1039
1040 let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
1043 for pending_checkpoint in pendings.into_iter() {
1044 let pending = pending_checkpoint.into_v1();
1045 let txn_in_checkpoint = self
1046 .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
1047 .await?;
1048 sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
1049 }
1050 let new_checkpoints = self
1051 .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
1052 .await?;
1053 let highest_sequence = *new_checkpoints.last().0.sequence_number();
1054 self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1055 .await?;
1056 Ok(highest_sequence)
1057 }
1058
1059 #[instrument(level = "debug", skip_all)]
1064 async fn resolve_checkpoint_transactions(
1065 &self,
1066 roots: Vec<TransactionKey>,
1067 effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
1068 ) -> IotaResult<Vec<TransactionEffects>> {
1069 self.metrics
1070 .checkpoint_roots_count
1071 .inc_by(roots.len() as u64);
1072
1073 let root_digests = self
1074 .epoch_store
1075 .notify_read_executed_digests(&roots)
1076 .in_monitored_scope("CheckpointNotifyDigests")
1077 .await?;
1078 let root_effects = self
1079 .effects_store
1080 .notify_read_executed_effects(&root_digests)
1081 .in_monitored_scope("CheckpointNotifyRead")
1082 .await?;
1083
1084 let _scope = monitored_scope("CheckpointBuilder");
1085
1086 let consensus_commit_prologue = {
1087 let consensus_commit_prologue = self
1091 .extract_consensus_commit_prologue(&root_digests, &root_effects)
1092 .await?;
1093
1094 if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1098 let unsorted_ccp = self.complete_checkpoint_effects(
1099 vec![ccp_effects.clone()],
1100 effects_in_current_checkpoint,
1101 )?;
1102
1103 assert_eq!(unsorted_ccp.len(), 1);
1106 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1107 }
1108 consensus_commit_prologue
1109 };
1110
1111 let unsorted =
1112 self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;
1113
1114 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1115 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1116 if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
1117 #[cfg(debug_assertions)]
1118 {
1119 for tx in unsorted.iter() {
1122 assert!(tx.transaction_digest() != &_ccp_digest);
1123 }
1124 }
1125 sorted.push(ccp_effects);
1126 }
1127 sorted.extend(CausalOrder::causal_sort(unsorted));
1128
1129 #[cfg(msim)]
1130 {
1131 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1133 }
1134
1135 Ok(sorted)
1136 }
1137
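    /// Returns the consensus commit prologue transaction and its effects if
    /// it is the first of the root transactions; when present, it must end up
    /// as the first transaction of the checkpoint.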
1138 async fn extract_consensus_commit_prologue(
1143 &self,
1144 root_digests: &[TransactionDigest],
1145 root_effects: &[TransactionEffects],
1146 ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1147 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1148 if root_digests.is_empty() {
1149 return Ok(None);
1150 }
1151
1152 let first_tx = self
1157 .state
1158 .get_transaction_cache_reader()
1159 .get_transaction_block(&root_digests[0])?
1160 .expect("Transaction block must exist");
1161
1162 Ok(match first_tx.transaction_data().kind() {
1163 TransactionKind::ConsensusCommitPrologueV1(_) => {
1164 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1165 Some((*first_tx.digest(), root_effects[0].clone()))
1166 }
1167 _ => None,
1168 })
1169 }
1170
1171 #[instrument(level = "debug", skip_all)]
1173 async fn write_checkpoints(
1174 &self,
1175 height: CheckpointHeight,
1176 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1177 ) -> IotaResult {
1178 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1179 let mut batch = self.tables.checkpoint_content.batch();
1180 let mut all_tx_digests =
1181 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1182
1183 for (summary, contents) in &new_checkpoints {
1184 debug!(
1185 checkpoint_commit_height = height,
1186 checkpoint_seq = summary.sequence_number,
1187 contents_digest = ?contents.digest(),
1188 "writing checkpoint",
1189 );
1190 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1191
1192 self.output
1193 .checkpoint_created(summary, contents, &self.epoch_store, &self.tables)
1194 .await?;
1195
1196 self.metrics
1197 .transactions_included_in_checkpoint
1198 .inc_by(contents.size() as u64);
1199 let sequence_number = summary.sequence_number;
1200 self.metrics
1201 .last_constructed_checkpoint
1202 .set(sequence_number as i64);
1203
1204 batch.insert_batch(
1205 &self.tables.checkpoint_content,
1206 [(contents.digest(), contents)],
1207 )?;
1208
1209 batch.insert_batch(
1210 &self.tables.locally_computed_checkpoints,
1211 [(sequence_number, summary)],
1212 )?;
1213 }
1214
1215 self.state
1227 .get_cache_commit()
1228 .persist_transactions(&all_tx_digests)
1229 .await?;
1230
1231 batch.write()?;
1232
1233 for (local_checkpoint, _) in &new_checkpoints {
1234 if let Some(certified_checkpoint) = self
1235 .tables
1236 .certified_checkpoints
1237 .get(local_checkpoint.sequence_number())?
1238 {
1239 self.tables
1240 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1241 }
1242 }
1243
1244 self.notify_aggregator.notify_one();
1245 self.epoch_store
1246 .process_pending_checkpoint(height, new_checkpoints)?;
1247 Ok(())
1248 }
1249
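    /// Splits the effects into chunks so that no checkpoint exceeds
    /// `max_transactions_per_checkpoint` or `max_checkpoint_size_bytes`. A
    /// single transaction larger than the size limit still gets its own
    /// (oversized) chunk, and at least one (possibly empty) chunk is returned.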
1250 #[expect(clippy::type_complexity)]
1251 fn split_checkpoint_chunks(
1252 &self,
1253 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1254 signatures: Vec<Vec<GenericSignature>>,
1255 ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1256 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1257 let mut chunks = Vec::new();
1258 let mut chunk = Vec::new();
1259 let mut chunk_size: usize = 0;
1260 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1261 .into_iter()
1262 .zip(signatures.into_iter())
1263 {
1264 let size = transaction_size
1269 + bcs::serialized_size(&effects)?
1270 + bcs::serialized_size(&signatures)?;
1271 if chunk.len() == self.max_transactions_per_checkpoint
1272 || (chunk_size + size) > self.max_checkpoint_size_bytes
1273 {
1274 if chunk.is_empty() {
1275 warn!(
1277 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1278 self.max_checkpoint_size_bytes
1279 );
1280 } else {
1281 chunks.push(chunk);
1282 chunk = Vec::new();
1283 chunk_size = 0;
1284 }
1285 }
1286
1287 chunk.push((effects, signatures));
1288 chunk_size += size;
1289 }
1290
1291 if !chunk.is_empty() || chunks.is_empty() {
1292 chunks.push(chunk);
1297 }
1303 Ok(chunks)
1304 }
1305
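    /// Turns the ordered effects into one or more checkpoint summaries and
    /// contents: waits for the corresponding certificates to be sequenced,
    /// splits the effects into size-bounded chunks, rolls up the epoch gas
    /// cost summary, and, for the last checkpoint of the epoch, executes the
    /// advance-epoch transaction and attaches `EndOfEpochData`.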
1306 #[instrument(level = "debug", skip_all)]
1309 async fn create_checkpoints(
1310 &self,
1311 all_effects: Vec<TransactionEffects>,
1312 details: &PendingCheckpointInfo,
1313 ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1314 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1315 let total = all_effects.len();
1316 let mut last_checkpoint = self.epoch_store.last_built_checkpoint_summary()?;
1317 if last_checkpoint.is_none() {
1318 let epoch = self.epoch_store.epoch();
1319 if epoch > 0 {
1320 let previous_epoch = epoch - 1;
1321 let last_verified = self.tables.get_epoch_last_checkpoint(previous_epoch)?;
1322 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1323 if let Some((ref seq, _)) = last_checkpoint {
1324 debug!(
1325 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1326 );
1327 } else {
                    panic!("Cannot find last checkpoint for previous epoch {previous_epoch}");
1331 }
1332 }
1333 }
1334 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1335 debug!(
1336 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1337 checkpoint_timestamp = details.timestamp_ms,
1338 "Creating checkpoint(s) for {} transactions",
1339 all_effects.len(),
1340 );
1341
1342 let all_digests: Vec<_> = all_effects
1343 .iter()
1344 .map(|effect| *effect.transaction_digest())
1345 .collect();
1346 let transactions_and_sizes = self
1347 .state
1348 .get_transaction_cache_reader()
1349 .get_transactions_and_serialized_sizes(&all_digests)?;
1350 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1351 let mut transactions = Vec::with_capacity(all_effects.len());
1352 let mut transaction_keys = Vec::with_capacity(all_effects.len());
1353 let mut randomness_rounds = BTreeMap::new();
1354 {
1355 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1356 debug!(
1357 ?last_checkpoint_seq,
1358 "Waiting for {:?} certificates to appear in consensus",
1359 all_effects.len()
1360 );
1361
1362 for (effects, transaction_and_size) in all_effects
1363 .into_iter()
1364 .zip(transactions_and_sizes.into_iter())
1365 {
1366 let (transaction, size) = transaction_and_size
1367 .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
1368 match transaction.inner().transaction_data().kind() {
1369 TransactionKind::ConsensusCommitPrologueV1(_)
1370 | TransactionKind::AuthenticatorStateUpdateV1(_) => {
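                        // Neither of these is a user certificate sequenced
                        // through consensus, so there is no consensus message
                        // key to wait for here.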
1371 }
1376 TransactionKind::RandomnessStateUpdate(rsu) => {
1377 randomness_rounds
1378 .insert(*effects.transaction_digest(), rsu.randomness_round);
1379 }
1380 _ => {
1381 transaction_keys.push(SequencedConsensusTransactionKey::External(
1384 ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
1385 ));
1386 }
1387 }
1388 transactions.push(transaction);
1389 all_effects_and_transaction_sizes.push((effects, size));
1390 }
1391
1392 self.epoch_store
1393 .consensus_messages_processed_notify(transaction_keys)
1394 .await?;
1395 }
1396
1397 let signatures = self
1398 .epoch_store
1399 .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1400 debug!(
1401 ?last_checkpoint_seq,
1402 "Received {} checkpoint user signatures from consensus",
1403 signatures.len()
1404 );
1405
1406 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1407 let chunks_count = chunks.len();
1408
1409 let mut checkpoints = Vec::with_capacity(chunks_count);
1410 debug!(
1411 ?last_checkpoint_seq,
1412 "Creating {} checkpoints with {} transactions", chunks_count, total,
1413 );
1414
1415 let epoch = self.epoch_store.epoch();
1416 for (index, transactions) in chunks.into_iter().enumerate() {
1417 let first_checkpoint_of_epoch = index == 0
1418 && last_checkpoint
1419 .as_ref()
1420 .map(|(_, c)| c.epoch != epoch)
1421 .unwrap_or(true);
1422 if first_checkpoint_of_epoch {
1423 self.epoch_store
1424 .record_epoch_first_checkpoint_creation_time_metric();
1425 }
1426 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1427
1428 let sequence_number = last_checkpoint
1429 .as_ref()
1430 .map(|(_, c)| c.sequence_number + 1)
1431 .unwrap_or_default();
1432 let timestamp_ms = details.timestamp_ms;
1433 if let Some((_, last_checkpoint)) = &last_checkpoint {
1434 if last_checkpoint.timestamp_ms > timestamp_ms {
1435 error!(
1436 "Unexpected decrease of checkpoint timestamp, sequence: {}, previous: {}, current: {}",
1437 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms
1438 );
1439 }
1440 }
1441
1442 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1443 let epoch_rolling_gas_cost_summary =
1444 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1445
1446 let end_of_epoch_data = if last_checkpoint_of_epoch {
1447 let (system_state_obj, system_epoch_info_event) = self
1448 .augment_epoch_last_checkpoint(
1449 &epoch_rolling_gas_cost_summary,
1450 timestamp_ms,
1451 &mut effects,
1452 &mut signatures,
1453 sequence_number,
1454 )
1455 .await?;
1456
1457 let epoch_supply_change =
1465 system_epoch_info_event.map_or(0, |event| event.supply_change());
1466
1467 let committee = system_state_obj
1468 .get_current_epoch_committee()
1469 .committee()
1470 .clone();
1471
1472 let root_state_digest = {
1475 let state_acc = self
1476 .accumulator
1477 .upgrade()
                    .expect("No checkpoints should be getting built after reconfiguration");
1479 let acc = state_acc.accumulate_checkpoint(
1480 effects.clone(),
1481 sequence_number,
1482 &self.epoch_store,
1483 )?;
1484 state_acc
1485 .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
1486 .await?;
1487 state_acc
1488 .digest_epoch(self.epoch_store.clone(), sequence_number)
1489 .await?
1490 };
1491 self.metrics.highest_accumulated_epoch.set(epoch as i64);
1492 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1493
1494 let epoch_commitments = vec![root_state_digest.into()];
1495
1496 Some(EndOfEpochData {
1497 next_epoch_committee: committee.voting_rights,
1498 next_epoch_protocol_version: ProtocolVersion::new(
1499 system_state_obj.protocol_version(),
1500 ),
1501 epoch_commitments,
1502 epoch_supply_change,
1503 })
1504 } else {
1505 None
1506 };
1507 let contents = CheckpointContents::new_with_digests_and_signatures(
1508 effects.iter().map(TransactionEffects::execution_digests),
1509 signatures,
1510 );
1511
1512 let num_txns = contents.size() as u64;
1513
1514 let network_total_transactions = last_checkpoint
1515 .as_ref()
1516 .map(|(_, c)| c.network_total_transactions + num_txns)
1517 .unwrap_or(num_txns);
1518
1519 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1520
1521 let matching_randomness_rounds: Vec<_> = effects
1522 .iter()
1523 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1524 .copied()
1525 .collect();
1526
1527 let summary = CheckpointSummary::new(
1528 self.epoch_store.protocol_config(),
1529 epoch,
1530 sequence_number,
1531 network_total_transactions,
1532 &contents,
1533 previous_digest,
1534 epoch_rolling_gas_cost_summary,
1535 end_of_epoch_data,
1536 timestamp_ms,
1537 matching_randomness_rounds,
1538 );
1539 summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1540 if last_checkpoint_of_epoch {
1541 info!(
1542 checkpoint_seq = sequence_number,
1543 "creating last checkpoint of epoch {}", epoch
1544 );
1545 if let Some(stats) = self.tables.get_epoch_stats(epoch, &summary) {
1546 self.epoch_store
1547 .report_epoch_metrics_at_last_checkpoint(stats);
1548 }
1549 }
1550 last_checkpoint = Some((sequence_number, summary.clone()));
1551 checkpoints.push((summary, contents));
1552 }
1553
1554 Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1555 }
1556
1557 fn get_epoch_total_gas_cost(
1558 &self,
1559 last_checkpoint: Option<&CheckpointSummary>,
1560 cur_checkpoint_effects: &[TransactionEffects],
1561 ) -> GasCostSummary {
1562 let (previous_epoch, previous_gas_costs) = last_checkpoint
1563 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1564 .unwrap_or_default();
1565 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1566 if previous_epoch == self.epoch_store.epoch() {
1567 GasCostSummary::new(
1569 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1570 previous_gas_costs.computation_cost_burned
1571 + current_gas_costs.computation_cost_burned,
1572 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1573 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1574 previous_gas_costs.non_refundable_storage_fee
1575 + current_gas_costs.non_refundable_storage_fee,
1576 )
1577 } else {
1578 current_gas_costs
1579 }
1580 }
1581
1582 #[instrument(level = "error", skip_all)]
1585 async fn augment_epoch_last_checkpoint(
1586 &self,
1587 epoch_total_gas_cost: &GasCostSummary,
1588 epoch_start_timestamp_ms: CheckpointTimestamp,
1589 checkpoint_effects: &mut Vec<TransactionEffects>,
1590 signatures: &mut Vec<Vec<GenericSignature>>,
1591 checkpoint: CheckpointSequenceNumber,
1592 ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1593 let (system_state, system_epoch_info_event, effects) = self
1594 .state
1595 .create_and_execute_advance_epoch_tx(
1596 &self.epoch_store,
1597 epoch_total_gas_cost,
1598 checkpoint,
1599 epoch_start_timestamp_ms,
1600 )
1601 .await?;
1602 checkpoint_effects.push(effects);
1603 signatures.push(vec![]);
1604 Ok((system_state, system_epoch_info_event))
1605 }
1606
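    /// Expands the root effects with all of their not-yet-checkpointed
    /// dependencies executed in the current epoch, skipping anything already
    /// included in a previous checkpoint or earlier in the one being built.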
1607 #[instrument(level = "debug", skip_all)]
1616 fn complete_checkpoint_effects(
1617 &self,
1618 mut roots: Vec<TransactionEffects>,
1619 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
1620 ) -> IotaResult<Vec<TransactionEffects>> {
1621 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
1622 let mut results = vec![];
1623 let mut seen = HashSet::new();
1624 loop {
1625 let mut pending = HashSet::new();
1626
1627 let transactions_included = self
1628 .epoch_store
1629 .builder_included_transactions_in_checkpoint(
1630 roots.iter().map(|e| e.transaction_digest()),
1631 )?;
1632
1633 for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
1634 let digest = effect.transaction_digest();
1635 seen.insert(*digest);
1638
1639 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
1641 continue;
1642 }
1643
1644 if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
1646 continue;
1647 }
1648
1649 let existing_effects = self
1650 .epoch_store
1651 .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;
1652
1653 for (dependency, effects_signature_exists) in
1654 effect.dependencies().iter().zip(existing_effects.iter())
1655 {
1656 if !effects_signature_exists {
1661 continue;
1662 }
1663 if seen.insert(*dependency) {
1664 pending.insert(*dependency);
1665 }
1666 }
1667 results.push(effect);
1668 }
1669 if pending.is_empty() {
1670 break;
1671 }
1672 let pending = pending.into_iter().collect::<Vec<_>>();
1673 let effects = self.effects_store.multi_get_executed_effects(&pending)?;
1674 let effects = effects
1675 .into_iter()
1676 .zip(pending)
1677 .map(|(opt, digest)| match opt {
1678 Some(x) => x,
1679 None => panic!(
                        "Cannot find effects for transaction {:?}, even though a transaction that depends on it has already been executed",
1681 digest
1682 ),
1683 })
1684 .collect::<Vec<_>>();
1685 roots = effects;
1686 }
1687
1688 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
1689 Ok(results)
1690 }
1691
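    /// Simulation-only invariant checks: at most one consensus commit
    /// prologue may appear among the roots, and when present it must be the
    /// first transaction of the sorted checkpoint.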
1692 #[cfg(msim)]
1695 fn expensive_consensus_commit_prologue_invariants_check(
1696 &self,
1697 root_digests: &[TransactionDigest],
1698 sorted: &[TransactionEffects],
1699 ) {
1700 let root_txs = self
1702 .state
1703 .get_transaction_cache_reader()
1704 .multi_get_transaction_blocks(root_digests)
1705 .unwrap();
1706 let ccps = root_txs
1707 .iter()
1708 .filter_map(|tx| {
1709 tx.as_ref().filter(|tx| {
1710 matches!(
1711 tx.transaction_data().kind(),
1712 TransactionKind::ConsensusCommitPrologueV1(_)
1713 )
1714 })
1715 })
1716 .collect::<Vec<_>>();
1717
1718 assert!(ccps.len() <= 1);
1721
1722 let txs = self
1724 .state
1725 .get_transaction_cache_reader()
1726 .multi_get_transaction_blocks(
1727 &sorted
1728 .iter()
1729 .map(|tx| *tx.transaction_digest())
1730 .collect::<Vec<_>>(),
1731 )
1732 .unwrap();
1733
1734 if ccps.is_empty() {
1735 for tx in txs.iter().flatten() {
1739 assert!(!matches!(
1740 tx.transaction_data().kind(),
1741 TransactionKind::ConsensusCommitPrologueV1(_)
1742 ));
1743 }
1744 } else {
1745 assert!(matches!(
1748 txs[0].as_ref().unwrap().transaction_data().kind(),
1749 TransactionKind::ConsensusCommitPrologueV1(_)
1750 ));
1751
1752 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1753
1754 for tx in txs.iter().skip(1).flatten() {
1755 assert!(!matches!(
1756 tx.transaction_data().kind(),
1757 TransactionKind::ConsensusCommitPrologueV1(_)
1758 ));
1759 }
1760 }
1761 }
1762}
1763
1764impl CheckpointAggregator {
1765 fn new(
1766 tables: Arc<CheckpointStore>,
1767 epoch_store: Arc<AuthorityPerEpochStore>,
1768 notify: Arc<Notify>,
1769 output: Box<dyn CertifiedCheckpointOutput>,
1770 state: Arc<AuthorityState>,
1771 metrics: Arc<CheckpointMetrics>,
1772 ) -> Self {
1773 let current = None;
1774 Self {
1775 tables,
1776 epoch_store,
1777 notify,
1778 current,
1779 output,
1780 state,
1781 metrics,
1782 }
1783 }
1784
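    /// Aggregation loop: repeatedly attempts to certify built checkpoints
    /// from the accumulated signature messages, retrying after errors and
    /// waking up on notification or at least once per second.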
1785 async fn run(mut self) {
1791 info!("Starting CheckpointAggregator");
1792 loop {
1793 if let Err(e) = self.run_and_notify().await {
1794 error!(
1795 "Error while aggregating checkpoint, will retry in 1s: {:?}",
1796 e
1797 );
1798 self.metrics.checkpoint_errors.inc();
1799 tokio::time::sleep(Duration::from_secs(1)).await;
1800 continue;
1801 }
1802
1803 let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
1804 }
1805 }
1806
1807 async fn run_and_notify(&mut self) -> IotaResult {
1808 let summaries = self.run_inner()?;
1809 for summary in summaries {
1810 self.output.certified_checkpoint_created(&summary).await?;
1811 }
1812 Ok(())
1813 }
1814
1815 fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
1816 let _scope = monitored_scope("CheckpointAggregator");
1817 let mut result = vec![];
1818 'outer: loop {
1819 let next_to_certify = self.next_checkpoint_to_certify();
1820 let current = if let Some(current) = &mut self.current {
1821 if current.summary.sequence_number < next_to_certify {
1827 self.current = None;
1828 continue;
1829 }
1830 current
1831 } else {
1832 let Some(summary) = self
1833 .epoch_store
1834 .get_built_checkpoint_summary(next_to_certify)?
1835 else {
1836 return Ok(result);
1837 };
1838 self.current = Some(CheckpointSignatureAggregator {
1839 next_index: 0,
1840 digest: summary.digest(),
1841 summary,
1842 signatures_by_digest: MultiStakeAggregator::new(
1843 self.epoch_store.committee().clone(),
1844 ),
1845 tables: self.tables.clone(),
1846 state: self.state.clone(),
1847 metrics: self.metrics.clone(),
1848 });
1849 self.current.as_mut().unwrap()
1850 };
1851
1852 let epoch_tables = self
1853 .epoch_store
1854 .tables()
1855 .expect("should not run past end of epoch");
1856 let iter = epoch_tables.get_pending_checkpoint_signatures_iter(
1857 current.summary.sequence_number,
1858 current.next_index,
1859 )?;
1860 for ((seq, index), data) in iter {
1861 if seq != current.summary.sequence_number {
1862 debug!(
1863 checkpoint_seq =? current.summary.sequence_number,
1864 "Not enough checkpoint signatures",
1865 );
1866 return Ok(result);
1868 }
1869 debug!(
1870 checkpoint_seq = current.summary.sequence_number,
1871 "Processing signature for checkpoint (digest: {:?}) from {:?}",
1872 current.summary.digest(),
1873 data.summary.auth_sig().authority.concise()
1874 );
1875 self.metrics
1876 .checkpoint_participation
1877 .with_label_values(&[&format!(
1878 "{:?}",
1879 data.summary.auth_sig().authority.concise()
1880 )])
1881 .inc();
1882 if let Ok(auth_signature) = current.try_aggregate(data) {
1883 let summary = VerifiedCheckpoint::new_unchecked(
1884 CertifiedCheckpointSummary::new_from_data_and_sig(
1885 current.summary.clone(),
1886 auth_signature,
1887 ),
1888 );
1889
1890 self.tables.insert_certified_checkpoint(&summary)?;
1891 self.metrics
1892 .last_certified_checkpoint
1893 .set(current.summary.sequence_number as i64);
1894 current
1895 .summary
1896 .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
1897 result.push(summary.into_inner());
1898 self.current = None;
1899 continue 'outer;
1900 } else {
1901 current.next_index = index + 1;
1902 }
1903 }
1904 break;
1905 }
1906 Ok(result)
1907 }
1908
1909 fn next_checkpoint_to_certify(&self) -> CheckpointSequenceNumber {
1910 self.tables
1911 .certified_checkpoints
1912 .unbounded_iter()
1913 .skip_to_last()
1914 .next()
1915 .map(|(seq, _)| seq + 1)
1916 .unwrap_or_default()
1917 }
1918}
1919
1920impl CheckpointSignatureAggregator {
1921 #[expect(clippy::result_unit_err)]
1922 pub fn try_aggregate(
1923 &mut self,
1924 data: CheckpointSignatureMessage,
1925 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
1926 let their_digest = *data.summary.digest();
1927 let (_, signature) = data.summary.into_data_and_sig();
1928 let author = signature.authority;
1929 let envelope =
1930 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
1931 match self.signatures_by_digest.insert(their_digest, envelope) {
1932 InsertResult::Failed {
1934 error:
1935 IotaError::StakeAggregatorRepeatedSigner {
1936 conflicting_sig: false,
1937 ..
1938 },
1939 } => Err(()),
1940 InsertResult::Failed { error } => {
1941 warn!(
1942 checkpoint_seq = self.summary.sequence_number,
1943 "Failed to aggregate new signature from validator {:?}: {:?}",
1944 author.concise(),
1945 error
1946 );
1947 self.check_for_split_brain();
1948 Err(())
1949 }
1950 InsertResult::QuorumReached(cert) => {
1951 if their_digest != self.digest {
1955 self.metrics.remote_checkpoint_forks.inc();
1956 warn!(
1957 checkpoint_seq = self.summary.sequence_number,
1958 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
1959 author.concise(),
1960 their_digest,
1961 self.digest
1962 );
1963 return Err(());
1964 }
1965 Ok(cert)
1966 }
1967 InsertResult::NotEnoughVotes {
1968 bad_votes: _,
1969 bad_authorities: _,
1970 } => {
1971 self.check_for_split_brain();
1972 Err(())
1973 }
1974 }
1975 }
1976
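    /// Checks whether the remaining uncommitted stake can no longer reach a
    /// quorum on any single digest; if so, logs the observed digests by stake
    /// and spawns a background task to diagnose the split-brain condition.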
1977 fn check_for_split_brain(&self) {
1982 debug!(
1983 checkpoint_seq = self.summary.sequence_number,
1984 "Checking for split brain condition"
1985 );
1986 if self.signatures_by_digest.quorum_unreachable() {
1987 let digests_by_stake_messages = self
1993 .signatures_by_digest
1994 .get_all_unique_values()
1995 .into_iter()
1996 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
1997 .map(|(digest, (_authorities, total_stake))| {
1998 format!("{:?} (total stake: {})", digest, total_stake)
1999 })
2000 .collect::<Vec<String>>();
2001 error!(
2002 checkpoint_seq = self.summary.sequence_number,
2003 "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
2004 self.signatures_by_digest.uncommitted_stake(),
2005 digests_by_stake_messages,
2006 );
2007 self.metrics.split_brain_checkpoint_forks.inc();
2008
2009 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2010 let local_summary = self.summary.clone();
2011 let state = self.state.clone();
2012 let tables = self.tables.clone();
2013
2014 tokio::spawn(async move {
2015 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2016 });
2017 }
2018 }
2019}
2020
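/// Obtains the pending checkpoint at the forked sequence number from one
/// validator per disagreeing digest and produces diffs of the summaries,
/// contents, transactions, and effects against the local ones to aid
/// debugging.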
2021async fn diagnose_split_brain(
2027 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2028 local_summary: CheckpointSummary,
2029 state: Arc<AuthorityState>,
2030 tables: Arc<CheckpointStore>,
2031) {
2032 debug!(
2033 checkpoint_seq = local_summary.sequence_number,
2034 "Running split brain diagnostics..."
2035 );
2036 let time = Utc::now();
2037 let digest_to_validator = all_unique_values
2039 .iter()
2040 .filter_map(|(digest, (validators, _))| {
2041 if *digest != local_summary.digest() {
2042 let random_validator = validators.choose(&mut OsRng).unwrap();
2043 Some((*digest, *random_validator))
2044 } else {
2045 None
2046 }
2047 })
2048 .collect::<HashMap<_, _>>();
2049 if digest_to_validator.is_empty() {
2050 panic!(
            "Given the split brain condition, there should be at \
             least one validator that disagrees with the local signature"
2053 );
2054 }
2055
2056 let epoch_store = state.load_epoch_store_one_call_per_task();
2057 let committee = epoch_store
2058 .epoch_start_state()
2059 .get_iota_committee_with_network_metadata();
2060 let network_config = default_iota_network_config();
2061 let network_clients =
2062 make_network_authority_clients_with_network_config(&committee, &network_config);
2063
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequest {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint(request)
        })
        .collect::<Vec<_>>();

    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponse {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponse {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
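    // Diff each disagreeing validator's summary, contents, transaction list, and
    // effects list against our local data.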
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {time}",
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .into_path()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);

    fail_point!("split_brain_reached");
}

pub trait CheckpointServiceNotify {
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    fn notify_checkpoint(&self) -> IotaResult;
}

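/// Holds the checkpoint builder and aggregator until `CheckpointService::spawn`
/// moves them into their background tasks.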
enum CheckpointServiceState {
    Unstarted((CheckpointBuilder, CheckpointAggregator)),
    Started,
}

impl CheckpointServiceState {
    fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
        let mut state = CheckpointServiceState::Started;
        std::mem::swap(self, &mut state);

        match state {
            CheckpointServiceState::Unstarted((builder, aggregator)) => (builder, aggregator),
            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
        }
    }
}

pub struct CheckpointService {
    tables: Arc<CheckpointStore>,
    notify_builder: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_signature_index: Mutex<u64>,
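    // Watch channel through which the builder publishes the highest checkpoint
    // sequence number built so far; used to track rebuild progress after a restart.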
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
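    // The highest checkpoint sequence number that had already been built when
    // this service was constructed.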
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    state: Mutex<CheckpointServiceState>,
}

impl CheckpointService {
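    /// Constructs a new checkpoint service in an un-started state; the builder
    /// and aggregator tasks are only started once `spawn` is called.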
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        accumulator: Weak<StateAccumulator>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        let highest_previously_built_seq = epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended")
            .map(|s| s.summary.sequence_number)
            .unwrap_or(0);

        let (highest_currently_built_seq_tx, _) = watch::channel(highest_previously_built_seq);

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            accumulator,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        let last_signature_index = epoch_store
            .get_last_checkpoint_signature_index()
            .expect("should not cross end of epoch");
        let last_signature_index = Mutex::new(last_signature_index);

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            notify_aggregator,
            last_signature_index,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted((builder, aggregator))),
        })
    }

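    /// Starts the checkpoint builder and aggregator tasks and waits, logging a
    /// debug-fatal every 10 seconds, until the builder has re-built all
    /// checkpoints that had already been built before the most recent restart.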
    pub async fn spawn(&self) -> JoinSet<()> {
        let mut tasks = JoinSet::new();

        let (builder, aggregator) = self.state.lock().take_unstarted();
        tasks.spawn(monitored_future!(builder.run()));
        tasks.spawn(monitored_future!(aggregator.run()));

        loop {
            if tokio::time::timeout(Duration::from_secs(10), self.wait_for_rebuilt_checkpoints())
                .await
                .is_ok()
            {
                break;
            } else {
                debug_fatal!("Still waiting for checkpoints to be rebuilt");
            }
        }

        tasks
    }
}

impl CheckpointService {
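    /// Waits until the builder has caught up to the highest checkpoint sequence
    /// number that had been built before the node restarted.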
    pub async fn wait_for_rebuilt_checkpoints(&self) {
        let highest_previously_built_seq = self.highest_previously_built_seq;
        let mut rx = self.highest_currently_built_seq_tx.subscribe();
        loop {
            let highest_currently_built_seq = *rx.borrow_and_update();
            if highest_currently_built_seq >= highest_previously_built_seq {
                break;
            }
            rx.changed().await.unwrap();
        }
    }

    #[cfg(test)]
    fn write_and_notify_checkpoint_for_testing(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: PendingCheckpoint,
    ) -> IotaResult {
        use crate::authority::authority_per_epoch_store::ConsensusCommitOutput;

        let mut output = ConsensusCommitOutput::new();
        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
        let mut batch = epoch_store.db_batch_for_test();
        output.write_to_batch(epoch_store, &mut batch)?;
        batch.write()?;
        self.notify_checkpoint()?;
        Ok(())
    }
}

impl CheckpointServiceNotify for CheckpointService {
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult {
        let sequence = info.summary.sequence_number;
        let signer = info.summary.auth_sig().authority.concise();

        if let Some(highest_verified_checkpoint) = self
            .tables
            .get_highest_verified_checkpoint()?
            .map(|x| *x.sequence_number())
        {
            if sequence <= highest_verified_checkpoint {
                debug!(
                    checkpoint_seq = sequence,
                    "Ignore checkpoint signature from {} - already certified", signer,
                );
                self.metrics
                    .last_ignored_checkpoint_signature_received
                    .set(sequence as i64);
                return Ok(());
            }
        }
        debug!(
            checkpoint_seq = sequence,
            "Received checkpoint signature, digest {} from {}",
            info.summary.digest(),
            signer,
        );
        self.metrics
            .last_received_checkpoint_signatures
            .with_label_values(&[&signer.to_string()])
            .set(sequence as i64);
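        // Keep the index lock held while persisting the signature so that
        // concurrent callers cannot hand out duplicate or out-of-order indices.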
        let mut index = self.last_signature_index.lock();
        *index += 1;
        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
        self.notify_aggregator.notify_one();
        Ok(())
    }

    fn notify_checkpoint(&self) -> IotaResult {
        self.notify_builder.notify_one();
        Ok(())
    }
}

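/// A no-op `CheckpointServiceNotify` implementation that discards all
/// notifications.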
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    fn notify_checkpoint_signature(
        &self,
        _: &AuthorityPerEpochStore,
        _: &CheckpointSignatureMessage,
    ) -> IotaResult {
        Ok(())
    }

    fn notify_checkpoint(&self) -> IotaResult {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, HashMap},
        ops::Deref,
    };

    use futures::{FutureExt as _, future::BoxFuture};
    use iota_macros::sim_test;
    use iota_protocol_config::{Chain, ProtocolConfig};
    use iota_types::{
        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
        crypto::Signature,
        digests::TransactionEventsDigest,
        effects::{TransactionEffects, TransactionEvents},
        messages_checkpoint::SignedCheckpointSummary,
        move_package::MovePackage,
        object,
        transaction::{GenesisObject, VerifiedTransaction},
    };
    use tokio::sync::mpsc;

    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;

    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
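        // A genesis transaction carrying a package whose single module name is
        // 40_000 characters long, producing a large serialized transaction used
        // to exercise the checkpoint size limit below.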
        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
            vec![GenesisObject::RawObject {
                data: object::Data::Package(
                    MovePackage::new(
                        ObjectID::random(),
                        SequenceNumber::new(),
                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
                        100_000,
                        Vec::new(),
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        let _tasks = checkpoint_service.spawn().await;

        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

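        // Pending checkpoint 2 contained four transactions, but the service was
        // built with a three-transaction limit, so it is split into two checkpoints.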
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

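        // Pending checkpoint 3 contained three of the oversized transactions with
        // a 100_000-byte checkpoint size limit, so it is also split into two
        // checkpoints.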
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

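        // Pending checkpoint 4 arrived only 1 ms after pending 3, below the 100 ms
        // minimum checkpoint interval configured above, so it is merged with
        // pending 5.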
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| {
                    self.get(d)
                        .map(|fx| fx.digest())
                        .expect("effects not found")
                })
                .collect()))
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
        }

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            unimplemented!()
        }

        fn multi_get_events(
            &self,
            _: &[TransactionEventsDigest],
        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
            unimplemented!()
        }
    }

    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> IotaResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> IotaResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

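    /// Builds a pending checkpoint at height `i` whose roots are the digests
    /// derived from `t`, stamped with `timestamp_ms`.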
    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
            roots: t
                .into_iter()
                .map(|t| TransactionKey::Digest(d(t)))
                .collect(),
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
            },
        })
    }

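    /// Produces a deterministic transaction digest whose first byte is `i`.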
    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

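    /// Builds test effects for `transaction_digest` with the given dependencies
    /// and gas cost summary.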
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

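    /// Registers test effects for `digest` in `store` and maps the transaction
    /// key to its digest in the epoch store.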
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store
            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
            .expect("Inserting cert fx and sigs should not fail");
    }
}