1mod causal_order;
6pub mod checkpoint_executor;
7mod checkpoint_output;
8mod metrics;
9
10use std::{
11 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
12 fs::File,
13 io::Write,
14 path::Path,
15 sync::{Arc, Weak},
16 time::{Duration, SystemTime},
17};
18
19use diffy::create_patch;
20use iota_common::{debug_fatal, fatal, sync::notify_read::NotifyRead};
21use iota_macros::fail_point;
22use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
23use iota_network::default_iota_network_config;
24use iota_protocol_config::ProtocolVersion;
25use iota_types::{
26 base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
27 committee::StakeUnit,
28 crypto::AuthorityStrongQuorumSignInfo,
29 digests::{CheckpointContentsDigest, CheckpointDigest},
30 effects::{TransactionEffects, TransactionEffectsAPI},
31 error::{IotaError, IotaResult},
32 event::SystemEpochInfoEvent,
33 gas::GasCostSummary,
34 iota_system_state::{
35 IotaSystemState, IotaSystemStateTrait,
36 epoch_start_iota_system_state::EpochStartSystemStateTrait,
37 },
38 message_envelope::Message,
39 messages_checkpoint::{
40 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
41 CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
42 CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
43 FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
44 VerifiedCheckpointContents,
45 },
46 messages_consensus::ConsensusTransactionKey,
47 signature::GenericSignature,
48 transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
49};
50use itertools::Itertools;
51use nonempty::NonEmpty;
52use parking_lot::Mutex;
53use rand::{rngs::OsRng, seq::SliceRandom};
54use serde::{Deserialize, Serialize};
55use tokio::{
56 sync::{Notify, watch},
57 task::JoinSet,
58 time::timeout,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use typed_store::{
62 DBMapUtils, Map, TypedStoreError,
63 rocks::{DBMap, MetricConf},
64 traits::{TableSummary, TypedStoreDebug},
65};
66
67pub use crate::checkpoints::{
68 checkpoint_output::{
69 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
70 },
71 metrics::CheckpointMetrics,
72};
73use crate::{
74 authority::{
75 AuthorityState,
76 authority_per_epoch_store::{AuthorityPerEpochStore, scorer::MAX_SCORE},
77 },
78 authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
79 checkpoints::{
80 causal_order::CausalOrder,
81 checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
82 },
83 consensus_handler::SequencedConsensusTransactionKey,
84 execution_cache::TransactionCacheRead,
85 stake_aggregator::{InsertResult, MultiStakeAggregator},
86 state_accumulator::StateAccumulator,
87};
88
/// Consensus commit height used to key pending checkpoints.
pub type CheckpointHeight = u64;
90
/// Aggregate statistics for one epoch, derived from its last checkpoint
/// (see `CheckpointStore::get_epoch_stats`).
pub struct EpochStats {
    /// Number of checkpoints produced during the epoch.
    pub checkpoint_count: u64,
    /// Number of transactions executed during the epoch.
    pub transaction_count: u64,
    /// Epoch-cumulative computation cost from the rolling gas cost summary.
    pub total_gas_reward: u64,
}
96
/// Metadata attached to a pending checkpoint when it is created.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
    /// Timestamp (ms) that will become the checkpoint's timestamp.
    pub timestamp_ms: CheckpointTimestamp,
    /// True if this is the final checkpoint of the epoch.
    pub last_of_epoch: bool,
    /// Commit height at which this pending checkpoint was formed.
    pub checkpoint_height: CheckpointHeight,
}
103
/// A checkpoint agreed upon by consensus but not yet built.
/// Versioned for serialization compatibility; V1 is currently the only
/// variant.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PendingCheckpoint {
    V1(PendingCheckpointContentsV1),
}
109
/// Payload of [`PendingCheckpoint::V1`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointContentsV1 {
    /// Root transactions to include; their dependencies are resolved when the
    /// checkpoint is built.
    pub roots: Vec<TransactionKey>,
    /// Creation metadata (timestamp, height, end-of-epoch flag).
    pub details: PendingCheckpointInfo,
}
115
116impl PendingCheckpoint {
117 pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
118 match self {
119 PendingCheckpoint::V1(contents) => contents,
120 }
121 }
122
123 pub fn into_v1(self) -> PendingCheckpointContentsV1 {
124 match self {
125 PendingCheckpoint::V1(contents) => contents,
126 }
127 }
128
129 pub fn roots(&self) -> &Vec<TransactionKey> {
130 &self.as_v1().roots
131 }
132
133 pub fn details(&self) -> &PendingCheckpointInfo {
134 &self.as_v1().details
135 }
136
137 pub fn height(&self) -> CheckpointHeight {
138 self.details().checkpoint_height
139 }
140}
141
/// A checkpoint summary as recorded by the checkpoint builder, plus where in
/// the commit it was produced.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    /// Commit height this checkpoint was built at.
    /// NOTE(review): confirm under what circumstances this is `None`
    /// (presumably the genesis checkpoint, which is not built from a commit).
    pub checkpoint_height: Option<CheckpointHeight>,
    /// Index of this checkpoint among those built from the same commit.
    pub position_in_commit: usize,
}
149
/// RocksDB column families backing [`CheckpointStore`].
#[derive(DBMapUtils)]
pub struct CheckpointStoreTables {
    /// Checkpoint contents keyed by their digest.
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Contents digest -> sequence number of the checkpoint carrying them.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Full (transaction-bearing) contents keyed by sequence number.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Certified checkpoints keyed by sequence number.
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Certified checkpoints keyed by their digest.
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Summaries this validator built locally, used for fork detection
    /// against certified checkpoints.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// Epoch id -> sequence number of that epoch's final checkpoint.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Pipeline progress watermarks (see [`CheckpointWatermark`]), each
    /// storing the (sequence number, digest) it has reached.
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}
185
186impl CheckpointStoreTables {
187 pub fn new(path: &Path, metric_name: &'static str) -> Self {
188 Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
189 }
190 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
191 Self::get_read_only_handle(
192 path.to_path_buf(),
193 None,
194 None,
195 MetricConf::new("checkpoint_readonly"),
196 )
197 }
198}
199
/// Persistent store for checkpoints, plus in-memory notification registries
/// for tasks awaiting a checkpoint to become synced or executed.
pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    // Wakes waiters when the highest-synced watermark reaches their sequence.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    // Wakes waiters when the highest-executed watermark reaches their sequence.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
205
206impl CheckpointStore {
207 pub fn new(path: &Path) -> Arc<Self> {
208 let tables = CheckpointStoreTables::new(path, "checkpoint");
209 Arc::new(Self {
210 tables,
211 synced_checkpoint_notify_read: NotifyRead::new(),
212 executed_checkpoint_notify_read: NotifyRead::new(),
213 })
214 }
215
216 pub fn new_for_tests() -> Arc<Self> {
217 let ckpt_dir = tempfile::tempdir().unwrap();
218 CheckpointStore::new(ckpt_dir.path())
219 }
220
221 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
222 let tables = CheckpointStoreTables::new(path, "db_checkpoint");
223 Arc::new(Self {
224 tables,
225 synced_checkpoint_notify_read: NotifyRead::new(),
226 executed_checkpoint_notify_read: NotifyRead::new(),
227 })
228 }
229
    /// Opens a read-only handle to the checkpoint tables at `path`.
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }
233
    /// Inserts the genesis checkpoint (epoch 0, sequence 0) together with its
    /// contents, and advances the highest-synced watermark to it.
    ///
    /// Idempotent: does nothing if the checkpoint digest is already stored.
    ///
    /// # Panics
    /// Panics if `checkpoint` is not epoch 0 / sequence 0, or on any
    /// underlying store error (`unwrap`s).
    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // First time seeing the genesis checkpoint: record it everywhere.
        if self
            .get_checkpoint_by_digest(checkpoint.digest())
            .unwrap()
            .is_none()
        {
            // Seed the checkpoint builder only when this validator is still in
            // the genesis epoch; otherwise builder state for epoch 0 is moot.
            if epoch_store.epoch() == checkpoint.epoch {
                epoch_store
                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                    .unwrap();
            } else {
                debug!(
                    validator_epoch =% epoch_store.epoch(),
                    genesis_epoch =% checkpoint.epoch(),
                    "Not inserting checkpoint builder data for genesis checkpoint",
                );
            }
            self.insert_checkpoint_contents(contents).unwrap();
            self.insert_verified_checkpoint(&checkpoint).unwrap();
            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
        }
    }
