mod causal_order;
pub mod checkpoint_executor;
mod checkpoint_output;
mod metrics;

use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    fs::File,
    io::Write,
    path::Path,
    sync::{Arc, Weak},
    time::Duration,
};

use chrono::Utc;
use diffy::create_patch;
use iota_macros::fail_point;
use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
use iota_network::default_iota_network_config;
use iota_protocol_config::ProtocolVersion;
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
    committee::StakeUnit,
    crypto::AuthorityStrongQuorumSignInfo,
    digests::{CheckpointContentsDigest, CheckpointDigest},
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::{IotaError, IotaResult},
    event::SystemEpochInfoEvent,
    executable_transaction::VerifiedExecutableTransaction,
    gas::GasCostSummary,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::EpochStartSystemStateTrait,
    },
    message_envelope::Message,
    messages_checkpoint::{
        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
        VerifiedCheckpointContents,
    },
    messages_consensus::ConsensusTransactionKey,
    signature::GenericSignature,
    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
};
use itertools::Itertools;
use parking_lot::Mutex;
use rand::{rngs::OsRng, seq::SliceRandom};
use serde::{Deserialize, Serialize};
use tokio::{sync::Notify, task::JoinSet, time::timeout};
use tracing::{debug, error, info, instrument, warn};
use typed_store::{
    DBMapUtils, Map, TypedStoreError,
    rocks::{DBMap, MetricConf},
    traits::{TableSummary, TypedStoreDebug},
};

pub use crate::checkpoints::{
    checkpoint_output::{
        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
    },
    metrics::CheckpointMetrics,
};
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
    checkpoints::{
        causal_order::CausalOrder,
        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
    },
    consensus_handler::SequencedConsensusTransactionKey,
    execution_cache::TransactionCacheRead,
    stake_aggregator::{InsertResult, MultiStakeAggregator},
    state_accumulator::StateAccumulator,
};

82pub type CheckpointHeight = u64;
83
84pub struct EpochStats {
85 pub checkpoint_count: u64,
86 pub transaction_count: u64,
87 pub total_gas_reward: u64,
88}
89
90#[derive(Clone, Debug, Serialize, Deserialize)]
91pub struct PendingCheckpointInfo {
92 pub timestamp_ms: CheckpointTimestamp,
93 pub last_of_epoch: bool,
94 pub checkpoint_height: CheckpointHeight,
95}
96
97#[derive(Clone, Debug, Serialize, Deserialize)]
98pub enum PendingCheckpoint {
99 V1(PendingCheckpointContentsV1),
101}
102
103#[derive(Clone, Debug, Serialize, Deserialize)]
104pub struct PendingCheckpointContentsV1 {
105 pub roots: Vec<TransactionKey>,
106 pub details: PendingCheckpointInfo,
107}
108
109impl PendingCheckpoint {
110 pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
111 match self {
112 PendingCheckpoint::V1(contents) => contents,
113 }
114 }
115
116 pub fn into_v1(self) -> PendingCheckpointContentsV1 {
117 match self {
118 PendingCheckpoint::V1(contents) => contents,
119 }
120 }
121
122 pub fn roots(&self) -> &Vec<TransactionKey> {
123 &self.as_v1().roots
124 }
125
126 pub fn details(&self) -> &PendingCheckpointInfo {
127 &self.as_v1().details
128 }
129
130 pub fn height(&self) -> CheckpointHeight {
131 self.details().checkpoint_height
132 }
133}
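// A minimal sketch (not part of the original module) showing how the versioned
// `PendingCheckpoint` wrapper is constructed and read back through its
// accessors; the module/test names and the height value are arbitrary.
#[cfg(test)]
mod pending_checkpoint_example {
    use super::*;

    #[test]
    fn build_and_inspect_pending_checkpoint() {
        let pending = PendingCheckpoint::V1(PendingCheckpointContentsV1 {
            roots: vec![],
            details: PendingCheckpointInfo {
                timestamp_ms: 0,
                last_of_epoch: false,
                checkpoint_height: 42,
            },
        });
        // All reads go through the version-agnostic accessors.
        assert_eq!(pending.height(), 42);
        assert!(pending.roots().is_empty());
        assert!(!pending.details().last_of_epoch);
    }
}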
134
135#[derive(Clone, Debug, Serialize, Deserialize)]
136pub struct BuilderCheckpointSummary {
137 pub summary: CheckpointSummary,
138 pub checkpoint_height: Option<CheckpointHeight>,
140 pub position_in_commit: usize,
141}
142
#[derive(DBMapUtils)]
pub struct CheckpointStore {
    /// Maps checkpoint contents digests to checkpoint contents.
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps a checkpoint contents digest to the sequence number of the
    /// checkpoint that contains it.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores the full contents (transactions and effects) of executed
    /// checkpoints, keyed by sequence number.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Checkpoint summaries certified by the committee, keyed by sequence
    /// number.
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Certified checkpoint summaries indexed by checkpoint digest.
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Checkpoint summaries built locally by this validator, kept so that they
    /// can be cross-checked for forks against certified checkpoints.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// Maps an epoch to the sequence number of its last checkpoint.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks (highest verified, synced, executed, pruned), each stored as
    /// a (sequence number, digest) pair.
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}
175
176impl CheckpointStore {
177 pub fn new(path: &Path) -> Arc<Self> {
178 Arc::new(Self::open_tables_read_write(
179 path.to_path_buf(),
180 MetricConf::new("checkpoint"),
181 None,
182 None,
183 ))
184 }
185
186 pub fn open_readonly(path: &Path) -> CheckpointStoreReadOnly {
187 Self::get_read_only_handle(
188 path.to_path_buf(),
189 None,
190 None,
191 MetricConf::new("checkpoint_readonly"),
192 )
193 }
194
195 #[instrument(level = "info", skip_all)]
196 pub fn insert_genesis_checkpoint(
197 &self,
198 checkpoint: VerifiedCheckpoint,
199 contents: CheckpointContents,
200 epoch_store: &AuthorityPerEpochStore,
201 ) {
202 assert_eq!(
203 checkpoint.epoch(),
204 0,
205 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
206 );
207 assert_eq!(
208 *checkpoint.sequence_number(),
209 0,
210 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
211 );
212
213 if self
216 .get_checkpoint_by_digest(checkpoint.digest())
217 .unwrap()
218 .is_none()
219 {
220 if epoch_store.epoch() == checkpoint.epoch {
221 epoch_store
222 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
223 .unwrap();
224 } else {
225 debug!(
226 validator_epoch =% epoch_store.epoch(),
227 genesis_epoch =% checkpoint.epoch(),
228 "Not inserting checkpoint builder data for genesis checkpoint",
229 );
230 }
231 self.insert_checkpoint_contents(contents).unwrap();
232 self.insert_verified_checkpoint(&checkpoint).unwrap();
233 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
234 }
235 }
236
237 pub fn get_checkpoint_by_digest(
238 &self,
239 digest: &CheckpointDigest,
240 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
241 self.checkpoint_by_digest
242 .get(digest)
243 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
244 }
245
246 pub fn get_checkpoint_by_sequence_number(
247 &self,
248 sequence_number: CheckpointSequenceNumber,
249 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
250 self.certified_checkpoints
251 .get(&sequence_number)
252 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
253 }
254
255 pub fn get_locally_computed_checkpoint(
256 &self,
257 sequence_number: CheckpointSequenceNumber,
258 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
259 self.locally_computed_checkpoints.get(&sequence_number)
260 }
261
262 pub fn get_sequence_number_by_contents_digest(
263 &self,
264 digest: &CheckpointContentsDigest,
265 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
266 self.checkpoint_sequence_by_contents_digest.get(digest)
267 }
268
269 pub fn delete_contents_digest_sequence_number_mapping(
270 &self,
271 digest: &CheckpointContentsDigest,
272 ) -> Result<(), TypedStoreError> {
273 self.checkpoint_sequence_by_contents_digest.remove(digest)
274 }
275
276 pub fn get_latest_certified_checkpoint(&self) -> Option<VerifiedCheckpoint> {
277 self.certified_checkpoints
278 .unbounded_iter()
279 .skip_to_last()
280 .next()
281 .map(|(_, v)| v.into())
282 }
283
284 pub fn get_latest_locally_computed_checkpoint(&self) -> Option<CheckpointSummary> {
285 self.locally_computed_checkpoints
286 .unbounded_iter()
287 .skip_to_last()
288 .next()
289 .map(|(_, v)| v)
290 }
291
292 pub fn multi_get_checkpoint_by_sequence_number(
293 &self,
294 sequence_numbers: &[CheckpointSequenceNumber],
295 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
296 let checkpoints = self
297 .certified_checkpoints
298 .multi_get(sequence_numbers)?
299 .into_iter()
300 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
301 .collect();
302
303 Ok(checkpoints)
304 }
305
306 pub fn multi_get_checkpoint_content(
307 &self,
308 contents_digest: &[CheckpointContentsDigest],
309 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
310 self.checkpoint_content.multi_get(contents_digest)
311 }
312
313 pub fn get_highest_verified_checkpoint(
314 &self,
315 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
316 let highest_verified = if let Some(highest_verified) =
317 self.watermarks.get(&CheckpointWatermark::HighestVerified)?
318 {
319 highest_verified
320 } else {
321 return Ok(None);
322 };
323 self.get_checkpoint_by_digest(&highest_verified.1)
324 }
325
326 pub fn get_highest_synced_checkpoint(
327 &self,
328 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
329 let highest_synced = if let Some(highest_synced) =
330 self.watermarks.get(&CheckpointWatermark::HighestSynced)?
331 {
332 highest_synced
333 } else {
334 return Ok(None);
335 };
336 self.get_checkpoint_by_digest(&highest_synced.1)
337 }
338
339 pub fn get_highest_executed_checkpoint_seq_number(
340 &self,
341 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
342 if let Some(highest_executed) =
343 self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
344 {
345 Ok(Some(highest_executed.0))
346 } else {
347 Ok(None)
348 }
349 }
350
351 pub fn get_highest_executed_checkpoint(
352 &self,
353 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
354 let highest_executed = if let Some(highest_executed) =
355 self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
356 {
357 highest_executed
358 } else {
359 return Ok(None);
360 };
361 self.get_checkpoint_by_digest(&highest_executed.1)
362 }
363
364 pub fn get_highest_pruned_checkpoint_seq_number(
365 &self,
366 ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
367 Ok(self
368 .watermarks
369 .get(&CheckpointWatermark::HighestPruned)?
370 .unwrap_or_default()
371 .0)
372 }
373
374 pub fn get_checkpoint_contents(
375 &self,
376 digest: &CheckpointContentsDigest,
377 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
378 self.checkpoint_content.get(digest)
379 }
380
381 pub fn get_full_checkpoint_contents_by_sequence_number(
382 &self,
383 seq: CheckpointSequenceNumber,
384 ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
385 self.full_checkpoint_content.get(&seq)
386 }
387
388 fn prune_local_summaries(&self) -> IotaResult {
389 if let Some((last_local_summary, _)) = self
390 .locally_computed_checkpoints
391 .unbounded_iter()
392 .skip_to_last()
393 .next()
394 {
395 let mut batch = self.locally_computed_checkpoints.batch();
396 batch.schedule_delete_range(
397 &self.locally_computed_checkpoints,
398 &0,
399 &last_local_summary,
400 )?;
401 batch.write()?;
402 info!("Pruned local summaries up to {:?}", last_local_summary);
403 }
404 Ok(())
405 }
406
407 fn check_for_checkpoint_fork(
408 &self,
409 local_checkpoint: &CheckpointSummary,
410 verified_checkpoint: &VerifiedCheckpoint,
411 ) {
412 if local_checkpoint != verified_checkpoint.data() {
413 let verified_contents = self
414 .get_checkpoint_contents(&verified_checkpoint.content_digest)
415 .map(|opt_contents| {
416 opt_contents
417 .map(|contents| format!("{:?}", contents))
418 .unwrap_or_else(|| {
419 format!(
420 "Verified checkpoint contents not found, digest: {:?}",
421 verified_checkpoint.content_digest,
422 )
423 })
424 })
425 .map_err(|e| {
426 format!(
427 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
428 verified_checkpoint.content_digest, e
429 )
430 })
431 .unwrap_or_else(|err_msg| err_msg);
432
433 let local_contents = self
434 .get_checkpoint_contents(&local_checkpoint.content_digest)
435 .map(|opt_contents| {
436 opt_contents
437 .map(|contents| format!("{:?}", contents))
438 .unwrap_or_else(|| {
439 format!(
440 "Local checkpoint contents not found, digest: {:?}",
441 local_checkpoint.content_digest
442 )
443 })
444 })
445 .map_err(|e| {
446 format!(
447 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
448 local_checkpoint.content_digest, e
449 )
450 })
451 .unwrap_or_else(|err_msg| err_msg);
452
453 error!(
455 verified_checkpoint = ?verified_checkpoint.data(),
456 ?verified_contents,
457 ?local_checkpoint,
458 ?local_contents,
459 "Local checkpoint fork detected!",
460 );
461 panic!(
462 "Local checkpoint fork detected for sequence number: {}",
463 local_checkpoint.sequence_number()
464 );
465 }
466 }
467
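    // Inserts a checkpoint certified by a quorum of validators. If it is the
    // last checkpoint of its epoch (it carries a next-epoch committee), it is
    // also recorded in `epoch_last_checkpoint_map`. When a locally computed
    // summary exists for the same sequence number, the two are cross-checked
    // for forks.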
468 pub fn insert_certified_checkpoint(
474 &self,
475 checkpoint: &VerifiedCheckpoint,
476 ) -> Result<(), TypedStoreError> {
477 debug!(
478 checkpoint_seq = checkpoint.sequence_number(),
479 "Inserting certified checkpoint",
480 );
481 let mut batch = self.certified_checkpoints.batch();
482 batch
483 .insert_batch(
484 &self.certified_checkpoints,
485 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
486 )?
487 .insert_batch(
488 &self.checkpoint_by_digest,
489 [(checkpoint.digest(), checkpoint.serializable_ref())],
490 )?;
491 if checkpoint.next_epoch_committee().is_some() {
492 batch.insert_batch(
493 &self.epoch_last_checkpoint_map,
494 [(&checkpoint.epoch(), checkpoint.sequence_number())],
495 )?;
496 }
497 batch.write()?;
498
499 if let Some(local_checkpoint) = self
500 .locally_computed_checkpoints
501 .get(checkpoint.sequence_number())?
502 {
503 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
504 }
505
506 Ok(())
507 }
508
509 #[instrument(level = "debug", skip_all)]
512 pub fn insert_verified_checkpoint(
513 &self,
514 checkpoint: &VerifiedCheckpoint,
515 ) -> Result<(), TypedStoreError> {
516 self.insert_certified_checkpoint(checkpoint)?;
517 self.update_highest_verified_checkpoint(checkpoint)
518 }
519
520 pub fn update_highest_verified_checkpoint(
521 &self,
522 checkpoint: &VerifiedCheckpoint,
523 ) -> Result<(), TypedStoreError> {
524 if Some(*checkpoint.sequence_number())
525 > self
526 .get_highest_verified_checkpoint()?
527 .map(|x| *x.sequence_number())
528 {
529 debug!(
530 checkpoint_seq = checkpoint.sequence_number(),
531 "Updating highest verified checkpoint",
532 );
533 self.watermarks.insert(
534 &CheckpointWatermark::HighestVerified,
535 &(*checkpoint.sequence_number(), *checkpoint.digest()),
536 )?;
537 }
538
539 Ok(())
540 }
541
542 pub fn update_highest_synced_checkpoint(
543 &self,
544 checkpoint: &VerifiedCheckpoint,
545 ) -> Result<(), TypedStoreError> {
546 debug!(
547 checkpoint_seq = checkpoint.sequence_number(),
548 "Updating highest synced checkpoint",
549 );
550 self.watermarks.insert(
551 &CheckpointWatermark::HighestSynced,
552 &(*checkpoint.sequence_number(), *checkpoint.digest()),
553 )
554 }
555
556 pub fn update_highest_executed_checkpoint(
557 &self,
558 checkpoint: &VerifiedCheckpoint,
559 ) -> Result<(), TypedStoreError> {
560 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
561 if seq_number >= *checkpoint.sequence_number() {
562 return Ok(());
563 }
564 assert_eq!(
565 seq_number + 1,
566 *checkpoint.sequence_number(),
567 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
568 checkpoint.sequence_number(),
569 seq_number
570 );
571 }
572 debug!(
573 checkpoint_seq = checkpoint.sequence_number(),
574 "Updating highest executed checkpoint",
575 );
576 self.watermarks.insert(
577 &CheckpointWatermark::HighestExecuted,
578 &(*checkpoint.sequence_number(), *checkpoint.digest()),
579 )
580 }
581
582 pub fn update_highest_pruned_checkpoint(
583 &self,
584 checkpoint: &VerifiedCheckpoint,
585 ) -> Result<(), TypedStoreError> {
586 self.watermarks.insert(
587 &CheckpointWatermark::HighestPruned,
588 &(*checkpoint.sequence_number(), *checkpoint.digest()),
589 )
590 }
591
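    // Sets the highest-executed watermark directly, without the contiguity
    // check enforced by `update_highest_executed_checkpoint`. Intended for
    // callers that deliberately need to move the watermark non-sequentially,
    // hence the "subtle" in the name.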
592 pub fn set_highest_executed_checkpoint_subtle(
598 &self,
599 checkpoint: &VerifiedCheckpoint,
600 ) -> Result<(), TypedStoreError> {
601 self.watermarks.insert(
602 &CheckpointWatermark::HighestExecuted,
603 &(*checkpoint.sequence_number(), *checkpoint.digest()),
604 )
605 }
606
607 pub fn insert_checkpoint_contents(
608 &self,
609 contents: CheckpointContents,
610 ) -> Result<(), TypedStoreError> {
611 debug!(
612 checkpoint_seq = ?contents.digest(),
613 "Inserting checkpoint contents",
614 );
615 self.checkpoint_content.insert(contents.digest(), &contents)
616 }
617
618 pub fn insert_verified_checkpoint_contents(
619 &self,
620 checkpoint: &VerifiedCheckpoint,
621 full_contents: VerifiedCheckpointContents,
622 ) -> Result<(), TypedStoreError> {
623 let mut batch = self.full_checkpoint_content.batch();
624 batch.insert_batch(
625 &self.checkpoint_sequence_by_contents_digest,
626 [(&checkpoint.content_digest, checkpoint.sequence_number())],
627 )?;
628 let full_contents = full_contents.into_inner();
629 batch.insert_batch(
630 &self.full_checkpoint_content,
631 [(checkpoint.sequence_number(), &full_contents)],
632 )?;
633
634 let contents = full_contents.into_checkpoint_contents();
635 assert_eq!(&checkpoint.content_digest, contents.digest());
636
637 batch.insert_batch(&self.checkpoint_content, [(contents.digest(), &contents)])?;
638
639 batch.write()
640 }
641
642 pub fn delete_full_checkpoint_contents(
643 &self,
644 seq: CheckpointSequenceNumber,
645 ) -> Result<(), TypedStoreError> {
646 self.full_checkpoint_content.remove(&seq)
647 }
648
649 pub fn get_epoch_last_checkpoint(
650 &self,
651 epoch_id: EpochId,
652 ) -> IotaResult<Option<VerifiedCheckpoint>> {
653 let seq = self.epoch_last_checkpoint_map.get(&epoch_id)?;
654 let checkpoint = match seq {
655 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
656 None => None,
657 };
658 Ok(checkpoint)
659 }
660
661 pub fn insert_epoch_last_checkpoint(
662 &self,
663 epoch_id: EpochId,
664 checkpoint: &VerifiedCheckpoint,
665 ) -> IotaResult {
666 self.epoch_last_checkpoint_map
667 .insert(&epoch_id, checkpoint.sequence_number())?;
668 Ok(())
669 }
670
671 pub fn get_epoch_state_commitments(
672 &self,
673 epoch: EpochId,
674 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
675 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
676 checkpoint
677 .end_of_epoch_data
678 .as_ref()
679 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
680 .epoch_commitments
681 .clone()
682 });
683 Ok(commitments)
684 }
685
686 pub fn get_epoch_stats(
689 &self,
690 epoch: EpochId,
691 last_checkpoint: &CheckpointSummary,
692 ) -> Option<EpochStats> {
693 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
694 (0, 0)
695 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
696 (
697 checkpoint.sequence_number + 1,
698 checkpoint.network_total_transactions,
699 )
700 } else {
701 return None;
702 };
703 Some(EpochStats {
704 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
705 transaction_count: last_checkpoint.network_total_transactions
706 - prev_epoch_network_transactions,
707 total_gas_reward: last_checkpoint
708 .epoch_rolling_gas_cost_summary
709 .computation_cost,
710 })
711 }
712
713 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
714 self.checkpoint_content
716 .checkpoint_db(path)
717 .map_err(Into::into)
718 }
719
720 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
721 let mut wb = self.watermarks.batch();
722 wb.delete_batch(
723 &self.watermarks,
724 std::iter::once(CheckpointWatermark::HighestExecuted),
725 )?;
726 wb.write()?;
727 Ok(())
728 }
729
730 pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
731 self.delete_highest_executed_checkpoint_test_only()?;
732 self.watermarks.rocksdb.flush()?;
733 Ok(())
734 }
735
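    // Crash recovery: re-executes the transactions of every checkpoint that was
    // built locally but whose sequence number is above the highest-executed
    // watermark, so that their effects are available again before the node
    // resumes normal checkpoint building.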
736 #[instrument(level = "debug", skip_all)]
741 pub async fn reexecute_local_checkpoints(
742 &self,
743 state: &AuthorityState,
744 epoch_store: &AuthorityPerEpochStore,
745 ) {
746 info!("rexecuting locally computed checkpoints for crash recovery");
747 let epoch = epoch_store.epoch();
748 let highest_executed = self
749 .get_highest_executed_checkpoint_seq_number()
750 .expect("get_highest_executed_checkpoint_seq_number should not fail")
751 .unwrap_or(0);
752
753 let Some(highest_built) = self.get_latest_locally_computed_checkpoint() else {
754 info!("no locally built checkpoints to verify");
755 return;
756 };
757
758 for seq in highest_executed + 1..=*highest_built.sequence_number() {
759 info!(?seq, "Re-executing locally computed checkpoint");
760 let Some(checkpoint) = self
761 .get_locally_computed_checkpoint(seq)
762 .expect("get_locally_computed_checkpoint should not fail")
763 else {
764 panic!("locally computed checkpoint {:?} not found", seq);
765 };
766
767 let Some(contents) = self
768 .get_checkpoint_contents(&checkpoint.content_digest)
769 .expect("get_checkpoint_contents should not fail")
770 else {
771 panic!(
772 "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
773 seq, checkpoint.content_digest
774 );
775 };
776
777 let cache = state.get_transaction_cache_reader();
778
779 let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
780 let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
781 let txns = cache
782 .multi_get_transaction_blocks(&tx_digests)
783 .expect("multi_get_transaction_blocks should not fail");
784 for (tx, digest) in txns.iter().zip(tx_digests.iter()) {
785 if tx.is_none() {
786 panic!("transaction {:?} not found", digest);
787 }
788 }
789
790 let txns: Vec<_> = txns
791 .into_iter()
792 .map(|tx| tx.unwrap())
793 .zip(fx_digests.into_iter())
794 .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
796 .map(|(tx, fx)| {
797 (
798 VerifiedExecutableTransaction::new_from_checkpoint(
799 (*tx).clone(),
800 epoch,
801 seq,
802 ),
803 fx,
804 )
805 })
806 .collect();
807
808 let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();
809
810 info!(
811 ?seq,
812 ?tx_digests,
813 "Re-executing transactions for locally built checkpoint"
814 );
815 state.enqueue_with_expected_effects_digest(txns, epoch_store);
818
819 let waiting_logger = tokio::task::spawn(async move {
823 let mut interval = tokio::time::interval(Duration::from_secs(1));
824 loop {
825 interval.tick().await;
826 warn!(?seq, "Still waiting for re-execution to complete");
827 }
828 });
829
830 cache
831 .notify_read_executed_effects_digests(&tx_digests)
832 .await
833 .expect("notify_read_executed_effects_digests should not fail");
834
835 waiting_logger.abort();
836 waiting_logger.await.ok();
837 info!(?seq, "Re-execution completed for locally built checkpoint");
838 }
839
840 info!("Re-execution of locally built checkpoints completed");
841 }
842}
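// A minimal usage sketch (not part of the original module): opening a
// `CheckpointStore` in a throwaway directory and reading its initial
// watermarks. It assumes RocksDB can be opened in a temp dir inside tests, as
// elsewhere in this codebase; the module and test names are hypothetical.
#[cfg(test)]
mod checkpoint_store_usage_example {
    use super::*;

    #[test]
    fn fresh_store_has_empty_watermarks() {
        let dir = tempfile::tempdir().unwrap();
        let store = CheckpointStore::new(dir.path());
        // No checkpoints have been certified or executed yet; the pruned
        // watermark defaults to 0.
        assert!(store.get_latest_certified_checkpoint().is_none());
        assert_eq!(
            store.get_highest_executed_checkpoint_seq_number().unwrap(),
            None
        );
        assert_eq!(store.get_highest_pruned_checkpoint_seq_number().unwrap(), 0);
    }
}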
843
844#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
845pub enum CheckpointWatermark {
846 HighestVerified,
847 HighestSynced,
848 HighestExecuted,
849 HighestPruned,
850}
851
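// Checkpoint creation pipeline:
// - `CheckpointBuilder` turns `PendingCheckpoint`s emitted by consensus into
//   `CheckpointSummary`/`CheckpointContents` pairs and persists them locally.
// - `CheckpointAggregator` collects `CheckpointSignatureMessage`s from
//   validators and certifies a checkpoint once a quorum has signed its digest.
// - `CheckpointSignatureAggregator` tracks signature aggregation for a single
//   sequence number and triggers split-brain diagnostics if quorum becomes
//   unreachable.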
852pub struct CheckpointBuilder {
853 state: Arc<AuthorityState>,
854 tables: Arc<CheckpointStore>,
855 epoch_store: Arc<AuthorityPerEpochStore>,
856 notify: Arc<Notify>,
857 notify_aggregator: Arc<Notify>,
858 effects_store: Arc<dyn TransactionCacheRead>,
859 accumulator: Weak<StateAccumulator>,
860 output: Box<dyn CheckpointOutput>,
861 metrics: Arc<CheckpointMetrics>,
862 max_transactions_per_checkpoint: usize,
863 max_checkpoint_size_bytes: usize,
864}
865
866pub struct CheckpointAggregator {
867 tables: Arc<CheckpointStore>,
868 epoch_store: Arc<AuthorityPerEpochStore>,
869 notify: Arc<Notify>,
870 current: Option<CheckpointSignatureAggregator>,
871 output: Box<dyn CertifiedCheckpointOutput>,
872 state: Arc<AuthorityState>,
873 metrics: Arc<CheckpointMetrics>,
874}
875
876pub struct CheckpointSignatureAggregator {
878 next_index: u64,
879 summary: CheckpointSummary,
880 digest: CheckpointDigest,
881 signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
883 tables: Arc<CheckpointStore>,
884 state: Arc<AuthorityState>,
885 metrics: Arc<CheckpointMetrics>,
886}
887
888impl CheckpointBuilder {
889 fn new(
890 state: Arc<AuthorityState>,
891 tables: Arc<CheckpointStore>,
892 epoch_store: Arc<AuthorityPerEpochStore>,
893 notify: Arc<Notify>,
894 effects_store: Arc<dyn TransactionCacheRead>,
895 accumulator: Weak<StateAccumulator>,
896 output: Box<dyn CheckpointOutput>,
897 notify_aggregator: Arc<Notify>,
898 metrics: Arc<CheckpointMetrics>,
899 max_transactions_per_checkpoint: usize,
900 max_checkpoint_size_bytes: usize,
901 ) -> Self {
902 Self {
903 state,
904 tables,
905 epoch_store,
906 notify,
907 effects_store,
908 accumulator,
909 output,
910 notify_aggregator,
911 metrics,
912 max_transactions_per_checkpoint,
913 max_checkpoint_size_bytes,
914 }
915 }
916
917 async fn run(mut self) {
920 info!("Starting CheckpointBuilder");
921 loop {
922 self.maybe_build_checkpoints().await;
923
924 self.notify.notified().await;
925 }
926 }
927
928 async fn maybe_build_checkpoints(&mut self) {
929 let _scope = monitored_scope("BuildCheckpoints");
930
931 let summary = self
933 .epoch_store
934 .last_built_checkpoint_builder_summary()
935 .expect("epoch should not have ended");
936 let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
937 let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
938
939 let min_checkpoint_interval_ms = self
940 .epoch_store
941 .protocol_config()
942 .min_checkpoint_interval_ms_as_option()
943 .unwrap_or_default();
944 let mut grouped_pending_checkpoints = Vec::new();
945 let mut checkpoints_iter = self
946 .epoch_store
947 .get_pending_checkpoints(last_height)
948 .expect("unexpected epoch store error")
949 .into_iter()
950 .peekable();
951 while let Some((height, pending)) = checkpoints_iter.next() {
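            // Group pending checkpoints until at least `min_checkpoint_interval_ms`
            // has elapsed since the last built checkpoint, but never hold back the
            // final checkpoint(s) of the epoch: if this or the next pending
            // checkpoint is `last_of_epoch`, build immediately.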
952 let current_timestamp = pending.details().timestamp_ms;
955 let can_build = match last_timestamp {
956 Some(last_timestamp) => {
957 current_timestamp >= last_timestamp + min_checkpoint_interval_ms
958 }
959 None => true,
960 } || checkpoints_iter
963 .peek()
964 .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
965 || pending.details().last_of_epoch;
967 grouped_pending_checkpoints.push(pending);
968 if !can_build {
969 debug!(
970 checkpoint_commit_height = height,
971 ?last_timestamp,
972 ?current_timestamp,
973 "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
974 );
975 continue;
976 }
977
978 last_height = Some(height);
980 last_timestamp = Some(current_timestamp);
981 debug!(
982 checkpoint_commit_height = height,
983 "Making checkpoint at commit height"
984 );
985 if let Err(e) = self
986 .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
987 .await
988 {
989 error!("Error while making checkpoint, will retry in 1s: {:?}", e);
990 tokio::time::sleep(Duration::from_secs(1)).await;
991 self.metrics.checkpoint_errors.inc();
992 return;
993 }
994 }
995 debug!(
996 "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
997 grouped_pending_checkpoints.len(),
998 );
999 }
1000
1001 #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1002 async fn make_checkpoint(&self, pendings: Vec<PendingCheckpoint>) -> anyhow::Result<()> {
1003 let last_details = pendings.last().unwrap().details().clone();
1004
1005 let mut effects_in_current_checkpoint = BTreeSet::new();
1011
1012 let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
1015 for pending_checkpoint in pendings.into_iter() {
1016 let pending = pending_checkpoint.into_v1();
1017 let txn_in_checkpoint = self
1018 .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
1019 .await?;
1020 sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
1021 }
1022 let new_checkpoint = self
1023 .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
1024 .await?;
1025 self.write_checkpoints(last_details.checkpoint_height, new_checkpoint)
1026 .await?;
1027 Ok(())
1028 }
1029
1030 #[instrument(level = "debug", skip_all)]
1035 async fn resolve_checkpoint_transactions(
1036 &self,
1037 roots: Vec<TransactionKey>,
1038 effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
1039 ) -> IotaResult<Vec<TransactionEffects>> {
1040 self.metrics
1041 .checkpoint_roots_count
1042 .inc_by(roots.len() as u64);
1043
1044 let root_digests = self
1045 .epoch_store
1046 .notify_read_executed_digests(&roots)
1047 .in_monitored_scope("CheckpointNotifyDigests")
1048 .await?;
1049 let root_effects = self
1050 .effects_store
1051 .notify_read_executed_effects(&root_digests)
1052 .in_monitored_scope("CheckpointNotifyRead")
1053 .await?;
1054
1055 let _scope = monitored_scope("CheckpointBuilder");
1056
1057 let consensus_commit_prologue = {
1058 let consensus_commit_prologue = self
1062 .extract_consensus_commit_prologue(&root_digests, &root_effects)
1063 .await?;
1064
1065 if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1069 let unsorted_ccp = self.complete_checkpoint_effects(
1070 vec![ccp_effects.clone()],
1071 effects_in_current_checkpoint,
1072 )?;
1073
1074 assert_eq!(unsorted_ccp.len(), 1);
1077 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1078 }
1079 consensus_commit_prologue
1080 };
1081
1082 let unsorted =
1083 self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;
1084
1085 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1086 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1087 if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
1088 #[cfg(debug_assertions)]
1089 {
1090 for tx in unsorted.iter() {
1093 assert!(tx.transaction_digest() != &_ccp_digest);
1094 }
1095 }
1096 sorted.push(ccp_effects);
1097 }
1098 sorted.extend(CausalOrder::causal_sort(unsorted));
1099
1100 #[cfg(msim)]
1101 {
1102 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1104 }
1105
1106 Ok(sorted)
1107 }
1108
1109 async fn extract_consensus_commit_prologue(
1114 &self,
1115 root_digests: &[TransactionDigest],
1116 root_effects: &[TransactionEffects],
1117 ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1118 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1119 if root_digests.is_empty() {
1120 return Ok(None);
1121 }
1122
1123 let first_tx = self
1128 .state
1129 .get_transaction_cache_reader()
1130 .get_transaction_block(&root_digests[0])?
1131 .expect("Transaction block must exist");
1132
1133 Ok(match first_tx.transaction_data().kind() {
1134 TransactionKind::ConsensusCommitPrologueV1(_) => {
1135 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1136 Some((*first_tx.digest(), root_effects[0].clone()))
1137 }
1138 _ => None,
1139 })
1140 }
1141
1142 #[instrument(level = "debug", skip_all)]
1144 async fn write_checkpoints(
1145 &self,
1146 height: CheckpointHeight,
1147 new_checkpoints: Vec<(CheckpointSummary, CheckpointContents)>,
1148 ) -> IotaResult {
1149 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1150 let mut batch = self.tables.checkpoint_content.batch();
1151 let mut all_tx_digests =
1152 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1153
1154 for (summary, contents) in &new_checkpoints {
1155 debug!(
1156 checkpoint_commit_height = height,
1157 checkpoint_seq = summary.sequence_number,
1158 contents_digest = ?contents.digest(),
1159 "writing checkpoint",
1160 );
1161 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1162
1163 self.output
1164 .checkpoint_created(summary, contents, &self.epoch_store, &self.tables)
1165 .await?;
1166
1167 self.metrics
1168 .transactions_included_in_checkpoint
1169 .inc_by(contents.size() as u64);
1170 let sequence_number = summary.sequence_number;
1171 self.metrics
1172 .last_constructed_checkpoint
1173 .set(sequence_number as i64);
1174
1175 batch.insert_batch(
1176 &self.tables.checkpoint_content,
1177 [(contents.digest(), contents)],
1178 )?;
1179
1180 batch.insert_batch(
1181 &self.tables.locally_computed_checkpoints,
1182 [(sequence_number, summary)],
1183 )?;
1184 }
1185
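        // Durably persist the transactions referenced by these checkpoints
        // before writing the checkpoint contents and summaries themselves, so a
        // locally built checkpoint never points at transactions that could be
        // missing after a restart.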
1186 self.state
1198 .get_cache_commit()
1199 .persist_transactions(&all_tx_digests)
1200 .await?;
1201
1202 batch.write()?;
1203
1204 for (local_checkpoint, _) in &new_checkpoints {
1205 if let Some(certified_checkpoint) = self
1206 .tables
1207 .certified_checkpoints
1208 .get(local_checkpoint.sequence_number())?
1209 {
1210 self.tables
1211 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1212 }
1213 }
1214
1215 self.notify_aggregator.notify_one();
1216 self.epoch_store
1217 .process_pending_checkpoint(height, new_checkpoints)?;
1218 Ok(())
1219 }
1220
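    // Splits the checkpoint transactions into chunks: a new chunk starts once a
    // chunk reaches `max_transactions_per_checkpoint` or adding the next
    // transaction would exceed `max_checkpoint_size_bytes`. A single oversized
    // transaction still becomes its own (oversized) chunk, and at least one
    // (possibly empty) chunk is always returned. See the standalone sketch
    // after this impl block for a worked illustration of the rule.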
1221 #[expect(clippy::type_complexity)]
1222 fn split_checkpoint_chunks(
1223 &self,
1224 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1225 signatures: Vec<Vec<GenericSignature>>,
1226 ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1227 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1228 let mut chunks = Vec::new();
1229 let mut chunk = Vec::new();
1230 let mut chunk_size: usize = 0;
1231 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1232 .into_iter()
1233 .zip(signatures.into_iter())
1234 {
1235 let size = transaction_size
1240 + bcs::serialized_size(&effects)?
1241 + bcs::serialized_size(&signatures)?;
1242 if chunk.len() == self.max_transactions_per_checkpoint
1243 || (chunk_size + size) > self.max_checkpoint_size_bytes
1244 {
1245 if chunk.is_empty() {
1246 warn!(
1248 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1249 self.max_checkpoint_size_bytes
1250 );
1251 } else {
1252 chunks.push(chunk);
1253 chunk = Vec::new();
1254 chunk_size = 0;
1255 }
1256 }
1257
1258 chunk.push((effects, signatures));
1259 chunk_size += size;
1260 }
1261
1262 if !chunk.is_empty() || chunks.is_empty() {
1263 chunks.push(chunk);
1268 }
1274 Ok(chunks)
1275 }
1276
1277 #[instrument(level = "debug", skip_all)]
1280 async fn create_checkpoints(
1281 &self,
1282 all_effects: Vec<TransactionEffects>,
1283 details: &PendingCheckpointInfo,
1284 ) -> anyhow::Result<Vec<(CheckpointSummary, CheckpointContents)>> {
1285 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1286 let total = all_effects.len();
1287 let mut last_checkpoint = self.epoch_store.last_built_checkpoint_summary()?;
1288 if last_checkpoint.is_none() {
1289 let epoch = self.epoch_store.epoch();
1290 if epoch > 0 {
1291 let previous_epoch = epoch - 1;
1292 let last_verified = self.tables.get_epoch_last_checkpoint(previous_epoch)?;
1293 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1294 if let Some((ref seq, _)) = last_checkpoint {
1295 debug!(
1296 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1297 );
1298 } else {
1299 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1302 }
1303 }
1304 }
1305 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1306 debug!(
1307 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1308 checkpoint_timestamp = details.timestamp_ms,
1309 "Creating checkpoint(s) for {} transactions",
1310 all_effects.len(),
1311 );
1312
1313 let all_digests: Vec<_> = all_effects
1314 .iter()
1315 .map(|effect| *effect.transaction_digest())
1316 .collect();
1317 let transactions_and_sizes = self
1318 .state
1319 .get_transaction_cache_reader()
1320 .get_transactions_and_serialized_sizes(&all_digests)?;
1321 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1322 let mut transactions = Vec::with_capacity(all_effects.len());
1323 let mut transaction_keys = Vec::with_capacity(all_effects.len());
1324 let mut randomness_rounds = BTreeMap::new();
1325 {
1326 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1327 debug!(
1328 ?last_checkpoint_seq,
1329 "Waiting for {:?} certificates to appear in consensus",
1330 all_effects.len()
1331 );
1332
1333 for (effects, transaction_and_size) in all_effects
1334 .into_iter()
1335 .zip(transactions_and_sizes.into_iter())
1336 {
1337 let (transaction, size) = transaction_and_size
1338 .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
1339 match transaction.inner().transaction_data().kind() {
1340 TransactionKind::ConsensusCommitPrologueV1(_)
1341 | TransactionKind::AuthenticatorStateUpdateV1(_) => {
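                        // These system transactions are created inside the
                        // validator rather than certified by users, so there is
                        // no consensus message to wait for before including
                        // them in the checkpoint.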
1342 }
1347 TransactionKind::RandomnessStateUpdate(rsu) => {
1348 randomness_rounds
1349 .insert(*effects.transaction_digest(), rsu.randomness_round);
1350 }
1351 _ => {
1352 transaction_keys.push(SequencedConsensusTransactionKey::External(
1355 ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
1356 ));
1357 }
1358 }
1359 transactions.push(transaction);
1360 all_effects_and_transaction_sizes.push((effects, size));
1361 }
1362
1363 self.epoch_store
1364 .consensus_messages_processed_notify(transaction_keys)
1365 .await?;
1366 }
1367
1368 let signatures = self
1369 .epoch_store
1370 .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1371 debug!(
1372 ?last_checkpoint_seq,
1373 "Received {} checkpoint user signatures from consensus",
1374 signatures.len()
1375 );
1376
1377 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1378 let chunks_count = chunks.len();
1379
1380 let mut checkpoints = Vec::with_capacity(chunks_count);
1381 debug!(
1382 ?last_checkpoint_seq,
1383 "Creating {} checkpoints with {} transactions", chunks_count, total,
1384 );
1385
1386 let epoch = self.epoch_store.epoch();
1387 for (index, transactions) in chunks.into_iter().enumerate() {
1388 let first_checkpoint_of_epoch = index == 0
1389 && last_checkpoint
1390 .as_ref()
1391 .map(|(_, c)| c.epoch != epoch)
1392 .unwrap_or(true);
1393 if first_checkpoint_of_epoch {
1394 self.epoch_store
1395 .record_epoch_first_checkpoint_creation_time_metric();
1396 }
1397 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1398
1399 let sequence_number = last_checkpoint
1400 .as_ref()
1401 .map(|(_, c)| c.sequence_number + 1)
1402 .unwrap_or_default();
1403 let timestamp_ms = details.timestamp_ms;
1404 if let Some((_, last_checkpoint)) = &last_checkpoint {
1405 if last_checkpoint.timestamp_ms > timestamp_ms {
1406 error!(
1407 "Unexpected decrease of checkpoint timestamp, sequence: {}, previous: {}, current: {}",
1408 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms
1409 );
1410 }
1411 }
1412
1413 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1414 let epoch_rolling_gas_cost_summary =
1415 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1416
1417 let end_of_epoch_data = if last_checkpoint_of_epoch {
1418 let (system_state_obj, system_epoch_info_event) = self
1419 .augment_epoch_last_checkpoint(
1420 &epoch_rolling_gas_cost_summary,
1421 timestamp_ms,
1422 &mut effects,
1423 &mut signatures,
1424 sequence_number,
1425 )
1426 .await?;
1427
1428 let epoch_supply_change =
1436 system_epoch_info_event.map_or(0, |event| event.supply_change());
1437
1438 let committee = system_state_obj
1439 .get_current_epoch_committee()
1440 .committee()
1441 .clone();
1442
1443 let root_state_digest = {
1446 let state_acc = self
1447 .accumulator
1448 .upgrade()
1449 .expect("No checkpoints should be getting built after local configuration");
1450 let acc = state_acc.accumulate_checkpoint(
1451 effects.clone(),
1452 sequence_number,
1453 &self.epoch_store,
1454 )?;
1455 state_acc
1456 .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
1457 .await?;
1458 state_acc
1459 .digest_epoch(self.epoch_store.clone(), sequence_number)
1460 .await?
1461 };
1462 self.metrics.highest_accumulated_epoch.set(epoch as i64);
1463 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1464
1465 let epoch_commitments = vec![root_state_digest.into()];
1466
1467 Some(EndOfEpochData {
1468 next_epoch_committee: committee.voting_rights,
1469 next_epoch_protocol_version: ProtocolVersion::new(
1470 system_state_obj.protocol_version(),
1471 ),
1472 epoch_commitments,
1473 epoch_supply_change,
1474 })
1475 } else {
1476 None
1477 };
1478 let contents = CheckpointContents::new_with_digests_and_signatures(
1479 effects.iter().map(TransactionEffects::execution_digests),
1480 signatures,
1481 );
1482
1483 let num_txns = contents.size() as u64;
1484
1485 let network_total_transactions = last_checkpoint
1486 .as_ref()
1487 .map(|(_, c)| c.network_total_transactions + num_txns)
1488 .unwrap_or(num_txns);
1489
1490 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1491
1492 let matching_randomness_rounds: Vec<_> = effects
1493 .iter()
1494 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1495 .copied()
1496 .collect();
1497
1498 let summary = CheckpointSummary::new(
1499 self.epoch_store.protocol_config(),
1500 epoch,
1501 sequence_number,
1502 network_total_transactions,
1503 &contents,
1504 previous_digest,
1505 epoch_rolling_gas_cost_summary,
1506 end_of_epoch_data,
1507 timestamp_ms,
1508 matching_randomness_rounds,
1509 );
1510 summary.report_checkpoint_age_ms(&self.metrics.last_created_checkpoint_age_ms);
1511 if last_checkpoint_of_epoch {
1512 info!(
1513 checkpoint_seq = sequence_number,
1514 "creating last checkpoint of epoch {}", epoch
1515 );
1516 if let Some(stats) = self.tables.get_epoch_stats(epoch, &summary) {
1517 self.epoch_store
1518 .report_epoch_metrics_at_last_checkpoint(stats);
1519 }
1520 }
1521 last_checkpoint = Some((sequence_number, summary.clone()));
1522 checkpoints.push((summary, contents));
1523 }
1524
1525 Ok(checkpoints)
1526 }
1527
1528 fn get_epoch_total_gas_cost(
1529 &self,
1530 last_checkpoint: Option<&CheckpointSummary>,
1531 cur_checkpoint_effects: &[TransactionEffects],
1532 ) -> GasCostSummary {
1533 let (previous_epoch, previous_gas_costs) = last_checkpoint
1534 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1535 .unwrap_or_default();
1536 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1537 if previous_epoch == self.epoch_store.epoch() {
1538 GasCostSummary::new(
1540 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1541 previous_gas_costs.computation_cost_burned
1542 + current_gas_costs.computation_cost_burned,
1543 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1544 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1545 previous_gas_costs.non_refundable_storage_fee
1546 + current_gas_costs.non_refundable_storage_fee,
1547 )
1548 } else {
1549 current_gas_costs
1550 }
1551 }
1552
1553 #[instrument(level = "error", skip_all)]
1556 async fn augment_epoch_last_checkpoint(
1557 &self,
1558 epoch_total_gas_cost: &GasCostSummary,
1559 epoch_start_timestamp_ms: CheckpointTimestamp,
1560 checkpoint_effects: &mut Vec<TransactionEffects>,
1561 signatures: &mut Vec<Vec<GenericSignature>>,
1562 checkpoint: CheckpointSequenceNumber,
1563 ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1564 let (system_state, system_epoch_info_event, effects) = self
1565 .state
1566 .create_and_execute_advance_epoch_tx(
1567 &self.epoch_store,
1568 epoch_total_gas_cost,
1569 checkpoint,
1570 epoch_start_timestamp_ms,
1571 )
1572 .await?;
1573 checkpoint_effects.push(effects);
1574 signatures.push(vec![]);
1575 Ok((system_state, system_epoch_info_event))
1576 }
1577
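    // Expands the given root effects into a causally complete set: any
    // dependency that was executed in the current epoch but has not yet been
    // included in a checkpoint (and is not already part of the one being built)
    // is pulled in transitively, so a checkpoint never references effects that
    // are missing from every checkpoint.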
1578 #[instrument(level = "debug", skip_all)]
1587 fn complete_checkpoint_effects(
1588 &self,
1589 mut roots: Vec<TransactionEffects>,
1590 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
1591 ) -> IotaResult<Vec<TransactionEffects>> {
1592 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
1593 let mut results = vec![];
1594 let mut seen = HashSet::new();
1595 loop {
1596 let mut pending = HashSet::new();
1597
1598 let transactions_included = self
1599 .epoch_store
1600 .builder_included_transactions_in_checkpoint(
1601 roots.iter().map(|e| e.transaction_digest()),
1602 )?;
1603
1604 for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
1605 let digest = effect.transaction_digest();
1606 seen.insert(*digest);
1609
1610 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
1612 continue;
1613 }
1614
1615 if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
1617 continue;
1618 }
1619
1620 let existing_effects = self
1621 .epoch_store
1622 .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;
1623
1624 for (dependency, effects_signature_exists) in
1625 effect.dependencies().iter().zip(existing_effects.iter())
1626 {
1627 if !effects_signature_exists {
1632 continue;
1633 }
1634 if seen.insert(*dependency) {
1635 pending.insert(*dependency);
1636 }
1637 }
1638 results.push(effect);
1639 }
1640 if pending.is_empty() {
1641 break;
1642 }
1643 let pending = pending.into_iter().collect::<Vec<_>>();
1644 let effects = self.effects_store.multi_get_executed_effects(&pending)?;
1645 let effects = effects
1646 .into_iter()
1647 .zip(pending)
1648 .map(|(opt, digest)| match opt {
1649 Some(x) => x,
                    None => panic!(
                        "Cannot find effects for transaction {:?}, however a transaction that depends on it was already executed",
                        digest
                    ),
1654 })
1655 .collect::<Vec<_>>();
1656 roots = effects;
1657 }
1658
1659 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
1660 Ok(results)
1661 }
1662
1663 #[cfg(msim)]
1666 fn expensive_consensus_commit_prologue_invariants_check(
1667 &self,
1668 root_digests: &[TransactionDigest],
1669 sorted: &[TransactionEffects],
1670 ) {
1671 let root_txs = self
1673 .state
1674 .get_transaction_cache_reader()
1675 .multi_get_transaction_blocks(root_digests)
1676 .unwrap();
1677 let ccps = root_txs
1678 .iter()
1679 .filter_map(|tx| {
1680 tx.as_ref().filter(|tx| {
1681 matches!(
1682 tx.transaction_data().kind(),
1683 TransactionKind::ConsensusCommitPrologueV1(_)
1684 )
1685 })
1686 })
1687 .collect::<Vec<_>>();
1688
1689 assert!(ccps.len() <= 1);
1692
1693 let txs = self
1695 .state
1696 .get_transaction_cache_reader()
1697 .multi_get_transaction_blocks(
1698 &sorted
1699 .iter()
1700 .map(|tx| *tx.transaction_digest())
1701 .collect::<Vec<_>>(),
1702 )
1703 .unwrap();
1704
1705 if ccps.is_empty() {
1706 for tx in txs.iter().flatten() {
1710 assert!(!matches!(
1711 tx.transaction_data().kind(),
1712 TransactionKind::ConsensusCommitPrologueV1(_)
1713 ));
1714 }
1715 } else {
1716 assert!(matches!(
1719 txs[0].as_ref().unwrap().transaction_data().kind(),
1720 TransactionKind::ConsensusCommitPrologueV1(_)
1721 ));
1722
1723 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1724
1725 for tx in txs.iter().skip(1).flatten() {
1726 assert!(!matches!(
1727 tx.transaction_data().kind(),
1728 TransactionKind::ConsensusCommitPrologueV1(_)
1729 ));
1730 }
1731 }
1732 }
1733}
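// A minimal, self-contained sketch of the chunking rule used by
// `CheckpointBuilder::split_checkpoint_chunks` above: a new chunk starts once a
// chunk holds `max_txns` entries or adding the next item would exceed
// `max_bytes`. The sizes below are hypothetical and only illustrate the rule;
// the real method also carries effects and signatures, and warns on oversized
// single transactions.
#[cfg(test)]
mod chunking_rule_example {
    #[test]
    fn chunk_boundaries() {
        let (max_txns, max_bytes) = (2usize, 100usize);
        let sizes = [40usize, 40, 40];
        let mut chunks: Vec<Vec<usize>> = Vec::new();
        let mut chunk: Vec<usize> = Vec::new();
        let mut chunk_size = 0usize;
        for size in sizes {
            // Close the current chunk when either limit would be violated.
            if chunk.len() == max_txns || chunk_size + size > max_bytes {
                if !chunk.is_empty() {
                    chunks.push(std::mem::take(&mut chunk));
                    chunk_size = 0;
                }
            }
            chunk.push(size);
            chunk_size += size;
        }
        // At least one (possibly empty) chunk is always emitted.
        if !chunk.is_empty() || chunks.is_empty() {
            chunks.push(chunk);
        }
        assert_eq!(chunks, vec![vec![40, 40], vec![40]]);
    }
}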
1734
1735impl CheckpointAggregator {
1736 fn new(
1737 tables: Arc<CheckpointStore>,
1738 epoch_store: Arc<AuthorityPerEpochStore>,
1739 notify: Arc<Notify>,
1740 output: Box<dyn CertifiedCheckpointOutput>,
1741 state: Arc<AuthorityState>,
1742 metrics: Arc<CheckpointMetrics>,
1743 ) -> Self {
1744 let current = None;
1745 Self {
1746 tables,
1747 epoch_store,
1748 notify,
1749 current,
1750 output,
1751 state,
1752 metrics,
1753 }
1754 }
1755
1756 async fn run(mut self) {
1762 info!("Starting CheckpointAggregator");
1763 loop {
1764 if let Err(e) = self.run_and_notify().await {
1765 error!(
1766 "Error while aggregating checkpoint, will retry in 1s: {:?}",
1767 e
1768 );
1769 self.metrics.checkpoint_errors.inc();
1770 tokio::time::sleep(Duration::from_secs(1)).await;
1771 continue;
1772 }
1773
1774 let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
1775 }
1776 }
1777
1778 async fn run_and_notify(&mut self) -> IotaResult {
1779 let summaries = self.run_inner()?;
1780 for summary in summaries {
1781 self.output.certified_checkpoint_created(&summary).await?;
1782 }
1783 Ok(())
1784 }
1785
1786 fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
1787 let _scope = monitored_scope("CheckpointAggregator");
1788 let mut result = vec![];
1789 'outer: loop {
1790 let next_to_certify = self.next_checkpoint_to_certify();
1791 let current = if let Some(current) = &mut self.current {
1792 if current.summary.sequence_number < next_to_certify {
1798 self.current = None;
1799 continue;
1800 }
1801 current
1802 } else {
1803 let Some(summary) = self
1804 .epoch_store
1805 .get_built_checkpoint_summary(next_to_certify)?
1806 else {
1807 return Ok(result);
1808 };
1809 self.current = Some(CheckpointSignatureAggregator {
1810 next_index: 0,
1811 digest: summary.digest(),
1812 summary,
1813 signatures_by_digest: MultiStakeAggregator::new(
1814 self.epoch_store.committee().clone(),
1815 ),
1816 tables: self.tables.clone(),
1817 state: self.state.clone(),
1818 metrics: self.metrics.clone(),
1819 });
1820 self.current.as_mut().unwrap()
1821 };
1822
1823 let epoch_tables = self
1824 .epoch_store
1825 .tables()
1826 .expect("should not run past end of epoch");
1827 let iter = epoch_tables.get_pending_checkpoint_signatures_iter(
1828 current.summary.sequence_number,
1829 current.next_index,
1830 )?;
1831 for ((seq, index), data) in iter {
1832 if seq != current.summary.sequence_number {
1833 debug!(
1834 checkpoint_seq =? current.summary.sequence_number,
1835 "Not enough checkpoint signatures",
1836 );
1837 return Ok(result);
1839 }
1840 debug!(
1841 checkpoint_seq = current.summary.sequence_number,
1842 "Processing signature for checkpoint (digest: {:?}) from {:?}",
1843 current.summary.digest(),
1844 data.summary.auth_sig().authority.concise()
1845 );
1846 self.metrics
1847 .checkpoint_participation
1848 .with_label_values(&[&format!(
1849 "{:?}",
1850 data.summary.auth_sig().authority.concise()
1851 )])
1852 .inc();
1853 if let Ok(auth_signature) = current.try_aggregate(data) {
1854 let summary = VerifiedCheckpoint::new_unchecked(
1855 CertifiedCheckpointSummary::new_from_data_and_sig(
1856 current.summary.clone(),
1857 auth_signature,
1858 ),
1859 );
1860
1861 self.tables.insert_certified_checkpoint(&summary)?;
1862 self.metrics
1863 .last_certified_checkpoint
1864 .set(current.summary.sequence_number as i64);
1865 current
1866 .summary
1867 .report_checkpoint_age_ms(&self.metrics.last_certified_checkpoint_age_ms);
1868 result.push(summary.into_inner());
1869 self.current = None;
1870 continue 'outer;
1871 } else {
1872 current.next_index = index + 1;
1873 }
1874 }
1875 break;
1876 }
1877 Ok(result)
1878 }
1879
1880 fn next_checkpoint_to_certify(&self) -> CheckpointSequenceNumber {
1881 self.tables
1882 .certified_checkpoints
1883 .unbounded_iter()
1884 .skip_to_last()
1885 .next()
1886 .map(|(seq, _)| seq + 1)
1887 .unwrap_or_default()
1888 }
1889}
1890
1891impl CheckpointSignatureAggregator {
1892 #[expect(clippy::result_unit_err)]
1893 pub fn try_aggregate(
1894 &mut self,
1895 data: CheckpointSignatureMessage,
1896 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
1897 let their_digest = *data.summary.digest();
1898 let (_, signature) = data.summary.into_data_and_sig();
1899 let author = signature.authority;
1900 let envelope =
1901 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
1902 match self.signatures_by_digest.insert(their_digest, envelope) {
1903 InsertResult::Failed {
1905 error:
1906 IotaError::StakeAggregatorRepeatedSigner {
1907 conflicting_sig: false,
1908 ..
1909 },
1910 } => Err(()),
1911 InsertResult::Failed { error } => {
1912 warn!(
1913 checkpoint_seq = self.summary.sequence_number,
1914 "Failed to aggregate new signature from validator {:?}: {:?}",
1915 author.concise(),
1916 error
1917 );
1918 self.check_for_split_brain();
1919 Err(())
1920 }
1921 InsertResult::QuorumReached(cert) => {
1922 if their_digest != self.digest {
1926 self.metrics.remote_checkpoint_forks.inc();
1927 warn!(
1928 checkpoint_seq = self.summary.sequence_number,
1929 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
1930 author.concise(),
1931 their_digest,
1932 self.digest
1933 );
1934 return Err(());
1935 }
1936 Ok(cert)
1937 }
1938 InsertResult::NotEnoughVotes {
1939 bad_votes: _,
1940 bad_authorities: _,
1941 } => {
1942 self.check_for_split_brain();
1943 Err(())
1944 }
1945 }
1946 }
1947
1948 fn check_for_split_brain(&self) {
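        // If a quorum can no longer be formed on any single digest, validators
        // have signed conflicting summaries for this sequence number. Log the
        // stake distribution per digest and spawn a background task that
        // fetches the conflicting checkpoints for offline diagnosis.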
1953 debug!(
1954 checkpoint_seq = self.summary.sequence_number,
1955 "Checking for split brain condition"
1956 );
1957 if self.signatures_by_digest.quorum_unreachable() {
1958 let digests_by_stake_messages = self
1964 .signatures_by_digest
1965 .get_all_unique_values()
1966 .into_iter()
1967 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
1968 .map(|(digest, (_authorities, total_stake))| {
1969 format!("{:?} (total stake: {})", digest, total_stake)
1970 })
1971 .collect::<Vec<String>>();
1972 error!(
1973 checkpoint_seq = self.summary.sequence_number,
1974 "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
1975 self.signatures_by_digest.uncommitted_stake(),
1976 digests_by_stake_messages,
1977 );
1978 self.metrics.split_brain_checkpoint_forks.inc();
1979
1980 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
1981 let local_summary = self.summary.clone();
1982 let state = self.state.clone();
1983 let tables = self.tables.clone();
1984
1985 tokio::spawn(async move {
1986 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
1987 });
1988 }
1989 }
1990}
1991
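// Given a detected split-brain condition, fetch the conflicting pending
// checkpoint (summary and contents) from one randomly chosen validator per
// divergent digest, diff it against the locally built checkpoint, and dump the
// diffs for forensic analysis.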
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = Utc::now();
    // Pick one validator at random for each digest that disagrees with ours.
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                let random_validator = validators.choose(&mut OsRng).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given split brain condition, there should be at \
            least one validator that disagrees with local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_iota_committee_with_network_metadata();
    let network_config = default_iota_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Query each selected validator for its pending checkpoint at this sequence
    // number, including contents.
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequest {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint(request)
        })
        .collect::<Vec<_>>();

    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponse {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponse {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // Produce textual diffs of summary, contents, transactions and effects
    // between the local checkpoint and each disagreeing proposal.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {time}",
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .into_path()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);

    fail_point!("split_brain_reached");
}

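/// Notification interface into the checkpoint service: reports newly received
/// checkpoint signatures and signals that new pending checkpoints are ready to
/// be built.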
pub trait CheckpointServiceNotify {
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    fn notify_checkpoint(&self) -> IotaResult;
}

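/// Drives checkpoint building and certification on a validator. Holds the
/// checkpoint store, the notification handles for the builder and aggregator
/// tasks, and the index of the last locally processed checkpoint signature.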
pub struct CheckpointService {
    tables: Arc<CheckpointStore>,
    notify_builder: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_signature_index: Mutex<u64>,
    metrics: Arc<CheckpointMetrics>,
}

impl CheckpointService {
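    /// Creates a `CheckpointBuilder` and a `CheckpointAggregator`, spawns both
    /// as monitored tasks on the returned `JoinSet`, and returns the service
    /// handle used to notify them.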
    pub fn spawn(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        accumulator: Weak<StateAccumulator>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> (Arc<Self>, JoinSet<()>) {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        let mut tasks = JoinSet::new();

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            accumulator,
            checkpoint_output,
            notify_aggregator.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );
        tasks.spawn(monitored_future!(builder.run()));

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );
        tasks.spawn(monitored_future!(aggregator.run()));

        let last_signature_index = epoch_store
            .get_last_checkpoint_signature_index()
            .expect("should not cross end of epoch");
        let last_signature_index = Mutex::new(last_signature_index);

        let service = Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            notify_aggregator,
            last_signature_index,
            metrics,
        });

        (service, tasks)
    }

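    /// Test helper that persists a pending checkpoint through a consensus
    /// commit batch and then notifies the builder, mimicking what the
    /// consensus handler does in production.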
    #[cfg(test)]
    fn write_and_notify_checkpoint_for_testing(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: PendingCheckpoint,
    ) -> IotaResult {
        use crate::authority::authority_per_epoch_store::ConsensusCommitOutput;

        let mut output = ConsensusCommitOutput::new();
        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
        let mut batch = epoch_store.db_batch_for_test();
        output.write_to_batch(epoch_store, &mut batch)?;
        batch.write()?;
        self.notify_checkpoint()?;
        Ok(())
    }
}

impl CheckpointServiceNotify for CheckpointService {
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult {
        let sequence = info.summary.sequence_number;
        let signer = info.summary.auth_sig().authority.concise();

        if let Some(highest_verified_checkpoint) = self
            .tables
            .get_highest_verified_checkpoint()?
            .map(|x| *x.sequence_number())
        {
            if sequence <= highest_verified_checkpoint {
                debug!(
                    checkpoint_seq = sequence,
                    "Ignore checkpoint signature from {} - already certified", signer,
                );
                self.metrics
                    .last_ignored_checkpoint_signature_received
                    .set(sequence as i64);
                return Ok(());
            }
        }
        debug!(
            checkpoint_seq = sequence,
            "Received checkpoint signature, digest {} from {}",
            info.summary.digest(),
            signer,
        );
        self.metrics
            .last_received_checkpoint_signatures
            .with_label_values(&[&signer.to_string()])
            .set(sequence as i64);
        // Hold the lock across the insert and the notification so that
        // signature indices stay unique and strictly increasing.
        let mut index = self.last_signature_index.lock();
        *index += 1;
        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
        self.notify_aggregator.notify_one();
        Ok(())
    }

    fn notify_checkpoint(&self) -> IotaResult {
        self.notify_builder.notify_one();
        Ok(())
    }
}

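/// A `CheckpointServiceNotify` implementation that drops every notification,
/// for contexts where checkpoint notifications are irrelevant (e.g. tests).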
pub struct CheckpointServiceNoop {}

impl CheckpointServiceNotify for CheckpointServiceNoop {
    fn notify_checkpoint_signature(
        &self,
        _: &AuthorityPerEpochStore,
        _: &CheckpointSignatureMessage,
    ) -> IotaResult {
        Ok(())
    }

    fn notify_checkpoint(&self) -> IotaResult {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, HashMap},
        ops::Deref,
    };

    use futures::{FutureExt as _, future::BoxFuture};
    use iota_macros::sim_test;
    use iota_protocol_config::{Chain, ProtocolConfig};
    use iota_types::{
        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
        crypto::Signature,
        digests::TransactionEventsDigest,
        effects::{TransactionEffects, TransactionEvents},
        messages_checkpoint::SignedCheckpointSummary,
        move_package::MovePackage,
        object,
        transaction::{GenesisObject, VerifiedTransaction},
    };
    use tokio::sync::mpsc;

    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;

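    /// Exercises the checkpoint builder and aggregator end to end: seeds a
    /// test authority with dummy transactions and effects, writes pending
    /// checkpoints, and checks how they are split by the per-checkpoint
    /// transaction limit, the size limit, and the minimum checkpoint interval
    /// before being signed and certified.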
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
        // A genesis transaction carrying a large package (a ~40KB module name),
        // used to exercise the checkpoint size limit.
        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
            vec![GenesisObject::RawObject {
                data: object::Data::Package(
                    MovePackage::new(
                        ObjectID::random(),
                        SequenceNumber::new(),
                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
                        100_000,
                        Vec::new(),
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        let (checkpoint_service, _tasks) = CheckpointService::spawn(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );

        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

        // Pending checkpoint 2 contains 4 transactions while the service is
        // configured with at most 3 per checkpoint, so it is split in two.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending checkpoint 3 holds three large transactions that together
        // exceed the 100_000-byte size limit, so it is also split in two.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending checkpoint 4 arrives only 1ms after the previous one, below the
        // 100ms minimum checkpoint interval, so it is merged with pending
        // checkpoint 5 into a single checkpoint.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

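    // In-memory effects store used as the builder's `effects_store` in the test
    // above; only the lookups exercised by the checkpoint builder are
    // implemented, the rest panic via `unimplemented!()`.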
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| {
                    self.get(d)
                        .map(|fx| fx.digest())
                        .expect("effects not found")
                })
                .collect()))
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
        }

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            unimplemented!()
        }

        fn multi_get_events(
            &self,
            _: &[TransactionEventsDigest],
        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
            unimplemented!()
        }
    }

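    // Test checkpoint output that forwards each (contents, summary) pair to an
    // mpsc channel so the test can inspect built checkpoints.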
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> IotaResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

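    // Test output that forwards each certified checkpoint summary to an mpsc
    // channel so the test can observe certification.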
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> IotaResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

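    /// Builds a `PendingCheckpoint` at height `i` whose roots are the digests
    /// `d(t)` for each element of `t`, with the given timestamp.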
    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
            roots: t
                .into_iter()
                .map(|t| TransactionKey::Digest(d(t)))
                .collect(),
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
            },
        })
    }

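    /// Deterministic test digest: a 32-byte digest whose first byte is `i`.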
    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

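    /// Builds test `TransactionEffects` with the given transaction digest,
    /// dependencies and gas cost summary.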
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

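    /// Registers test effects for `digest` in the in-memory effects store and
    /// records the key-to-digest mapping in the epoch store.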
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store
            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
            .expect("Inserting cert fx and sigs should not fail");
    }
}