275
276 pub fn get_checkpoint_by_digest(
277 &self,
278 digest: &CheckpointDigest,
279 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
280 self.tables
281 .checkpoint_by_digest
282 .get(digest)
283 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
284 }
285
286 pub fn get_checkpoint_by_sequence_number(
287 &self,
288 sequence_number: CheckpointSequenceNumber,
289 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
290 self.tables
291 .certified_checkpoints
292 .get(&sequence_number)
293 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
294 }
295
    /// Returns the summary this validator built locally for
    /// `sequence_number`, if one was recorded (it may differ from the
    /// certified checkpoint only in the case of a fork).
    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }
304
    /// Returns the sequence number of the checkpoint whose full contents have
    /// the given digest (populated by `insert_verified_checkpoint_contents`).
    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }
317
    /// Removes the contents-digest -> sequence-number mapping for `digest`.
    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }
326
327 pub fn get_latest_certified_checkpoint(
328 &self,
329 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
330 Ok(self
331 .tables
332 .certified_checkpoints
333 .reversed_safe_iter_with_bounds(None, None)?
334 .next()
335 .transpose()?
336 .map(|(_, v)| v.into()))
337 }
338
339 pub fn get_latest_locally_computed_checkpoint(
340 &self,
341 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
342 Ok(self
343 .tables
344 .locally_computed_checkpoints
345 .reversed_safe_iter_with_bounds(None, None)?
346 .next()
347 .transpose()?
348 .map(|(_, v)| v))
349 }
350
351 pub fn multi_get_checkpoint_by_sequence_number(
352 &self,
353 sequence_numbers: &[CheckpointSequenceNumber],
354 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
355 let checkpoints = self
356 .tables
357 .certified_checkpoints
358 .multi_get(sequence_numbers)?
359 .into_iter()
360 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
361 .collect();
362
363 Ok(checkpoints)
364 }
365
    /// Batch lookup of checkpoint contents; one entry per requested digest,
    /// `None` where missing.
    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }
372
373 pub fn get_highest_verified_checkpoint(
374 &self,
375 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
376 let highest_verified = if let Some(highest_verified) = self
377 .tables
378 .watermarks
379 .get(&CheckpointWatermark::HighestVerified)?
380 {
381 highest_verified
382 } else {
383 return Ok(None);
384 };
385 self.get_checkpoint_by_digest(&highest_verified.1)
386 }
387
388 pub fn get_highest_synced_checkpoint(
389 &self,
390 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
391 let highest_synced = if let Some(highest_synced) = self
392 .tables
393 .watermarks
394 .get(&CheckpointWatermark::HighestSynced)?
395 {
396 highest_synced
397 } else {
398 return Ok(None);
399 };
400 self.get_checkpoint_by_digest(&highest_synced.1)
401 }
402
403 pub fn get_highest_synced_checkpoint_seq_number(
404 &self,
405 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
406 if let Some(highest_synced) = self
407 .tables
408 .watermarks
409 .get(&CheckpointWatermark::HighestSynced)?
410 {
411 Ok(Some(highest_synced.0))
412 } else {
413 Ok(None)
414 }
415 }
416
417 pub fn get_highest_executed_checkpoint_seq_number(
418 &self,
419 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
420 if let Some(highest_executed) = self
421 .tables
422 .watermarks
423 .get(&CheckpointWatermark::HighestExecuted)?
424 {
425 Ok(Some(highest_executed.0))
426 } else {
427 Ok(None)
428 }
429 }
430
431 pub fn get_highest_executed_checkpoint(
432 &self,
433 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
434 let highest_executed = if let Some(highest_executed) = self
435 .tables
436 .watermarks
437 .get(&CheckpointWatermark::HighestExecuted)?
438 {
439 highest_executed
440 } else {
441 return Ok(None);
442 };
443 self.get_checkpoint_by_digest(&highest_executed.1)
444 }
445
    /// Returns the sequence number at the `HighestPruned` watermark, if set.
    pub fn get_highest_pruned_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .watermarks
            .get(&CheckpointWatermark::HighestPruned)
            .map(|watermark| watermark.map(|w| w.0))
    }
454
    /// Returns the checkpoint contents stored under `digest`, if any.
    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }
461
    /// Returns the full (transaction-bearing) contents for checkpoint `seq`,
    /// if still stored (see `delete_full_checkpoint_contents`).
    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content.get(&seq)
    }
468
    /// Prunes old locally computed summaries, retaining (at least) the most
    /// recent one.
    fn prune_local_summaries(&self) -> IotaResult {
        // Find the highest locally computed sequence number, if any.
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            // Delete everything from 0 up to the latest summary.
            // NOTE(review): assumes `schedule_delete_range` excludes the end
            // bound so the latest summary itself survives — confirm against
            // typed_store semantics.
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }
488
    /// Compares a locally built summary against the certified checkpoint for
    /// the same sequence number; if they differ, logs the full divergence and
    /// aborts the process (`fatal!`) — a local fork is unrecoverable here.
    #[instrument(level = "trace", skip_all)]
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            // Best-effort rendering of both sides' contents for the log; any
            // lookup failure is folded into the message rather than erroring.
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // Emit the full structured record before aborting.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );
            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }
550
    /// Atomically records a certified checkpoint in both the by-sequence and
    /// by-digest tables; if it carries a next-epoch committee (i.e. it is the
    /// last checkpoint of its epoch), also records it in
    /// `epoch_last_checkpoint_map`.
    ///
    /// After the write, cross-checks any locally computed summary for the
    /// same sequence number; divergence aborts the process (fork detection).
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        // Fork check runs after the certified checkpoint is durably stored.
        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }
592
593 #[instrument(level = "trace", skip_all)]
596 pub fn insert_verified_checkpoint(
597 &self,
598 checkpoint: &VerifiedCheckpoint,
599 ) -> Result<(), TypedStoreError> {
600 self.insert_certified_checkpoint(checkpoint)?;
601 self.update_highest_verified_checkpoint(checkpoint)
602 }
603
604 pub fn update_highest_verified_checkpoint(
605 &self,
606 checkpoint: &VerifiedCheckpoint,
607 ) -> Result<(), TypedStoreError> {
608 if Some(*checkpoint.sequence_number())
609 > self
610 .get_highest_verified_checkpoint()?
611 .map(|x| *x.sequence_number())
612 {
613 debug!(
614 checkpoint_seq = checkpoint.sequence_number(),
615 "Updating highest verified checkpoint",
616 );
617 self.tables.watermarks.insert(
618 &CheckpointWatermark::HighestVerified,
619 &(*checkpoint.sequence_number(), *checkpoint.digest()),
620 )?;
621 }
622
623 Ok(())
624 }
625
    /// Unconditionally sets the `HighestSynced` watermark to `checkpoint`
    /// and wakes any `notify_read_synced_checkpoint` waiters for that
    /// sequence number. (Unlike the verified watermark, no monotonicity check
    /// is performed here.)
    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(seq, *checkpoint.digest()),
        )?;
        // Notify only after the watermark is durably written.
        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
        Ok(())
    }
639
    /// Resolves to the checkpoint at `seq` once the watermark supplied by
    /// `get_watermark` has reached `seq`; until then, parks on `notify_read`
    /// waiting to be notified for that sequence number.
    ///
    /// # Panics
    /// Panics on DB errors, or if the watermark covers `seq` but the
    /// checkpoint row is missing (a store invariant violation).
    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        type ReadResult = Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError>;

        notify_read
            .read(&[seq], |seqs| {
                let seq = seqs[0];
                // Returning None keeps the caller registered for a wakeup.
                let Some(highest) = get_watermark() else {
                    return Ok(vec![None]) as ReadResult;
                };
                if highest < seq {
                    return Ok(vec![None]) as ReadResult;
                }
                // The watermark covers `seq`, so the row must exist.
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                Ok(vec![Some(checkpoint)]) as ReadResult
            })
            .await
            .unwrap()
            .into_iter()
            .next()
            .unwrap()
    }
672
    /// Waits until checkpoint `seq` is covered by the `HighestSynced`
    /// watermark and returns it.
    pub async fn notify_read_synced_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
            self.get_highest_synced_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }
683
    /// Waits until checkpoint `seq` is covered by the `HighestExecuted`
    /// watermark and returns it.
    pub async fn notify_read_executed_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
            self.get_highest_executed_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }
694
    /// Advances the `HighestExecuted` watermark to `checkpoint` and wakes
    /// waiters. No-ops if `checkpoint` is at or below the current watermark.
    ///
    /// # Panics
    /// Asserts that the watermark advances exactly one sequence number at a
    /// time — execution must not skip checkpoints.
    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        // Notify only after the watermark is durably written.
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }
721
    /// Unconditionally sets the `HighestPruned` watermark to `checkpoint`.
    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }
731
    /// Directly overwrites the `HighestExecuted` watermark, bypassing the
    /// contiguity assertion and waiter notification that
    /// `update_highest_executed_checkpoint` performs — hence "subtle".
    /// Use with care.
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }
746
747 pub fn insert_checkpoint_contents(
748 &self,
749 contents: CheckpointContents,
750 ) -> Result<(), TypedStoreError> {
751 debug!(
752 checkpoint_seq = ?contents.digest(),
753 "Inserting checkpoint contents",
754 );
755 self.tables
756 .checkpoint_content
757 .insert(contents.digest(), &contents)
758 }
759
    /// Atomically stores the full contents of a checkpoint, populating three
    /// tables in one batch: the contents-digest -> sequence-number index, the
    /// full (transaction-bearing) contents, and the light contents table.
    ///
    /// # Panics
    /// Asserts that the derived contents digest matches
    /// `checkpoint.content_digest`.
    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        // Derive the light contents and verify it matches the summary.
        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        // Single atomic write of all three tables.
        batch.write()
    }
792
    /// Removes the full contents for checkpoint `seq`; the light
    /// `checkpoint_content` entry is left untouched.
    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)
    }
799
800 pub fn get_epoch_last_checkpoint(
801 &self,
802 epoch_id: EpochId,
803 ) -> IotaResult<Option<VerifiedCheckpoint>> {
804 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
805 let checkpoint = match seq {
806 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
807 None => None,
808 };
809 Ok(checkpoint)
810 }
811
    /// Records `checkpoint` as the final checkpoint of `epoch_id`.
    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> IotaResult {
        self.tables
            .epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }
822
    /// Returns the state commitments recorded in the epoch's final
    /// checkpoint, or `None` if that checkpoint is not stored.
    ///
    /// # Panics
    /// Panics if the epoch's last checkpoint exists but carries no
    /// `EndOfEpochData`.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }
837
    /// Computes per-epoch statistics from `last_checkpoint`, the epoch's
    /// final checkpoint summary.
    ///
    /// Counts are deltas against the previous epoch's cumulative totals: for
    /// epoch 0 the baseline is zero, otherwise it comes from the previous
    /// epoch's final checkpoint. Returns `None` if that baseline checkpoint
    /// cannot be loaded.
    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }
864
    /// Takes a RocksDB checkpoint (backup snapshot) of the store into `path`.
    /// NOTE(review): invoked through the `checkpoint_content` table handle;
    /// presumably this snapshots the entire database rather than one column
    /// family — confirm typed_store `checkpoint_db` semantics.
    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
        self.tables
            .checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }
872
    /// Test-only: clears the `HighestExecuted` watermark so execution state
    /// can be rebuilt.
    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
        let mut wb = self.tables.watermarks.batch();
        wb.delete_batch(
            &self.tables.watermarks,
            std::iter::once(CheckpointWatermark::HighestExecuted),
        )?;
        wb.write()?;
        Ok(())
    }
882
    /// Resets execution progress so checkpoints re-execute from genesis:
    /// deletes the executed watermark, then flushes the watermarks column so
    /// the deletion is persisted immediately.
    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
        self.delete_highest_executed_checkpoint_test_only()?;
        self.tables.watermarks.rocksdb.flush()?;
        Ok(())
    }
888}
889
/// Keys of the `watermarks` table; each maps to the (sequence number, digest)
/// pair that the corresponding pipeline stage has reached.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
}
897
/// Builds checkpoint summaries and contents from the pending checkpoints the
/// epoch store accumulates, and hands them to `output`.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Woken when new pending checkpoints may be available (see `run`).
    notify: Arc<Notify>,
    // NOTE(review): presumably signals the aggregator after new checkpoints
    // are written — the signaling site is outside this view; confirm.
    notify_aggregator: Arc<Notify>,
    // Broadcasts the sequence number of the last locally built checkpoint.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    accumulator: Weak<StateAccumulator>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    // Size limits applied when splitting effects into checkpoints.
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
912
/// Collects validator signatures over built checkpoint summaries and emits
/// certified checkpoints through `output`.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Woken when new signatures or summaries may be available.
    notify: Arc<Notify>,
    // In-progress signature collection for the checkpoint being aggregated.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
922
/// Signature-collection state for a single checkpoint summary.
pub struct CheckpointSignatureAggregator {
    // NOTE(review): presumably the index of the next signature message to
    // consume from the epoch store — the consuming code is outside this view.
    next_index: u64,
    summary: CheckpointSummary,
    digest: CheckpointDigest,
    // Stake-weighted signatures, grouped by the digest each validator signed
    // (validators on a fork sign a different digest).
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
934
935impl CheckpointBuilder {
    /// Creates a builder; all collaborators are injected by the caller and
    /// stored as given.
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        accumulator: Weak<StateAccumulator>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            accumulator,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }
965
    /// Main loop: build any ready checkpoints immediately, then park until
    /// `notify` signals more pending checkpoints may be available.
    async fn run(mut self) {
        info!("Starting CheckpointBuilder");
        loop {
            // Build first so work queued before startup is not missed.
            self.maybe_build_checkpoints().await;

            self.notify.notified().await;
        }
    }
976
    /// Drains pending checkpoints from the epoch store and builds checkpoints
    /// from them, batching consecutive pendings until the configured minimum
    /// checkpoint interval has elapsed. End-of-epoch pendings (current or
    /// next) are never delayed.
    async fn maybe_build_checkpoints(&mut self) {
        let _scope = monitored_scope("BuildCheckpoints");

        // Resume from the most recently built checkpoint, if any.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            // Build once the min interval has elapsed, or when this (or the
            // next) pending is the last of the epoch.
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                Some(last_timestamp) => {
                    current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                }
                None => true,
            } || checkpoints_iter
                .peek()
                .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                || pending.details().last_of_epoch;
            // Accumulate the pending either way; it is built now or batched
            // into a later checkpoint.
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height = height,
                "Making checkpoint at commit height"
            );

            match self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await
            {
                Ok(seq) => {
                    // Publish the newly built sequence number to watchers.
                    self.last_built.send_if_modified(|cur| {
                        if seq > *cur {
                            *cur = seq;
                            true
                        } else {
                            false
                        }
                    });
                }
                Err(e) => {
                    // Bail out of the loop; `run` will retry on next wakeup.
                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    return;
                }
            }
            // Yield so this loop cannot starve the runtime.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );
    }
1066
    /// Builds one or more checkpoints out of the batched pending checkpoints,
    /// writes them, and returns the highest sequence number produced.
    ///
    /// # Panics
    /// Panics if `pendings` is empty (`last().unwrap()`).
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &self,
        pendings: Vec<PendingCheckpoint>,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        let last_details = pendings.last().unwrap().details().clone();

        // Effects already claimed by this batch, so that shared dependencies
        // are not included more than once across the pendings.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        // Resolve each pending's roots to an ordered list of effects.
        let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
        for pending_checkpoint in pendings.into_iter() {
            let pending = pending_checkpoint.into_v1();
            let txn_in_checkpoint = self
                .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
                .await?;
            sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
        }
        let new_checkpoints = self
            .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        Ok(highest_sequence)
    }
1099
    /// Resolves checkpoint roots into a causally ordered list of effects.
    ///
    /// Waits for every root to execute, pulls in not-yet-included
    /// dependencies via `complete_checkpoint_effects`, and causally sorts the
    /// result. If the first root is a consensus commit prologue transaction,
    /// its effects are pinned to the front of the returned order.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        roots: Vec<TransactionKey>,
        effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        self.metrics
            .checkpoint_roots_count
            .inc_by(roots.len() as u64);

        // Wait for all roots to be executed and fetch their effects.
        let root_digests = self
            .epoch_store
            .notify_read_executed_digests(&roots)
            .in_monitored_scope("CheckpointNotifyDigests")
            .await?;
        let root_effects = self
            .effects_store
            .try_notify_read_executed_effects(&root_digests)
            .in_monitored_scope("CheckpointNotifyRead")
            .await?;

        let _scope = monitored_scope("CheckpointBuilder");

        let consensus_commit_prologue = {
            // The prologue, if any, is found at the first root (see
            // `extract_consensus_commit_prologue`).
            let consensus_commit_prologue = self
                .extract_consensus_commit_prologue(&root_digests, &root_effects)
                .await?;

            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                // Sanity check: completing the prologue's effects must yield
                // exactly the prologue itself — it must have no unseen
                // dependencies.
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    effects_in_current_checkpoint,
                )?;

                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }
            consensus_commit_prologue
        };

        let unsorted =
            self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;

        let _scope = monitored_scope("CheckpointBuilder::causal_sort");
        let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
        if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
            #[cfg(debug_assertions)]
            {
                // The prologue must not also appear in the remaining set.
                for tx in unsorted.iter() {
                    assert!(tx.transaction_digest() != &_ccp_digest);
                }
            }
            // Pin the prologue ahead of the causally sorted remainder.
            sorted.push(ccp_effects);
        }
        sorted.extend(CausalOrder::causal_sort(unsorted));

        // Extra invariant checking under the simulator only.
        #[cfg(msim)]
        {
            self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
        }

        Ok(sorted)
    }
1187
1188 async fn extract_consensus_commit_prologue(
1193 &self,
1194 root_digests: &[TransactionDigest],
1195 root_effects: &[TransactionEffects],
1196 ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1197 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1198 if root_digests.is_empty() {
1199 return Ok(None);
1200 }
1201
1202 let first_tx = self
1207 .state
1208 .get_transaction_cache_reader()
1209 .try_get_transaction_block(&root_digests[0])?
1210 .expect("Transaction block must exist");
1211
1212 Ok(match first_tx.transaction_data().kind() {
1213 TransactionKind::ConsensusCommitPrologueV1(_) => {
1214 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1215 Some((*first_tx.digest(), root_effects[0].clone()))
1216 }
1217 _ => None,
1218 })
1219 }
1220
1221 #[instrument(level = "debug", skip_all)]
1223 async fn write_checkpoints(
1224 &self,
1225 height: CheckpointHeight,
1226 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1227 ) -> IotaResult {
1228 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1229 let mut batch = self.store.tables.checkpoint_content.batch();
1230 let mut all_tx_digests =
1231 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1232
1233 for (summary, contents) in &new_checkpoints {
1234 debug!(
1235 checkpoint_commit_height = height,
1236 checkpoint_seq = summary.sequence_number,
1237 contents_digest = ?contents.digest(),
1238 "writing checkpoint",
1239 );
1240
1241 if let Some(previously_computed_summary) = self
1242 .store
1243 .tables
1244 .locally_computed_checkpoints
1245 .get(&summary.sequence_number)?
1246 {
1247 if previously_computed_summary != *summary {
1248 fatal!(
1250 "Checkpoint {} was previously built with a different result: {previously_computed_summary:?} vs {summary:?}",
1251 summary.sequence_number,
1252 );
1253 }
1254 }
1255
1256 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1257
1258 self.metrics
1259 .transactions_included_in_checkpoint
1260 .inc_by(contents.size() as u64);
1261 let sequence_number = summary.sequence_number;
1262 self.metrics
1263 .last_constructed_checkpoint
1264 .set(sequence_number as i64);
1265
1266 batch.insert_batch(
1267 &self.store.tables.checkpoint_content,
1268 [(contents.digest(), contents)],
1269 )?;
1270
1271 batch.insert_batch(
1272 &self.store.tables.locally_computed_checkpoints,
1273 [(sequence_number, summary)],
1274 )?;
1275 }
1276
1277 batch.write()?;
1278
1279 for (summary, contents) in &new_checkpoints {
1282 self.output
1283 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1284 .await?;
1285 }
1286
1287 for (local_checkpoint, _) in &new_checkpoints {
1288 if let Some(certified_checkpoint) = self
1289 .store
1290 .tables
1291 .certified_checkpoints
1292 .get(local_checkpoint.sequence_number())?
1293 {
1294 self.store
1295 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1296 }
1297 }
1298
1299 self.notify_aggregator.notify_one();
1300 self.epoch_store
1301 .process_constructed_checkpoint(height, new_checkpoints);
1302 Ok(())
1303 }
1304
1305 #[expect(clippy::type_complexity)]
1306 fn split_checkpoint_chunks(
1307 &self,
1308 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1309 signatures: Vec<Vec<GenericSignature>>,
1310 ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1311 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1312 let mut chunks = Vec::new();
1313 let mut chunk = Vec::new();
1314 let mut chunk_size: usize = 0;
1315 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1316 .into_iter()
1317 .zip(signatures.into_iter())
1318 {
1319 let size = transaction_size
1324 + bcs::serialized_size(&effects)?
1325 + bcs::serialized_size(&signatures)?;
1326 if chunk.len() == self.max_transactions_per_checkpoint
1327 || (chunk_size + size) > self.max_checkpoint_size_bytes
1328 {
1329 if chunk.is_empty() {
1330 warn!(
1332 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1333 self.max_checkpoint_size_bytes
1334 );
1335 } else {
1336 chunks.push(chunk);
1337 chunk = Vec::new();
1338 chunk_size = 0;
1339 }
1340 }
1341
1342 chunk.push((effects, signatures));
1343 chunk_size += size;
1344 }
1345
1346 if !chunk.is_empty() || chunks.is_empty() {
1347 chunks.push(chunk);
1352 }
1358 Ok(chunks)
1359 }
1360
1361 fn load_last_built_checkpoint_summary(
1364 epoch_store: &AuthorityPerEpochStore,
1365 store: &CheckpointStore,
1366 ) -> IotaResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1367 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1368 if last_checkpoint.is_none() {
1369 let epoch = epoch_store.epoch();
1370 if epoch > 0 {
1371 let previous_epoch = epoch - 1;
1372 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1373 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1374 if let Some((ref seq, _)) = last_checkpoint {
1375 debug!(
1376 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1377 );
1378 } else {
1379 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1382 }
1383 }
1384 }
1385 Ok(last_checkpoint)
1386 }
1387
    /// Builds one or more `(CheckpointSummary, CheckpointContents)` pairs from
    /// the causally ordered `all_effects`, splitting into multiple checkpoints
    /// when count/size limits are exceeded.
    ///
    /// When `details.last_of_epoch` is set, the final chunk is augmented with
    /// the advance-epoch transaction and carries `EndOfEpochData`.
    #[instrument(level = "debug", skip_all)]
    async fn create_checkpoints(
        &self,
        all_effects: Vec<TransactionEffects>,
        details: &PendingCheckpointInfo,
    ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
        let total = all_effects.len();
        // Chain off the last locally built checkpoint (possibly the tail of
        // the previous epoch).
        let mut last_checkpoint =
            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
        debug!(
            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
            checkpoint_timestamp = details.timestamp_ms,
            "Creating checkpoint(s) for {} transactions",
            all_effects.len(),
        );

        let all_digests: Vec<_> = all_effects
            .iter()
            .map(|effect| *effect.transaction_digest())
            .collect();
        let transactions_and_sizes = self
            .state
            .get_transaction_cache_reader()
            .try_get_transactions_and_serialized_sizes(&all_digests)?;
        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
        let mut transactions = Vec::with_capacity(all_effects.len());
        let mut transaction_keys = Vec::with_capacity(all_effects.len());
        // digest -> randomness round for RandomnessStateUpdate transactions,
        // consumed per chunk below when building each summary.
        let mut randomness_rounds = BTreeMap::new();
        {
            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
            debug!(
                ?last_checkpoint_seq,
                "Waiting for {:?} certificates to appear in consensus",
                all_effects.len()
            );

            for (effects, transaction_and_size) in all_effects
                .into_iter()
                .zip(transactions_and_sizes.into_iter())
            {
                let (transaction, size) = transaction_and_size
                    .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
                match transaction.inner().transaction_data().kind() {
                    // System transactions with no external consensus
                    // certificate to wait for.
                    TransactionKind::ConsensusCommitPrologueV1(_)
                    | TransactionKind::AuthenticatorStateUpdateV1(_) => {
                    }
                    TransactionKind::RandomnessStateUpdate(rsu) => {
                        randomness_rounds
                            .insert(*effects.transaction_digest(), rsu.randomness_round);
                    }
                    _ => {
                        // User transactions: wait until their certificates have
                        // been processed through consensus.
                        transaction_keys.push(SequencedConsensusTransactionKey::External(
                            ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
                        ));
                    }
                }
                transactions.push(transaction);
                all_effects_and_transaction_sizes.push((effects, size));
            }

            self.epoch_store
                .consensus_messages_processed_notify(transaction_keys)
                .await?;
        }

        let signatures = self
            .epoch_store
            .user_signatures_for_checkpoint(&transactions, &all_digests)?;
        debug!(
            ?last_checkpoint_seq,
            "Received {} checkpoint user signatures from consensus",
            signatures.len()
        );

        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
        let chunks_count = chunks.len();

        let mut checkpoints = Vec::with_capacity(chunks_count);
        debug!(
            ?last_checkpoint_seq,
            "Creating {} checkpoints with {} transactions", chunks_count, total,
        );

        let epoch = self.epoch_store.epoch();
        for (index, transactions) in chunks.into_iter().enumerate() {
            let first_checkpoint_of_epoch = index == 0
                && last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.epoch != epoch)
                    .unwrap_or(true);
            if first_checkpoint_of_epoch {
                self.epoch_store
                    .record_epoch_first_checkpoint_creation_time_metric();
            }
            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;

            let sequence_number = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.sequence_number + 1)
                .unwrap_or_default();
            // Keep timestamps monotonically non-decreasing (when enforcement
            // is enabled) by clamping to the previous checkpoint's timestamp.
            let mut timestamp_ms = details.timestamp_ms;
            if let Some((_, last_checkpoint)) = &last_checkpoint {
                if last_checkpoint.timestamp_ms > timestamp_ms {
                    debug!(
                        "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
                        sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
                    );
                    if self
                        .epoch_store
                        .protocol_config()
                        .consensus_median_timestamp_with_checkpoint_enforcement()
                    {
                        timestamp_ms = last_checkpoint.timestamp_ms;
                    }
                }
            }

            if self
                .epoch_store
                .protocol_config()
                .calculate_validator_scores()
            {
                self.epoch_store.scorer.update_scores();
            }

            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
            let epoch_rolling_gas_cost_summary =
                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

            let end_of_epoch_data = if last_checkpoint_of_epoch {
                let scores: Vec<u64> = if self
                    .epoch_store
                    .protocol_config()
                    .pass_calculated_validator_scores_to_advance_epoch()
                {
                    self.epoch_store
                        .scorer
                        .current_scores
                        .iter()
                        .map(|x| x.load(std::sync::atomic::Ordering::Relaxed))
                        .collect()
                } else {
                    // Score passing disabled: every committee member gets the
                    // maximum score.
                    vec![MAX_SCORE; self.epoch_store.committee().num_members()]
                };

                // Execute the advance-epoch transaction and append its effects
                // (and an empty signature slot) to this final chunk.
                let (system_state_obj, system_epoch_info_event) = self
                    .augment_epoch_last_checkpoint(
                        &epoch_rolling_gas_cost_summary,
                        timestamp_ms,
                        &mut effects,
                        &mut signatures,
                        sequence_number,
                        scores,
                    )
                    .await?;

                let epoch_supply_change =
                    system_epoch_info_event.map_or(0, |event| event.supply_change());

                let committee = system_state_obj
                    .get_current_epoch_committee()
                    .committee()
                    .clone();

                // Fold this checkpoint into the running state accumulator
                // (after earlier running roots are done), then digest the
                // epoch's root state.
                let root_state_digest = {
                    let state_acc = self
                        .accumulator
                        .upgrade()
                        .expect("No checkpoints should be getting built after local configuration");
                    let acc = state_acc.accumulate_checkpoint(
                        &effects,
                        sequence_number,
                        &self.epoch_store,
                    )?;

                    state_acc
                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
                        .await?;

                    state_acc.accumulate_running_root(
                        &self.epoch_store,
                        sequence_number,
                        Some(acc),
                    )?;
                    state_acc
                        .digest_epoch(self.epoch_store.clone(), sequence_number)
                        .await?
                };
                self.metrics.highest_accumulated_epoch.set(epoch as i64);
                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

                let epoch_commitments = vec![root_state_digest.into()];

                Some(EndOfEpochData {
                    next_epoch_committee: committee.voting_rights,
                    next_epoch_protocol_version: ProtocolVersion::new(
                        system_state_obj.protocol_version(),
                    ),
                    epoch_commitments,
                    epoch_supply_change,
                })
            } else {
                None
            };
            let contents = CheckpointContents::new_with_digests_and_signatures(
                effects.iter().map(TransactionEffects::execution_digests),
                signatures,
            );

            let num_txns = contents.size() as u64;

            let network_total_transactions = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.network_total_transactions + num_txns)
                .unwrap_or(num_txns);

            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());

            // Randomness rounds for the transactions that ended up in this
            // chunk.
            let matching_randomness_rounds: Vec<_> = effects
                .iter()
                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
                .copied()
                .collect();

            let summary = CheckpointSummary::new(
                self.epoch_store.protocol_config(),
                epoch,
                sequence_number,
                network_total_transactions,
                &contents,
                previous_digest,
                epoch_rolling_gas_cost_summary,
                end_of_epoch_data,
                timestamp_ms,
                matching_randomness_rounds,
            );
            summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
            if last_checkpoint_of_epoch {
                info!(
                    checkpoint_seq = sequence_number,
                    "creating last checkpoint of epoch {}", epoch
                );
                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
                    self.epoch_store
                        .report_epoch_metrics_at_last_checkpoint(stats);
                }
            }
            // The new summary becomes the parent of the next chunk.
            last_checkpoint = Some((sequence_number, summary.clone()));
            checkpoints.push((summary, contents));
        }

        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
    }
1666
1667 fn get_epoch_total_gas_cost(
1668 &self,
1669 last_checkpoint: Option<&CheckpointSummary>,
1670 cur_checkpoint_effects: &[TransactionEffects],
1671 ) -> GasCostSummary {
1672 let (previous_epoch, previous_gas_costs) = last_checkpoint
1673 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1674 .unwrap_or_default();
1675 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1676 if previous_epoch == self.epoch_store.epoch() {
1677 GasCostSummary::new(
1679 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1680 previous_gas_costs.computation_cost_burned
1681 + current_gas_costs.computation_cost_burned,
1682 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1683 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1684 previous_gas_costs.non_refundable_storage_fee
1685 + current_gas_costs.non_refundable_storage_fee,
1686 )
1687 } else {
1688 current_gas_costs
1689 }
1690 }
1691
1692 #[instrument(level = "error", skip_all)]
1695 async fn augment_epoch_last_checkpoint(
1696 &self,
1697 epoch_total_gas_cost: &GasCostSummary,
1698 epoch_start_timestamp_ms: CheckpointTimestamp,
1699 checkpoint_effects: &mut Vec<TransactionEffects>,
1700 signatures: &mut Vec<Vec<GenericSignature>>,
1701 checkpoint: CheckpointSequenceNumber,
1702 scores: Vec<u64>,
1703 ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1704 let (system_state, system_epoch_info_event, effects) = self
1705 .state
1706 .create_and_execute_advance_epoch_tx(
1707 &self.epoch_store,
1708 epoch_total_gas_cost,
1709 checkpoint,
1710 epoch_start_timestamp_ms,
1711 scores,
1712 )
1713 .await?;
1714 checkpoint_effects.push(effects);
1715 signatures.push(vec![]);
1716 Ok((system_state, system_epoch_info_event))
1717 }
1718
    /// Expands `roots` into the complete effect set for the checkpoint: every
    /// root plus all transitive dependencies that were executed in the current
    /// epoch and are not already covered by a previous (or the current)
    /// checkpoint.
    ///
    /// The digests of the returned effects are also added to
    /// `existing_tx_digests_in_checkpoint` so later calls for the same
    /// checkpoint skip them.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        let mut results = vec![];
        let mut seen = HashSet::new();
        // Fixed-point loop: each pass processes the current frontier and
        // collects newly discovered dependencies into `pending`.
        loop {
            let mut pending = HashSet::new();

            // Batch lookup: which of the current roots are already in an
            // earlier checkpoint this epoch?
            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                seen.insert(*digest);

                // Already collected for the checkpoint currently being built.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip anything already checkpointed or executed in a prior
                // epoch.
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Dependencies not executed in the current epoch do not
                    // belong in this epoch's checkpoints.
                    if !effects_signature_exists {
                        continue;
                    }
                    // Enqueue each dependency at most once.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            // Load effects for the newly discovered dependencies; they become
            // the next frontier.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self
                .effects_store
                .try_multi_get_executed_effects(&pending)?;
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {digest:?}, however transaction that depend on it was already executed"
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
1804
    /// Simulation-only invariant checks for consensus commit prologue
    /// placement: at most one prologue may appear among the roots, and when
    /// one exists it must be the first transaction of the sorted checkpoint
    /// (and the only prologue in it).
    #[cfg(msim)]
    fn expensive_consensus_commit_prologue_invariants_check(
        &self,
        root_digests: &[TransactionDigest],
        sorted: &[TransactionEffects],
    ) {
        // Find all prologue transactions among the roots.
        let root_txs = self
            .state
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(root_digests);
        let ccps = root_txs
            .iter()
            .filter_map(|tx| {
                tx.as_ref().filter(|tx| {
                    matches!(
                        tx.transaction_data().kind(),
                        TransactionKind::ConsensusCommitPrologueV1(_)
                    )
                })
            })
            .collect::<Vec<_>>();

        // A checkpoint can contain at most one consensus commit prologue.
        assert!(ccps.len() <= 1);

        let txs = self
            .state
            .get_transaction_cache_reader()
            .multi_get_transaction_blocks(
                &sorted
                    .iter()
                    .map(|tx| *tx.transaction_digest())
                    .collect::<Vec<_>>(),
            );

        if ccps.is_empty() {
            // No prologue among the roots: none may appear in the checkpoint.
            for tx in txs.iter().flatten() {
                assert!(!matches!(
                    tx.transaction_data().kind(),
                    TransactionKind::ConsensusCommitPrologueV1(_)
                ));
            }
        } else {
            // The prologue must be the first sorted transaction...
            assert!(matches!(
                txs[0].as_ref().unwrap().transaction_data().kind(),
                TransactionKind::ConsensusCommitPrologueV1(_)
            ));

            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());

            // ...and no other prologue may follow it.
            for tx in txs.iter().skip(1).flatten() {
                assert!(!matches!(
                    tx.transaction_data().kind(),
                    TransactionKind::ConsensusCommitPrologueV1(_)
                ));
            }
        }
    }
1873}
1874
impl CheckpointAggregator {
    /// Creates an aggregator with no checkpoint currently being aggregated.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        // Aggregation state is created lazily on the first run.
        let current = None;
        Self {
            store: tables,
            epoch_store,
            notify,
            current,
            output,
            state,
            metrics,
        }
    }

    /// Main loop: runs aggregation passes forever, retrying after 1s on
    /// error, and otherwise waking on notification or a 1s poll timeout.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Wait for a wake-up, but poll at least once per second
            // regardless.
            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
        }
    }

    /// Runs one aggregation pass and forwards each newly certified
    /// checkpoint to the output sink.
    async fn run_and_notify(&mut self) -> IotaResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Aggregates pending signatures into certified checkpoints in sequence
    /// order until signatures run out; returns the checkpoints certified in
    /// this pass.
    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            let current = if let Some(current) = &mut self.current {
                // The in-progress sequence was certified by some other means
                // in the meantime; drop the stale aggregator and restart.
                if current.summary.sequence_number < next_to_certify {
                    self.current = None;
                    continue;
                }
                current
            } else {
                // Start aggregating the next locally built summary; if it has
                // not been built yet there is nothing to do.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    next_index: 0,
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let epoch_tables = self
                .epoch_store
                .tables()
                .expect("should not run past end of epoch");
            // Resume from where a previous pass left off for this sequence
            // (`next_index`).
            let iter = epoch_tables
                .pending_checkpoint_signatures
                .safe_iter_with_bounds(
                    Some((current.summary.sequence_number, current.next_index)),
                    None,
                );
            for item in iter {
                let ((seq, index), data) = item?;
                if seq != current.summary.sequence_number {
                    // Ran past this sequence's signatures without reaching a
                    // quorum; wait for more to arrive.
                    trace!(
                        checkpoint_seq =? current.summary.sequence_number,
                        "Not enough checkpoint signatures",
                    );
                    return Ok(result);
                }
                trace!(
                    checkpoint_seq = current.summary.sequence_number,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = current.summary.sequence_number,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics
                        .last_certified_checkpoint
                        .set(current.summary.sequence_number as i64);
                    current
                        .summary
                        .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
                    result.push(summary.into_inner());
                    // Quorum reached: move on to the next sequence number.
                    self.current = None;
                    continue 'outer;
                } else {
                    current.next_index = index + 1;
                }
            }
            break;
        }
        Ok(result)
    }

    /// Next sequence number needing certification: one past the highest
    /// certified checkpoint, or 0 if none exists yet.
    fn next_checkpoint_to_certify(&self) -> IotaResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
2040
impl CheckpointSignatureAggregator {
    /// Adds one validator's signature; returns the aggregate quorum signature
    /// once enough stake has signed the locally built digest, `Err(())`
    /// otherwise (not enough votes yet, duplicate signer, or fork).
    #[expect(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // A repeated, non-conflicting signature from the same signer is
            // ignored without logging.
            InsertResult::Failed {
                error:
                    IotaError::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
            } => Err(()),
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // A quorum on a digest other than ours means the network
                // certified a checkpoint that forks from our local build.
                if their_digest != self.digest {
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }

    /// If no single digest can still reach quorum, logs the per-digest stake
    /// distribution and spawns background fork diagnostics.
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            // Render "digest (total stake)" entries, highest stake first.
            let digests_by_stake_messages = self
                .signatures_by_digest
                .get_all_unique_values()
                .into_iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{digest:?} (total stake: {total_stake})")
                })
                .collect::<Vec<String>>();
            error!(
                checkpoint_seq = self.summary.sequence_number,
                "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages,
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            // Diagnostics query other validators over the network; keep them
            // off the aggregation path.
            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
}
2141
/// Fetches the pending checkpoint (summary + contents) from validators that
/// signed a different digest and dumps diffs (summary, contents,
/// transactions, effects) against the local checkpoint to a temp file and the
/// debug log, for fork debugging.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = SystemTime::now();
    // Pick one random validator per disagreeing digest to query.
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                let random_validator = validators.choose(&mut OsRng).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given split brain condition, there should be at \
                least one validator that disagrees with local signature"
        );
    }

    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_iota_committee_with_network_metadata();
    let network_config = default_iota_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Ask each selected validator for its pending (uncertified) checkpoint at
    // our sequence number, including contents.
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequest {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint(request)
        })
        .collect::<Vec<_>>();

    // NOTE(review): pairing responses with `iter()` relies on the map
    // iterating in the same order as the `values()` pass above; std HashMap
    // yields a stable order for an unmodified instance.
    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponse {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponse {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // One diff section per responding validator, covering summary, contents,
    // transactions and effects.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {time:?}"
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    // Persist the dump to a kept (not auto-deleted) temp directory and also
    // log it.
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .keep()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{fork_logs_text}").unwrap();
    debug!("{}", fork_logs_text);

    fail_point!("split_brain_reached");
}
2322
/// Notification interface into the checkpoint service, used to feed it
/// checkpoint signatures and to signal that new pending checkpoints may be
/// ready to build.
pub trait CheckpointServiceNotify {
    /// Handles a checkpoint signature message from a validator for the given
    /// epoch (see `CheckpointService::notify_checkpoint_signature`).
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    /// Signals that new pending checkpoint contents may be available; wakes
    /// the checkpoint builder.
    fn notify_checkpoint(&self) -> IotaResult;
}
2332
/// Lifecycle state of a `CheckpointService`: the builder and aggregator are
/// held here until `spawn` takes them and starts their background tasks.
enum CheckpointServiceState {
    // Holds the not-yet-running builder and aggregator; taken exactly once by
    // `take_unstarted` (boxed, presumably to keep the variant small — confirm).
    Unstarted(Box<(CheckpointBuilder, CheckpointAggregator)>),
    // The builder/aggregator have been taken and their tasks spawned.
    Started,
}
2337
2338impl CheckpointServiceState {
2339 fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
2340 let mut state = CheckpointServiceState::Started;
2341 std::mem::swap(self, &mut state);
2342
2343 match state {
2344 CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1),
2345 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2346 }
2347 }
2348}
2349
/// Drives local checkpoint construction and certification: owns the
/// `CheckpointBuilder`/`CheckpointAggregator` (until spawned) and exposes the
/// `CheckpointServiceNotify` entry points that wake them.
pub struct CheckpointService {
    // Persistent checkpoint store; also consulted to drop signatures for
    // already-certified checkpoints.
    tables: Arc<CheckpointStore>,
    // Wakes the builder task (`notify_checkpoint`).
    notify_builder: Arc<Notify>,
    // Wakes the aggregator task (`notify_checkpoint_signature`).
    notify_aggregator: Arc<Notify>,
    // Monotonically increasing index assigned to each received checkpoint
    // signature before it is persisted in the epoch store.
    last_signature_index: Mutex<u64>,
    // Publishes the highest checkpoint sequence number built so far in this
    // process; `wait_for_rebuilt_checkpoints` subscribes to it.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // Highest locally computed checkpoint found on disk at construction time,
    // i.e. the rebuild target after a restart.
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Holds the unstarted builder/aggregator until `spawn` is called.
    state: Mutex<CheckpointServiceState>,
}
2363
impl CheckpointService {
    /// Constructs the service in an unstarted state: the `CheckpointBuilder`
    /// and `CheckpointAggregator` are created and wired to their notifiers and
    /// outputs, but no background task runs until `spawn` is called.
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        accumulator: Weak<StateAccumulator>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // Rebuild target: highest checkpoint locally computed before this
        // (re)start. `wait_for_rebuilt_checkpoints` waits until the builder
        // catches back up to this sequence number.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        // Starting point for the watch channel: the last checkpoint summary
        // the builder had already produced (0 if none).
        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            accumulator,
            checkpoint_output,
            // The builder wakes the aggregator after producing checkpoints.
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        // Resume the signature index from where the epoch store left off.
        let last_signature_index = epoch_store
            .get_last_checkpoint_signature_index()
            .expect("should not cross end of epoch");
        let last_signature_index = Mutex::new(last_signature_index);

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            notify_aggregator,
            last_signature_index,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
                builder, aggregator,
            )))),
        })
    }

    /// Spawns the builder and aggregator background tasks and waits (bounded
    /// by a 120s timeout) for previously built checkpoints to be rebuilt.
    /// Returns the `JoinSet` owning the two tasks.
    ///
    /// Panics (via `take_unstarted`) if called more than once.
    pub async fn spawn(&self) -> JoinSet<()> {
        let mut tasks = JoinSet::new();

        let (builder, aggregator) = self.state.lock().take_unstarted();
        tasks.spawn(monitored_future!(builder.run()));
        tasks.spawn(monitored_future!(aggregator.run()));

        // Don't block startup forever: if the rebuild doesn't complete in
        // time, raise a debug-fatal and continue.
        if tokio::time::timeout(
            Duration::from_secs(120),
            self.wait_for_rebuilt_checkpoints(),
        )
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }

        tasks
    }
}
2477
impl CheckpointService {
    /// Waits until the builder has rebuilt checkpoints up to the highest
    /// sequence number that was locally computed before this process started
    /// (`highest_previously_built_seq`). Returns immediately if the current
    /// build position already meets or exceeds the target.
    pub async fn wait_for_rebuilt_checkpoints(&self) {
        let highest_previously_built_seq = self.highest_previously_built_seq;
        let mut rx = self.highest_currently_built_seq_tx.subscribe();
        let mut highest_currently_built_seq = *rx.borrow_and_update();
        info!(
            "Waiting for checkpoints to be rebuilt, previously built seq: \
            {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
        );
        loop {
            if highest_currently_built_seq >= highest_previously_built_seq {
                info!("Checkpoint rebuild complete");
                break;
            }
            // Block until the builder publishes a new highest sequence number
            // on the watch channel.
            rx.changed().await.unwrap();
            highest_currently_built_seq = *rx.borrow_and_update();
        }
    }

    /// Test-only helper: writes a pending checkpoint through the consensus
    /// commit-output path and then notifies the builder, mimicking what
    /// consensus handling does in production.
    #[cfg(test)]
    fn write_and_notify_checkpoint_for_testing(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: PendingCheckpoint,
    ) -> IotaResult {
        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;

        let mut output = ConsensusCommitOutput::new(0);
        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
        output.set_default_commit_stats_for_testing();
        epoch_store.push_consensus_output_for_tests(output);
        self.notify_checkpoint()?;
        Ok(())
    }
}
2519
2520impl CheckpointServiceNotify for CheckpointService {
2521 fn notify_checkpoint_signature(
2522 &self,
2523 epoch_store: &AuthorityPerEpochStore,
2524 info: &CheckpointSignatureMessage,
2525 ) -> IotaResult {
2526 let sequence = info.summary.sequence_number;
2527 let signer = info.summary.auth_sig().authority.concise();
2528
2529 if let Some(highest_verified_checkpoint) = self
2530 .tables
2531 .get_highest_verified_checkpoint()?
2532 .map(|x| *x.sequence_number())
2533 {
2534 if sequence <= highest_verified_checkpoint {
2535 trace!(
2536 checkpoint_seq = sequence,
2537 "Ignore checkpoint signature from {} - already certified", signer,
2538 );
2539 self.metrics
2540 .last_ignored_checkpoint_signature_received
2541 .set(sequence as i64);
2542 return Ok(());
2543 }
2544 }
2545 trace!(
2546 checkpoint_seq = sequence,
2547 "Received checkpoint signature, digest {} from {}",
2548 info.summary.digest(),
2549 signer,
2550 );
2551 self.metrics
2552 .last_received_checkpoint_signatures
2553 .with_label_values(&[&signer.to_string()])
2554 .set(sequence as i64);
2555 let mut index = self.last_signature_index.lock();
2559 *index += 1;
2560 epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2561 self.notify_aggregator.notify_one();
2562 Ok(())
2563 }
2564
2565 fn notify_checkpoint(&self) -> IotaResult {
2566 self.notify_builder.notify_one();
2567 Ok(())
2568 }
2569}
2570
2571pub struct CheckpointServiceNoop {}
2573impl CheckpointServiceNotify for CheckpointServiceNoop {
2574 fn notify_checkpoint_signature(
2575 &self,
2576 _: &AuthorityPerEpochStore,
2577 _: &CheckpointSignatureMessage,
2578 ) -> IotaResult {
2579 Ok(())
2580 }
2581
2582 fn notify_checkpoint(&self) -> IotaResult {
2583 Ok(())
2584 }
2585}
2586
#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, HashMap},
        ops::Deref,
    };

    use futures::{FutureExt as _, future::BoxFuture};
    use iota_macros::sim_test;
    use iota_protocol_config::{Chain, ProtocolConfig};
    use iota_types::{
        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
        crypto::Signature,
        digests::TransactionEventsDigest,
        effects::{TransactionEffects, TransactionEvents},
        messages_checkpoint::SignedCheckpointSummary,
        move_package::MovePackage,
        object,
        transaction::{GenesisObject, VerifiedTransaction},
    };
    use tokio::sync::mpsc;

    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;

    /// End-to-end exercise of the checkpoint pipeline: seeds dummy
    /// transactions and effects, feeds pending checkpoints to the service,
    /// and asserts on the contents, ordering, sequence numbers, digest
    /// chaining, and gas summaries of the checkpoints the builder emits;
    /// finally verifies that the aggregator certifies checkpoints in
    /// sequence order once signatures arrive (even out of order).
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        // Lower the min checkpoint interval so the builder can emit multiple
        // checkpoints quickly within the test.
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // A minimal transaction, plus a variant carrying a package with a
        // very long module name so its serialized form is large.
        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
            vec![GenesisObject::RawObject {
                data: object::Data::Package(
                    MovePackage::new(
                        ObjectID::random(),
                        SequenceNumber::new(),
                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
                        100_000,
                        Vec::new(),
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        // Digests 0..15 map to the small transaction, 15..20 to the large one.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Record effects for each digest; d(1) depends on d(2), d(3), and
        // d(2) depends on d(3), d(4), so checkpoints must respect causality.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        // Every transaction needs a user signature recorded in the epoch
        // store for checkpoint construction.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        // Channels capturing built checkpoints and certified checkpoints.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        // Tight limits (3 transactions, 100_000 bytes) so the builder is
        // forced to split pending checkpoints.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        let _tasks = checkpoint_service.spawn().await;

        // Six pending checkpoints at increasing heights/timestamps.
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        // Checkpoint 0: only d(4) (no dependencies), and its gas summary.
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        // Checkpoint 1: root d(1) plus its dependencies, in causal order
        // (d(3) before d(2) before d(1)); rolling gas accumulates across
        // checkpoints within the epoch.
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

        // Pending checkpoint 2 has 4 roots but the max-transactions limit is
        // 3, so it is split into two checkpoints (3 txns + 1 txn).
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending checkpoint 3 uses the large transactions (d(15..=17)) and
        // is split 2+1 — presumably hitting the size limit before the
        // transaction-count limit.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending checkpoints 4 and 5 end up combined in a single built
        // checkpoint containing both roots.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Sign checkpoints 0 and 1, deliver the signatures out of order, and
        // check that certification still comes back in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

    /// Test stub: serves effects lookups straight out of an in-memory map.
    /// Only the methods the checkpoint builder exercises are implemented.
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn try_notify_read_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
            // Everything is already "executed" in the map, so resolve
            // immediately instead of waiting for a notification.
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn try_notify_read_executed_effects_digests(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| {
                    self.get(d)
                        .map(|fx| fx.digest())
                        .expect("effects not found")
                })
                .collect()))
            .boxed()
        }

        fn try_multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
        }

        // The remaining lookups are not needed by this test.
        fn try_multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
            unimplemented!()
        }

        fn try_multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
            unimplemented!()
        }

        fn try_multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            unimplemented!()
        }

        fn try_multi_get_events(
            &self,
            _: &[TransactionEventsDigest],
        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
            unimplemented!()
        }
    }

    /// Routes built (contents, summary) pairs into the test's mpsc channel.
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> IotaResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

    /// Routes certified checkpoint summaries into the test's mpsc channel.
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> IotaResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

    /// Builds a pending checkpoint at height `i` whose roots are the digests
    /// `d(t)` for each byte in `t`, stamped with `timestamp_ms`.
    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
            roots: t
                .into_iter()
                .map(|t| TransactionKey::Digest(d(t)))
                .collect(),
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
            },
        })
    }

    /// Deterministic test digest: 32 zero bytes with the first byte set to `i`.
    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

    /// Builds default `TransactionEffects` with the given digest,
    /// dependencies, and gas summary.
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

    /// Records a fake executed transaction: inserts its effects into the test
    /// effects map and registers the key→digest mapping in the epoch store.
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store
            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
            .expect("Inserting cert fx and sigs should not fail");
    }
}