1mod causal_order;
6pub mod checkpoint_executor;
7mod checkpoint_output;
8mod metrics;
9
10use std::{
11 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
12 fs::File,
13 io::Write,
14 path::Path,
15 sync::{Arc, Weak},
16 time::{Duration, SystemTime},
17};
18
19use diffy::create_patch;
20use iota_common::{debug_fatal, fatal, random::get_rng, sync::notify_read::NotifyRead};
21use iota_macros::fail_point;
22use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
23use iota_network::default_iota_network_config;
24use iota_protocol_config::ProtocolVersion;
25use iota_sdk_types::GasCostSummary;
26use iota_types::{
27 base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
28 committee::StakeUnit,
29 crypto::AuthorityStrongQuorumSignInfo,
30 digests::{CheckpointContentsDigest, CheckpointDigest},
31 effects::{TransactionEffects, TransactionEffectsAPI, TransactionEffectsExt},
32 error::{IotaError, IotaResult},
33 event::SystemEpochInfoEvent,
34 iota_system_state::{
35 IotaSystemState, IotaSystemStateTrait,
36 epoch_start_iota_system_state::EpochStartSystemStateTrait,
37 },
38 message_envelope::Message,
39 messages_checkpoint::{
40 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
41 CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
42 CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
43 FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
44 VerifiedCheckpointContents,
45 },
46 messages_consensus::ConsensusTransactionKey,
47 signature::GenericSignature,
48 transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
49};
50use itertools::Itertools;
51use nonempty::NonEmpty;
52use parking_lot::Mutex;
53use rand::seq::SliceRandom;
54use serde::{Deserialize, Serialize};
55use tokio::{
56 sync::{Notify, mpsc, watch},
57 task::JoinSet,
58 time::timeout,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use typed_store::{
62 DBMapUtils, Map, TypedStoreError,
63 rocks::{DBMap, MetricConf},
64};
65
66pub use crate::checkpoints::{
67 checkpoint_output::{
68 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
69 },
70 metrics::CheckpointMetrics,
71};
72use crate::{
73 authority::{
74 AuthorityState,
75 authority_per_epoch_store::{AuthorityPerEpochStore, scorer::MAX_SCORE},
76 },
77 authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
78 checkpoints::{
79 causal_order::CausalOrder,
80 checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
81 },
82 consensus_handler::SequencedConsensusTransactionKey,
83 execution_cache::TransactionCacheRead,
84 global_state_hasher::GlobalStateHasher,
85 stake_aggregator::{InsertResult, MultiStakeAggregator},
86};
87
88pub type CheckpointHeight = u64;
89
90pub struct EpochStats {
91 pub checkpoint_count: u64,
92 pub transaction_count: u64,
93 pub total_gas_reward: u64,
94}
95
96#[derive(Clone, Debug, Serialize, Deserialize)]
97pub struct PendingCheckpointInfo {
98 pub timestamp_ms: CheckpointTimestamp,
99 pub last_of_epoch: bool,
100 pub checkpoint_height: CheckpointHeight,
101}
102
103#[derive(Clone, Debug, Serialize, Deserialize)]
104pub enum PendingCheckpoint {
105 V1(PendingCheckpointContentsV1),
107}
108
109#[derive(Clone, Debug, Serialize, Deserialize)]
110pub struct PendingCheckpointContentsV1 {
111 pub roots: Vec<TransactionKey>,
112 pub details: PendingCheckpointInfo,
113}
114
115impl PendingCheckpoint {
116 pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
117 match self {
118 PendingCheckpoint::V1(contents) => contents,
119 }
120 }
121
122 pub fn into_v1(self) -> PendingCheckpointContentsV1 {
123 match self {
124 PendingCheckpoint::V1(contents) => contents,
125 }
126 }
127
128 pub fn roots(&self) -> &Vec<TransactionKey> {
129 &self.as_v1().roots
130 }
131
132 pub fn details(&self) -> &PendingCheckpointInfo {
133 &self.as_v1().details
134 }
135
136 pub fn height(&self) -> CheckpointHeight {
137 self.details().checkpoint_height
138 }
139}
140
141#[derive(Clone, Debug, Serialize, Deserialize)]
142pub struct BuilderCheckpointSummary {
143 pub summary: CheckpointSummary,
144 pub checkpoint_height: Option<CheckpointHeight>,
146 pub position_in_commit: usize,
147}
148
149#[derive(DBMapUtils)]
150pub struct CheckpointStoreTables {
151 pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,
153
154 pub(crate) checkpoint_sequence_by_contents_digest:
158 DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,
159
160 full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
165
166 pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
168 pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,
170
171 pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
175
176 epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,
179
180 pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
183}
184
185impl CheckpointStoreTables {
186 pub fn new(path: &Path, metric_name: &'static str) -> Self {
187 Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
188 }
189 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
190 Self::get_read_only_handle(
191 path.to_path_buf(),
192 None,
193 None,
194 MetricConf::new("checkpoint_readonly"),
195 )
196 }
197}
198
199pub struct CheckpointStore {
200 pub(crate) tables: CheckpointStoreTables,
201 synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
202 executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
203}
204
205impl CheckpointStore {
206 pub fn new(path: &Path) -> Arc<Self> {
207 let tables = CheckpointStoreTables::new(path, "checkpoint");
208 Arc::new(Self {
209 tables,
210 synced_checkpoint_notify_read: NotifyRead::new(),
211 executed_checkpoint_notify_read: NotifyRead::new(),
212 })
213 }
214
215 pub fn new_for_tests() -> Arc<Self> {
216 let storage_dir = iota_common::tempdir().keep();
217 CheckpointStore::new(storage_dir.as_path())
218 }
219
220 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
221 let tables = CheckpointStoreTables::new(path, "db_checkpoint");
222 Arc::new(Self {
223 tables,
224 synced_checkpoint_notify_read: NotifyRead::new(),
225 executed_checkpoint_notify_read: NotifyRead::new(),
226 })
227 }
228
229 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
230 CheckpointStoreTables::open_readonly(path)
231 }
232
233 #[instrument(level = "info", skip_all)]
234 pub fn insert_genesis_checkpoint(
235 &self,
236 checkpoint: VerifiedCheckpoint,
237 contents: CheckpointContents,
238 epoch_store: &AuthorityPerEpochStore,
239 ) {
240 assert_eq!(
241 checkpoint.epoch(),
242 0,
243 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
244 );
245 assert_eq!(
246 *checkpoint.sequence_number(),
247 0,
248 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
249 );
250
251 if self
254 .get_checkpoint_by_digest(checkpoint.digest())
255 .unwrap()
256 .is_none()
257 {
258 if epoch_store.epoch() == checkpoint.epoch {
259 epoch_store
260 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
261 .unwrap();
262 } else {
263 debug!(
264 validator_epoch =% epoch_store.epoch(),
265 genesis_epoch =% checkpoint.epoch(),
266 "Not inserting checkpoint builder data for genesis checkpoint",
267 );
268 }
269 self.insert_checkpoint_contents(contents).unwrap();
270 self.insert_verified_checkpoint(&checkpoint).unwrap();
271 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
272 }
273 }
274
275 pub fn get_checkpoint_by_digest(
276 &self,
277 digest: &CheckpointDigest,
278 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
279 self.tables
280 .checkpoint_by_digest
281 .get(digest)
282 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
283 }
284
285 pub fn get_checkpoint_by_sequence_number(
286 &self,
287 sequence_number: CheckpointSequenceNumber,
288 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
289 self.tables
290 .certified_checkpoints
291 .get(&sequence_number)
292 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
293 }
294
295 pub fn get_locally_computed_checkpoint(
296 &self,
297 sequence_number: CheckpointSequenceNumber,
298 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
299 self.tables
300 .locally_computed_checkpoints
301 .get(&sequence_number)
302 }
303
304 pub fn get_sequence_number_by_contents_digest(
309 &self,
310 digest: &CheckpointContentsDigest,
311 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
312 self.tables
313 .checkpoint_sequence_by_contents_digest
314 .get(digest)
315 }
316
317 pub fn delete_contents_digest_sequence_number_mapping(
318 &self,
319 digest: &CheckpointContentsDigest,
320 ) -> Result<(), TypedStoreError> {
321 self.tables
322 .checkpoint_sequence_by_contents_digest
323 .remove(digest)
324 }
325
326 pub fn get_latest_certified_checkpoint(
327 &self,
328 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
329 Ok(self
330 .tables
331 .certified_checkpoints
332 .reversed_safe_iter_with_bounds(None, None)?
333 .next()
334 .transpose()?
335 .map(|(_, v)| v.into()))
336 }
337
338 pub fn get_latest_locally_computed_checkpoint(
339 &self,
340 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
341 Ok(self
342 .tables
343 .locally_computed_checkpoints
344 .reversed_safe_iter_with_bounds(None, None)?
345 .next()
346 .transpose()?
347 .map(|(_, v)| v))
348 }
349
350 pub fn multi_get_checkpoint_by_sequence_number(
351 &self,
352 sequence_numbers: &[CheckpointSequenceNumber],
353 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
354 let checkpoints = self
355 .tables
356 .certified_checkpoints
357 .multi_get(sequence_numbers)?
358 .into_iter()
359 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
360 .collect();
361
362 Ok(checkpoints)
363 }
364
365 pub fn multi_get_checkpoint_content(
366 &self,
367 contents_digest: &[CheckpointContentsDigest],
368 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
369 self.tables.checkpoint_content.multi_get(contents_digest)
370 }
371
372 pub fn get_highest_verified_checkpoint(
373 &self,
374 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
375 let highest_verified = if let Some(highest_verified) = self
376 .tables
377 .watermarks
378 .get(&CheckpointWatermark::HighestVerified)?
379 {
380 highest_verified
381 } else {
382 return Ok(None);
383 };
384 self.get_checkpoint_by_digest(&highest_verified.1)
385 }
386
387 pub fn get_highest_synced_checkpoint(
388 &self,
389 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
390 let highest_synced = if let Some(highest_synced) = self
391 .tables
392 .watermarks
393 .get(&CheckpointWatermark::HighestSynced)?
394 {
395 highest_synced
396 } else {
397 return Ok(None);
398 };
399 self.get_checkpoint_by_digest(&highest_synced.1)
400 }
401
402 pub fn get_highest_synced_checkpoint_seq_number(
403 &self,
404 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
405 if let Some(highest_synced) = self
406 .tables
407 .watermarks
408 .get(&CheckpointWatermark::HighestSynced)?
409 {
410 Ok(Some(highest_synced.0))
411 } else {
412 Ok(None)
413 }
414 }
415
416 pub fn get_highest_executed_checkpoint_seq_number(
417 &self,
418 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
419 if let Some(highest_executed) = self
420 .tables
421 .watermarks
422 .get(&CheckpointWatermark::HighestExecuted)?
423 {
424 Ok(Some(highest_executed.0))
425 } else {
426 Ok(None)
427 }
428 }
429
430 pub fn get_highest_executed_checkpoint(
431 &self,
432 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
433 let highest_executed = if let Some(highest_executed) = self
434 .tables
435 .watermarks
436 .get(&CheckpointWatermark::HighestExecuted)?
437 {
438 highest_executed
439 } else {
440 return Ok(None);
441 };
442 self.get_checkpoint_by_digest(&highest_executed.1)
443 }
444
445 pub fn get_highest_pruned_checkpoint_seq_number(
446 &self,
447 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
448 self.tables
449 .watermarks
450 .get(&CheckpointWatermark::HighestPruned)
451 .map(|watermark| watermark.map(|w| w.0))
452 }
453
454 pub fn get_checkpoint_contents(
455 &self,
456 digest: &CheckpointContentsDigest,
457 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
458 self.tables.checkpoint_content.get(digest)
459 }
460
461 pub fn get_full_checkpoint_contents_by_sequence_number(
462 &self,
463 seq: CheckpointSequenceNumber,
464 ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
465 self.tables.full_checkpoint_content.get(&seq)
466 }
467
468 fn prune_local_summaries(&self) -> IotaResult {
469 if let Some((last_local_summary, _)) = self
470 .tables
471 .locally_computed_checkpoints
472 .reversed_safe_iter_with_bounds(None, None)?
473 .next()
474 .transpose()?
475 {
476 let mut batch = self.tables.locally_computed_checkpoints.batch();
477 batch.schedule_delete_range(
478 &self.tables.locally_computed_checkpoints,
479 &0,
480 &last_local_summary,
481 )?;
482 batch.write()?;
483 info!("Pruned local summaries up to {:?}", last_local_summary);
484 }
485 Ok(())
486 }
487
488 #[instrument(level = "trace", skip_all)]
489 fn check_for_checkpoint_fork(
490 &self,
491 local_checkpoint: &CheckpointSummary,
492 verified_checkpoint: &VerifiedCheckpoint,
493 ) {
494 if local_checkpoint != verified_checkpoint.data() {
495 let verified_contents = self
496 .get_checkpoint_contents(&verified_checkpoint.content_digest)
497 .map(|opt_contents| {
498 opt_contents
499 .map(|contents| format!("{contents:?}"))
500 .unwrap_or_else(|| {
501 format!(
502 "Verified checkpoint contents not found, digest: {:?}",
503 verified_checkpoint.content_digest,
504 )
505 })
506 })
507 .map_err(|e| {
508 format!(
509 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
510 verified_checkpoint.content_digest, e
511 )
512 })
513 .unwrap_or_else(|err_msg| err_msg);
514
515 let local_contents = self
516 .get_checkpoint_contents(&local_checkpoint.content_digest)
517 .map(|opt_contents| {
518 opt_contents
519 .map(|contents| format!("{contents:?}"))
520 .unwrap_or_else(|| {
521 format!(
522 "Local checkpoint contents not found, digest: {:?}",
523 local_checkpoint.content_digest
524 )
525 })
526 })
527 .map_err(|e| {
528 format!(
529 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
530 local_checkpoint.content_digest, e
531 )
532 })
533 .unwrap_or_else(|err_msg| err_msg);
534
535 error!(
537 verified_checkpoint = ?verified_checkpoint.data(),
538 ?verified_contents,
539 ?local_checkpoint,
540 ?local_contents,
541 "Local checkpoint fork detected!",
542 );
543 fatal!(
544 "Local checkpoint fork detected for sequence number: {}",
545 local_checkpoint.sequence_number()
546 );
547 }
548 }
549
550 pub fn insert_certified_checkpoint(
556 &self,
557 checkpoint: &VerifiedCheckpoint,
558 ) -> Result<(), TypedStoreError> {
559 debug!(
560 checkpoint_seq = checkpoint.sequence_number(),
561 "Inserting certified checkpoint",
562 );
563 let mut batch = self.tables.certified_checkpoints.batch();
564 batch
565 .insert_batch(
566 &self.tables.certified_checkpoints,
567 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
568 )?
569 .insert_batch(
570 &self.tables.checkpoint_by_digest,
571 [(checkpoint.digest(), checkpoint.serializable_ref())],
572 )?;
573 if checkpoint.next_epoch_committee().is_some() {
574 batch.insert_batch(
575 &self.tables.epoch_last_checkpoint_map,
576 [(&checkpoint.epoch(), checkpoint.sequence_number())],
577 )?;
578 }
579 batch.write()?;
580
581 if let Some(local_checkpoint) = self
582 .tables
583 .locally_computed_checkpoints
584 .get(checkpoint.sequence_number())?
585 {
586 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
587 }
588
589 Ok(())
590 }
591
592 #[instrument(level = "trace", skip_all)]
595 pub fn insert_verified_checkpoint(
596 &self,
597 checkpoint: &VerifiedCheckpoint,
598 ) -> Result<(), TypedStoreError> {
599 self.insert_certified_checkpoint(checkpoint)?;
600 self.update_highest_verified_checkpoint(checkpoint)
601 }
602
603 pub fn update_highest_verified_checkpoint(
604 &self,
605 checkpoint: &VerifiedCheckpoint,
606 ) -> Result<(), TypedStoreError> {
607 if Some(*checkpoint.sequence_number())
608 > self
609 .get_highest_verified_checkpoint()?
610 .map(|x| *x.sequence_number())
611 {
612 debug!(
613 checkpoint_seq = checkpoint.sequence_number(),
614 "Updating highest verified checkpoint",
615 );
616 self.tables.watermarks.insert(
617 &CheckpointWatermark::HighestVerified,
618 &(*checkpoint.sequence_number(), *checkpoint.digest()),
619 )?;
620 }
621
622 Ok(())
623 }
624
625 pub fn update_highest_synced_checkpoint(
626 &self,
627 checkpoint: &VerifiedCheckpoint,
628 ) -> Result<(), TypedStoreError> {
629 let seq = *checkpoint.sequence_number();
630 debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
631 self.tables.watermarks.insert(
632 &CheckpointWatermark::HighestSynced,
633 &(seq, *checkpoint.digest()),
634 )?;
635 self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
636 Ok(())
637 }
638
639 async fn notify_read_checkpoint_watermark<F>(
640 &self,
641 notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
642 seq: CheckpointSequenceNumber,
643 get_watermark: F,
644 ) -> VerifiedCheckpoint
645 where
646 F: Fn() -> Option<CheckpointSequenceNumber>,
647 {
648 type ReadResult = Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError>;
649
650 notify_read
651 .read(&[seq], |seqs| {
652 let seq = seqs[0];
653 let Some(highest) = get_watermark() else {
654 return Ok(vec![None]) as ReadResult;
655 };
656 if highest < seq {
657 return Ok(vec![None]) as ReadResult;
658 }
659 let checkpoint = self
660 .get_checkpoint_by_sequence_number(seq)
661 .expect("db error")
662 .expect("checkpoint not found");
663 Ok(vec![Some(checkpoint)]) as ReadResult
664 })
665 .await
666 .unwrap()
667 .into_iter()
668 .next()
669 .unwrap()
670 }
671
672 pub async fn notify_read_synced_checkpoint(
673 &self,
674 seq: CheckpointSequenceNumber,
675 ) -> VerifiedCheckpoint {
676 self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
677 self.get_highest_synced_checkpoint_seq_number()
678 .expect("db error")
679 })
680 .await
681 }
682
683 pub async fn notify_read_executed_checkpoint(
684 &self,
685 seq: CheckpointSequenceNumber,
686 ) -> VerifiedCheckpoint {
687 self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
688 self.get_highest_executed_checkpoint_seq_number()
689 .expect("db error")
690 })
691 .await
692 }
693
694 pub fn update_highest_executed_checkpoint(
695 &self,
696 checkpoint: &VerifiedCheckpoint,
697 ) -> Result<(), TypedStoreError> {
698 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
699 if seq_number >= *checkpoint.sequence_number() {
700 return Ok(());
701 }
702 assert_eq!(
703 seq_number + 1,
704 *checkpoint.sequence_number(),
705 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
706 checkpoint.sequence_number(),
707 seq_number
708 );
709 }
710 let seq = *checkpoint.sequence_number();
711 debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
712 self.tables.watermarks.insert(
713 &CheckpointWatermark::HighestExecuted,
714 &(seq, *checkpoint.digest()),
715 )?;
716 self.executed_checkpoint_notify_read
717 .notify(&seq, checkpoint);
718 Ok(())
719 }
720
721 pub fn update_highest_pruned_checkpoint(
722 &self,
723 checkpoint: &VerifiedCheckpoint,
724 ) -> Result<(), TypedStoreError> {
725 self.tables.watermarks.insert(
726 &CheckpointWatermark::HighestPruned,
727 &(*checkpoint.sequence_number(), *checkpoint.digest()),
728 )
729 }
730
731 pub fn set_highest_executed_checkpoint_subtle(
737 &self,
738 checkpoint: &VerifiedCheckpoint,
739 ) -> Result<(), TypedStoreError> {
740 self.tables.watermarks.insert(
741 &CheckpointWatermark::HighestExecuted,
742 &(*checkpoint.sequence_number(), *checkpoint.digest()),
743 )
744 }
745
746 pub fn insert_checkpoint_contents(
747 &self,
748 contents: CheckpointContents,
749 ) -> Result<(), TypedStoreError> {
750 debug!(
751 checkpoint_seq = ?contents.digest(),
752 "Inserting checkpoint contents",
753 );
754 self.tables
755 .checkpoint_content
756 .insert(contents.digest(), &contents)
757 }
758
759 pub fn insert_verified_checkpoint_contents(
766 &self,
767 checkpoint: &VerifiedCheckpoint,
768 full_contents: VerifiedCheckpointContents,
769 ) -> Result<(), TypedStoreError> {
770 let mut batch = self.tables.full_checkpoint_content.batch();
771 batch.insert_batch(
772 &self.tables.checkpoint_sequence_by_contents_digest,
773 [(&checkpoint.content_digest, checkpoint.sequence_number())],
774 )?;
775 let full_contents = full_contents.into_inner();
776 batch.insert_batch(
777 &self.tables.full_checkpoint_content,
778 [(checkpoint.sequence_number(), &full_contents)],
779 )?;
780
781 let contents = full_contents.into_checkpoint_contents();
782 assert_eq!(&checkpoint.content_digest, contents.digest());
783
784 batch.insert_batch(
785 &self.tables.checkpoint_content,
786 [(contents.digest(), &contents)],
787 )?;
788
789 batch.write()
790 }
791
792 pub fn delete_full_checkpoint_contents(
793 &self,
794 seq: CheckpointSequenceNumber,
795 ) -> Result<(), TypedStoreError> {
796 self.tables.full_checkpoint_content.remove(&seq)
797 }
798
799 pub fn get_epoch_last_checkpoint(
800 &self,
801 epoch_id: EpochId,
802 ) -> IotaResult<Option<VerifiedCheckpoint>> {
803 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
804 let checkpoint = match seq {
805 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
806 None => None,
807 };
808 Ok(checkpoint)
809 }
810
811 pub fn insert_epoch_last_checkpoint(
812 &self,
813 epoch_id: EpochId,
814 checkpoint: &VerifiedCheckpoint,
815 ) -> IotaResult {
816 self.tables
817 .epoch_last_checkpoint_map
818 .insert(&epoch_id, checkpoint.sequence_number())?;
819 Ok(())
820 }
821
822 pub fn get_epoch_state_commitments(
823 &self,
824 epoch: EpochId,
825 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
826 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
827 checkpoint
828 .end_of_epoch_data
829 .as_ref()
830 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
831 .epoch_commitments
832 .clone()
833 });
834 Ok(commitments)
835 }
836
837 pub fn get_epoch_stats(
840 &self,
841 epoch: EpochId,
842 last_checkpoint: &CheckpointSummary,
843 ) -> Option<EpochStats> {
844 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
845 (0, 0)
846 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
847 (
848 checkpoint.sequence_number + 1,
849 checkpoint.network_total_transactions,
850 )
851 } else {
852 return None;
853 };
854 Some(EpochStats {
855 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
856 transaction_count: last_checkpoint.network_total_transactions
857 - prev_epoch_network_transactions,
858 total_gas_reward: last_checkpoint
859 .epoch_rolling_gas_cost_summary
860 .computation_cost,
861 })
862 }
863
864 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
865 self.tables
867 .checkpoint_content
868 .checkpoint_db(path)
869 .map_err(Into::into)
870 }
871
872 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
873 let mut wb = self.tables.watermarks.batch();
874 wb.delete_batch(
875 &self.tables.watermarks,
876 std::iter::once(CheckpointWatermark::HighestExecuted),
877 )?;
878 wb.write()?;
879 Ok(())
880 }
881
882 pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
883 self.delete_highest_executed_checkpoint_test_only()?;
884 Ok(())
885 }
886}
887
888#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
889pub enum CheckpointWatermark {
890 HighestVerified,
891 HighestSynced,
892 HighestExecuted,
893 HighestPruned,
894}
895
896struct CheckpointStateHasher {
897 epoch_store: Arc<AuthorityPerEpochStore>,
898 hasher: Weak<GlobalStateHasher>,
899 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
900}
901
902impl CheckpointStateHasher {
903 fn new(
904 epoch_store: Arc<AuthorityPerEpochStore>,
905 hasher: Weak<GlobalStateHasher>,
906 receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
907 ) -> Self {
908 Self {
909 epoch_store,
910 hasher,
911 receive_from_builder,
912 }
913 }
914
915 async fn run(self) {
916 let Self {
917 epoch_store,
918 hasher,
919 mut receive_from_builder,
920 } = self;
921 while let Some((seq, effects)) = receive_from_builder.recv().await {
922 let Some(hasher) = hasher.upgrade() else {
923 info!("GlobalStateHash was dropped, stopping checkpoint accumulation");
924 break;
925 };
926 hasher
927 .accumulate_checkpoint(&effects, seq, &epoch_store)
928 .expect("epoch ended while accumulating checkpoint");
929 }
930 }
931}
932
933pub struct CheckpointBuilder {
934 state: Arc<AuthorityState>,
935 store: Arc<CheckpointStore>,
936 epoch_store: Arc<AuthorityPerEpochStore>,
937 notify: Arc<Notify>,
938 notify_aggregator: Arc<Notify>,
939 last_built: watch::Sender<CheckpointSequenceNumber>,
940 effects_store: Arc<dyn TransactionCacheRead>,
941 global_state_hasher: Weak<GlobalStateHasher>,
942 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
943 output: Box<dyn CheckpointOutput>,
944 metrics: Arc<CheckpointMetrics>,
945 max_transactions_per_checkpoint: usize,
946 max_checkpoint_size_bytes: usize,
947}
948
949pub struct CheckpointAggregator {
950 store: Arc<CheckpointStore>,
951 epoch_store: Arc<AuthorityPerEpochStore>,
952 notify: Arc<Notify>,
953 current: Option<CheckpointSignatureAggregator>,
954 output: Box<dyn CertifiedCheckpointOutput>,
955 state: Arc<AuthorityState>,
956 metrics: Arc<CheckpointMetrics>,
957}
958
959pub struct CheckpointSignatureAggregator {
961 next_index: u64,
962 summary: CheckpointSummary,
963 digest: CheckpointDigest,
964 signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
966 store: Arc<CheckpointStore>,
967 state: Arc<AuthorityState>,
968 metrics: Arc<CheckpointMetrics>,
969}
970
971impl CheckpointBuilder {
972 fn new(
973 state: Arc<AuthorityState>,
974 store: Arc<CheckpointStore>,
975 epoch_store: Arc<AuthorityPerEpochStore>,
976 notify: Arc<Notify>,
977 effects_store: Arc<dyn TransactionCacheRead>,
978 global_state_hasher: Weak<GlobalStateHasher>,
980 send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
982 output: Box<dyn CheckpointOutput>,
983 notify_aggregator: Arc<Notify>,
984 last_built: watch::Sender<CheckpointSequenceNumber>,
985 metrics: Arc<CheckpointMetrics>,
986 max_transactions_per_checkpoint: usize,
987 max_checkpoint_size_bytes: usize,
988 ) -> Self {
989 Self {
990 state,
991 store,
992 epoch_store,
993 notify,
994 effects_store,
995 global_state_hasher,
996 send_to_hasher,
997 output,
998 notify_aggregator,
999 last_built,
1000 metrics,
1001 max_transactions_per_checkpoint,
1002 max_checkpoint_size_bytes,
1003 }
1004 }
1005
1006 async fn run(mut self) {
1009 info!("Starting CheckpointBuilder");
1010 loop {
1011 self.maybe_build_checkpoints().await;
1012
1013 self.notify.notified().await;
1014 }
1015 }
1016
1017 async fn maybe_build_checkpoints(&mut self) {
1018 let _scope = monitored_scope("BuildCheckpoints");
1019
1020 let summary = self
1022 .epoch_store
1023 .last_built_checkpoint_builder_summary()
1024 .expect("epoch should not have ended");
1025 let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1026 let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1027
1028 let min_checkpoint_interval_ms = self
1029 .epoch_store
1030 .protocol_config()
1031 .min_checkpoint_interval_ms_as_option()
1032 .unwrap_or_default();
1033 let mut grouped_pending_checkpoints = Vec::new();
1034 let mut checkpoints_iter = self
1035 .epoch_store
1036 .get_pending_checkpoints(last_height)
1037 .expect("unexpected epoch store error")
1038 .into_iter()
1039 .peekable();
1040 while let Some((height, pending)) = checkpoints_iter.next() {
1041 let current_timestamp = pending.details().timestamp_ms;
1044 let can_build = match last_timestamp {
1045 Some(last_timestamp) => {
1046 current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1047 }
1048 None => true,
1049 } || checkpoints_iter
1052 .peek()
1053 .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1054 || pending.details().last_of_epoch;
1056 grouped_pending_checkpoints.push(pending);
1057 if !can_build {
1058 debug!(
1059 checkpoint_commit_height = height,
1060 ?last_timestamp,
1061 ?current_timestamp,
1062 "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1063 );
1064 continue;
1065 }
1066
1067 last_height = Some(height);
1069 last_timestamp = Some(current_timestamp);
1070 debug!(
1071 checkpoint_commit_height = height,
1072 "Making checkpoint at commit height"
1073 );
1074
1075 match self
1076 .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1077 .await
1078 {
1079 Ok(seq) => {
1080 self.last_built.send_if_modified(|cur| {
1081 if seq > *cur {
1083 *cur = seq;
1084 true
1085 } else {
1086 false
1087 }
1088 });
1089 }
1090 Err(e) => {
1091 error!("Error while making checkpoint, will retry in 1s: {:?}", e);
1092 tokio::time::sleep(Duration::from_secs(1)).await;
1093 self.metrics.checkpoint_errors.inc();
1094 return;
1095 }
1096 }
1097 tokio::task::yield_now().await;
1100 }
1101 debug!(
1102 "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1103 grouped_pending_checkpoints.len(),
1104 );
1105 }
1106
1107 #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1108 async fn make_checkpoint(
1109 &self,
1110 pendings: Vec<PendingCheckpoint>,
1111 ) -> anyhow::Result<CheckpointSequenceNumber> {
1112 let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1113 let last_details = pendings.last().unwrap().details().clone();
1114
1115 let mut effects_in_current_checkpoint = BTreeSet::new();
1121
1122 let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
1125 let mut all_roots = HashSet::new();
1126 for pending_checkpoint in pendings.into_iter() {
1127 let pending = pending_checkpoint.into_v1();
1128 let (root_digests, txn_in_checkpoint) = self
1129 .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
1130 .await?;
1131 sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
1132 all_roots.extend(root_digests);
1133 }
1134 let new_checkpoints = self
1135 .create_checkpoints(
1136 sorted_tx_effects_included_in_checkpoint,
1137 &last_details,
1138 &all_roots,
1139 )
1140 .await?;
1141 let highest_sequence = *new_checkpoints.last().0.sequence_number();
1142 self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1143 .await?;
1144 Ok(highest_sequence)
1145 }
1146
1147 #[instrument(level = "debug", skip_all)]
1152 async fn resolve_checkpoint_transactions(
1153 &self,
1154 roots: Vec<TransactionKey>,
1155 effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
1156 ) -> IotaResult<(Vec<TransactionDigest>, Vec<TransactionEffects>)> {
1157 let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1158
1159 self.metrics
1160 .checkpoint_roots_count
1161 .inc_by(roots.len() as u64);
1162
1163 let root_digests = self
1164 .epoch_store
1165 .notify_read_executed_digests(&roots)
1166 .in_monitored_scope("CheckpointNotifyDigests")
1167 .await?;
1168 let root_effects = self
1169 .effects_store
1170 .try_notify_read_executed_effects(&root_digests)
1171 .in_monitored_scope("CheckpointNotifyRead")
1172 .await?;
1173
1174 let consensus_commit_prologue = {
1175 let consensus_commit_prologue = self
1179 .extract_consensus_commit_prologue(&root_digests, &root_effects)
1180 .await?;
1181
1182 if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1186 let unsorted_ccp = self.complete_checkpoint_effects(
1187 vec![ccp_effects.clone()],
1188 effects_in_current_checkpoint,
1189 )?;
1190
1191 if unsorted_ccp.len() != 1 {
1194 fatal!(
1195 "Expected 1 consensus commit prologue, got {:?}",
1196 unsorted_ccp
1197 .iter()
1198 .map(|e| e.transaction_digest())
1199 .collect::<Vec<_>>()
1200 );
1201 }
1202 assert_eq!(unsorted_ccp.len(), 1);
1203 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1204 }
1205 consensus_commit_prologue
1206 };
1207
1208 let unsorted =
1209 self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;
1210
1211 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1212 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1213 if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
1214 #[cfg(debug_assertions)]
1215 {
1216 for tx in unsorted.iter() {
1219 assert!(tx.transaction_digest() != &_ccp_digest);
1220 }
1221 }
1222 sorted.push(ccp_effects);
1223 }
1224 sorted.extend(CausalOrder::causal_sort(unsorted));
1225
1226 #[cfg(msim)]
1227 {
1228 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1230 }
1231
1232 Ok((root_digests, sorted))
1233 }
1234
1235 async fn extract_consensus_commit_prologue(
1240 &self,
1241 root_digests: &[TransactionDigest],
1242 root_effects: &[TransactionEffects],
1243 ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1244 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1245 if root_digests.is_empty() {
1246 return Ok(None);
1247 }
1248
1249 let first_tx = self
1254 .state
1255 .get_transaction_cache_reader()
1256 .try_get_transaction_block(&root_digests[0])?
1257 .expect("Transaction block must exist");
1258
1259 Ok(match first_tx.transaction_data().kind() {
1260 TransactionKind::ConsensusCommitPrologueV1(_) => {
1261 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1262 Some((*first_tx.digest(), root_effects[0].clone()))
1263 }
1264 _ => None,
1265 })
1266 }
1267
1268 #[instrument(level = "debug", skip_all)]
1270 async fn write_checkpoints(
1271 &self,
1272 height: CheckpointHeight,
1273 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1274 ) -> IotaResult {
1275 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1276 let mut batch = self.store.tables.checkpoint_content.batch();
1277 let mut all_tx_digests =
1278 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1279
1280 for (summary, contents) in &new_checkpoints {
1281 debug!(
1282 checkpoint_commit_height = height,
1283 checkpoint_seq = summary.sequence_number,
1284 contents_digest = ?contents.digest(),
1285 "writing checkpoint",
1286 );
1287
1288 if let Some(previously_computed_summary) = self
1289 .store
1290 .tables
1291 .locally_computed_checkpoints
1292 .get(&summary.sequence_number)?
1293 {
1294 if previously_computed_summary != *summary {
1295 fatal!(
1297 "Checkpoint {} was previously built with a different result: {previously_computed_summary:?} vs {summary:?}",
1298 summary.sequence_number,
1299 );
1300 }
1301 }
1302
1303 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1304
1305 self.metrics
1306 .transactions_included_in_checkpoint
1307 .inc_by(contents.size() as u64);
1308 let sequence_number = summary.sequence_number;
1309 self.metrics
1310 .last_constructed_checkpoint
1311 .set(sequence_number as i64);
1312
1313 batch.insert_batch(
1314 &self.store.tables.checkpoint_content,
1315 [(contents.digest(), contents)],
1316 )?;
1317
1318 batch.insert_batch(
1319 &self.store.tables.locally_computed_checkpoints,
1320 [(sequence_number, summary)],
1321 )?;
1322 }
1323
1324 batch.write()?;
1325
1326 for (summary, contents) in &new_checkpoints {
1329 self.output
1330 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1331 .await?;
1332 }
1333
1334 for (local_checkpoint, _) in &new_checkpoints {
1335 if let Some(certified_checkpoint) = self
1336 .store
1337 .tables
1338 .certified_checkpoints
1339 .get(local_checkpoint.sequence_number())?
1340 {
1341 self.store
1342 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1343 }
1344 }
1345
1346 self.notify_aggregator.notify_one();
1347 self.epoch_store
1348 .process_constructed_checkpoint(height, new_checkpoints);
1349 Ok(())
1350 }
1351
1352 #[expect(clippy::type_complexity)]
1353 fn split_checkpoint_chunks(
1354 &self,
1355 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1356 signatures: Vec<Vec<GenericSignature>>,
1357 ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1358 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1359 let mut chunks = Vec::new();
1360 let mut chunk = Vec::new();
1361 let mut chunk_size: usize = 0;
1362 for ((effects, transaction_size), signatures) in
1363 effects_and_transaction_sizes.into_iter().zip(signatures)
1364 {
1365 let size = transaction_size
1370 + bcs::serialized_size(&effects)?
1371 + bcs::serialized_size(&signatures)?;
1372 if chunk.len() == self.max_transactions_per_checkpoint
1373 || (chunk_size + size) > self.max_checkpoint_size_bytes
1374 {
1375 if chunk.is_empty() {
1376 warn!(
1378 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1379 self.max_checkpoint_size_bytes
1380 );
1381 } else {
1382 chunks.push(chunk);
1383 chunk = Vec::new();
1384 chunk_size = 0;
1385 }
1386 }
1387
1388 chunk.push((effects, signatures));
1389 chunk_size += size;
1390 }
1391
1392 if !chunk.is_empty() || chunks.is_empty() {
1393 chunks.push(chunk);
1398 }
1404 Ok(chunks)
1405 }
1406
1407 fn load_last_built_checkpoint_summary(
1410 epoch_store: &AuthorityPerEpochStore,
1411 store: &CheckpointStore,
1412 ) -> IotaResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1413 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1414 if last_checkpoint.is_none() {
1415 let epoch = epoch_store.epoch();
1416 if epoch > 0 {
1417 let previous_epoch = epoch - 1;
1418 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1419 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1420 if let Some((ref seq, _)) = last_checkpoint {
1421 debug!(
1422 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1423 );
1424 } else {
1425 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1428 }
1429 }
1430 }
1431 Ok(last_checkpoint)
1432 }
1433
1434 #[instrument(level = "debug", skip_all)]
1435 async fn create_checkpoints(
1436 &self,
1437 all_effects: Vec<TransactionEffects>,
1438 details: &PendingCheckpointInfo,
1439 all_roots: &HashSet<TransactionDigest>,
1440 ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1441 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1442
1443 let total = all_effects.len();
1444 let mut last_checkpoint =
1445 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1446 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1447 debug!(
1448 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1449 checkpoint_timestamp = details.timestamp_ms,
1450 "Creating checkpoint(s) for {} transactions",
1451 all_effects.len(),
1452 );
1453
1454 let all_digests: Vec<_> = all_effects
1455 .iter()
1456 .map(|effect| *effect.transaction_digest())
1457 .collect();
1458 let transactions_and_sizes = self
1459 .state
1460 .get_transaction_cache_reader()
1461 .try_get_transactions_and_serialized_sizes(&all_digests)?;
1462 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1463 let mut transactions = Vec::with_capacity(all_effects.len());
1464 let mut transaction_keys = Vec::with_capacity(all_effects.len());
1465 let mut randomness_rounds = BTreeMap::new();
1466 {
1467 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1468 debug!(
1469 ?last_checkpoint_seq,
1470 "Waiting for {:?} certificates to appear in consensus",
1471 all_effects.len()
1472 );
1473
1474 for (effects, transaction_and_size) in all_effects
1475 .into_iter()
1476 .zip(transactions_and_sizes.into_iter())
1477 {
1478 let (transaction, size) = transaction_and_size
1479 .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
1480 match transaction.inner().transaction_data().kind() {
1481 #[allow(deprecated)]
1482 TransactionKind::ConsensusCommitPrologueV1(_)
1483 | TransactionKind::AuthenticatorStateUpdateV1Deprecated => {
1484 }
1492 TransactionKind::RandomnessStateUpdate(rsu) => {
1493 randomness_rounds
1494 .insert(*effects.transaction_digest(), rsu.randomness_round);
1495 }
1496 _ => {
1497 let digest = *effects.transaction_digest();
1502 if !all_roots.contains(&digest) {
1503 transaction_keys.push(SequencedConsensusTransactionKey::External(
1504 ConsensusTransactionKey::Certificate(digest),
1505 ));
1506 }
1507 }
1508 }
1509 transactions.push(transaction);
1510 all_effects_and_transaction_sizes.push((effects, size));
1511 }
1512
1513 self.epoch_store
1514 .consensus_messages_processed_notify(transaction_keys)
1515 .await?;
1516 }
1517
1518 let signatures = self
1519 .epoch_store
1520 .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1521 debug!(
1522 ?last_checkpoint_seq,
1523 "Received {} checkpoint user signatures from consensus",
1524 signatures.len()
1525 );
1526
1527 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1528 let chunks_count = chunks.len();
1529
1530 let mut checkpoints = Vec::with_capacity(chunks_count);
1531 debug!(
1532 ?last_checkpoint_seq,
1533 "Creating {} checkpoints with {} transactions", chunks_count, total,
1534 );
1535
1536 let epoch = self.epoch_store.epoch();
1537 for (index, transactions) in chunks.into_iter().enumerate() {
1538 let first_checkpoint_of_epoch = index == 0
1539 && last_checkpoint
1540 .as_ref()
1541 .map(|(_, c)| c.epoch != epoch)
1542 .unwrap_or(true);
1543 if first_checkpoint_of_epoch {
1544 self.epoch_store
1545 .record_epoch_first_checkpoint_creation_time_metric();
1546 }
1547 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1548
1549 let sequence_number = last_checkpoint
1550 .as_ref()
1551 .map(|(_, c)| c.sequence_number + 1)
1552 .unwrap_or_default();
1553 let mut timestamp_ms = details.timestamp_ms;
1554 if let Some((_, last_checkpoint)) = &last_checkpoint {
1555 if last_checkpoint.timestamp_ms > timestamp_ms {
1556 debug!(
1558 "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
1559 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
1560 );
1561 if self
1562 .epoch_store
1563 .protocol_config()
1564 .consensus_median_timestamp_with_checkpoint_enforcement()
1565 {
1566 timestamp_ms = last_checkpoint.timestamp_ms;
1567 }
1568 }
1569 }
1570
1571 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1572 let epoch_rolling_gas_cost_summary =
1573 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1574
1575 let end_of_epoch_data = if last_checkpoint_of_epoch {
1576 let scores: Vec<u64> = if self
1577 .epoch_store
1578 .protocol_config()
1579 .pass_calculated_validator_scores_to_advance_epoch()
1580 {
1581 self.epoch_store.scoreboard.current_scores()
1582 } else {
1583 vec![MAX_SCORE; self.epoch_store.committee().num_members()]
1585 };
1586
1587 let (system_state_obj, system_epoch_info_event) = self
1588 .augment_epoch_last_checkpoint(
1589 &epoch_rolling_gas_cost_summary,
1590 timestamp_ms,
1591 &mut effects,
1592 &mut signatures,
1593 sequence_number,
1594 scores,
1595 )
1596 .await?;
1597
1598 let epoch_supply_change =
1606 system_epoch_info_event.map_or(0, |event| event.supply_change());
1607
1608 let committee = system_state_obj
1609 .get_current_epoch_committee()
1610 .committee()
1611 .clone();
1612
1613 let root_state_digest = {
1616 let state_acc = self
1617 .global_state_hasher
1618 .upgrade()
1619 .expect("No checkpoints should be getting built after local configuration");
1620 let acc = state_acc.accumulate_checkpoint(
1621 &effects,
1622 sequence_number,
1623 &self.epoch_store,
1624 )?;
1625
1626 state_acc
1627 .wait_for_previous_running_root(&self.epoch_store, sequence_number)
1628 .await?;
1629
1630 state_acc.accumulate_running_root(
1631 &self.epoch_store,
1632 sequence_number,
1633 Some(acc),
1634 )?;
1635 state_acc
1636 .digest_epoch(self.epoch_store.clone(), sequence_number)
1637 .await?
1638 };
1639 self.metrics.highest_accumulated_epoch.set(epoch as i64);
1640 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1641
1642 let epoch_commitments = vec![root_state_digest.into()];
1643
1644 Some(EndOfEpochData {
1645 next_epoch_committee: committee.voting_rights,
1646 next_epoch_protocol_version: ProtocolVersion::new(
1647 system_state_obj.protocol_version(),
1648 ),
1649 epoch_commitments,
1650 epoch_supply_change,
1651 })
1652 } else {
1653 self.send_to_hasher
1654 .send((sequence_number, effects.clone()))
1655 .await?;
1656 None
1657 };
1658 let contents = CheckpointContents::new_with_digests_and_signatures(
1659 effects.iter().map(TransactionEffects::execution_digests),
1660 signatures,
1661 );
1662
1663 let num_txns = contents.size() as u64;
1664
1665 let network_total_transactions = last_checkpoint
1666 .as_ref()
1667 .map(|(_, c)| c.network_total_transactions + num_txns)
1668 .unwrap_or(num_txns);
1669
1670 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1671
1672 let matching_randomness_rounds: Vec<_> = effects
1673 .iter()
1674 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1675 .copied()
1676 .collect();
1677
1678 let summary = CheckpointSummary::new(
1679 self.epoch_store.protocol_config(),
1680 epoch,
1681 sequence_number,
1682 network_total_transactions,
1683 &contents,
1684 previous_digest,
1685 epoch_rolling_gas_cost_summary,
1686 end_of_epoch_data,
1687 timestamp_ms,
1688 matching_randomness_rounds,
1689 );
1690 summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1691 if last_checkpoint_of_epoch {
1692 info!(
1693 checkpoint_seq = sequence_number,
1694 "creating last checkpoint of epoch {}", epoch
1695 );
1696 if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
1697 self.epoch_store
1698 .report_epoch_metrics_at_last_checkpoint(stats);
1699 }
1700 }
1701 last_checkpoint = Some((sequence_number, summary.clone()));
1702 checkpoints.push((summary, contents));
1703 }
1704
1705 Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1706 }
1707
1708 fn get_epoch_total_gas_cost(
1709 &self,
1710 last_checkpoint: Option<&CheckpointSummary>,
1711 cur_checkpoint_effects: &[TransactionEffects],
1712 ) -> GasCostSummary {
1713 let (previous_epoch, previous_gas_costs) = last_checkpoint
1714 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1715 .unwrap_or_default();
1716 let current_gas_costs =
1717 checked::new_gas_cost_summary_from_txn_effects(cur_checkpoint_effects.iter());
1718 if previous_epoch == self.epoch_store.epoch() {
1719 GasCostSummary::new(
1721 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1722 previous_gas_costs.computation_cost_burned
1723 + current_gas_costs.computation_cost_burned,
1724 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1725 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1726 previous_gas_costs.non_refundable_storage_fee
1727 + current_gas_costs.non_refundable_storage_fee,
1728 )
1729 } else {
1730 current_gas_costs
1731 }
1732 }
1733
1734 #[instrument(level = "error", skip_all)]
1737 async fn augment_epoch_last_checkpoint(
1738 &self,
1739 epoch_total_gas_cost: &GasCostSummary,
1740 epoch_start_timestamp_ms: CheckpointTimestamp,
1741 checkpoint_effects: &mut Vec<TransactionEffects>,
1742 signatures: &mut Vec<Vec<GenericSignature>>,
1743 checkpoint: CheckpointSequenceNumber,
1744 scores: Vec<u64>,
1745 ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1746 let (system_state, system_epoch_info_event, effects) = self
1747 .state
1748 .create_and_execute_advance_epoch_tx(
1749 &self.epoch_store,
1750 epoch_total_gas_cost,
1751 checkpoint,
1752 epoch_start_timestamp_ms,
1753 scores,
1754 )
1755 .await?;
1756 checkpoint_effects.push(effects);
1757 signatures.push(vec![]);
1758 Ok((system_state, system_epoch_info_event))
1759 }
1760
1761 #[instrument(level = "debug", skip_all)]
1770 fn complete_checkpoint_effects(
1771 &self,
1772 mut roots: Vec<TransactionEffects>,
1773 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
1774 ) -> IotaResult<Vec<TransactionEffects>> {
1775 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
1776 let mut results = vec![];
1777 let mut seen = HashSet::new();
1778 loop {
1779 let mut pending = HashSet::new();
1780
1781 let transactions_included = self
1782 .epoch_store
1783 .builder_included_transactions_in_checkpoint(
1784 roots.iter().map(|e| e.transaction_digest()),
1785 )?;
1786
1787 for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
1788 let digest = effect.transaction_digest();
1789 seen.insert(*digest);
1792
1793 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
1795 continue;
1796 }
1797
1798 if tx_included || effect.epoch() < self.epoch_store.epoch() {
1800 continue;
1801 }
1802
1803 let existing_effects = self
1804 .epoch_store
1805 .transactions_executed_in_cur_epoch(effect.dependencies())?;
1806
1807 for (dependency, effects_signature_exists) in
1808 effect.dependencies().iter().zip(existing_effects.iter())
1809 {
1810 if !effects_signature_exists {
1815 continue;
1816 }
1817 if seen.insert(*dependency) {
1818 pending.insert(*dependency);
1819 }
1820 }
1821 results.push(effect);
1822 }
1823 if pending.is_empty() {
1824 break;
1825 }
1826 let pending = pending.into_iter().collect::<Vec<_>>();
1827 let effects = self
1828 .effects_store
1829 .try_multi_get_executed_effects(&pending)?;
1830 let effects = effects
1831 .into_iter()
1832 .zip(pending)
1833 .map(|(opt, digest)| match opt {
1834 Some(x) => x,
1835 None => panic!(
1836 "Can not find effect for transaction {digest}, however transaction that depend on it was already executed"
1837 ),
1838 })
1839 .collect::<Vec<_>>();
1840 roots = effects;
1841 }
1842
1843 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
1844 Ok(results)
1845 }
1846
1847 #[cfg(msim)]
1850 fn expensive_consensus_commit_prologue_invariants_check(
1851 &self,
1852 root_digests: &[TransactionDigest],
1853 sorted: &[TransactionEffects],
1854 ) {
1855 let root_txs = self
1857 .state
1858 .get_transaction_cache_reader()
1859 .multi_get_transaction_blocks(root_digests);
1860 let ccps = root_txs
1861 .iter()
1862 .filter_map(|tx| {
1863 tx.as_ref().filter(|tx| {
1864 matches!(
1865 tx.transaction_data().kind(),
1866 TransactionKind::ConsensusCommitPrologueV1(_)
1867 )
1868 })
1869 })
1870 .collect::<Vec<_>>();
1871
1872 assert!(ccps.len() <= 1);
1875
1876 let txs = self
1878 .state
1879 .get_transaction_cache_reader()
1880 .multi_get_transaction_blocks(
1881 &sorted
1882 .iter()
1883 .map(|tx| *tx.transaction_digest())
1884 .collect::<Vec<_>>(),
1885 );
1886
1887 if ccps.is_empty() {
1888 for tx in txs.iter().flatten() {
1892 assert!(!matches!(
1893 tx.transaction_data().kind(),
1894 TransactionKind::ConsensusCommitPrologueV1(_)
1895 ));
1896 }
1897 } else {
1898 assert!(matches!(
1901 txs[0].as_ref().unwrap().transaction_data().kind(),
1902 TransactionKind::ConsensusCommitPrologueV1(_)
1903 ));
1904
1905 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1906
1907 for tx in txs.iter().skip(1).flatten() {
1908 assert!(!matches!(
1909 tx.transaction_data().kind(),
1910 TransactionKind::ConsensusCommitPrologueV1(_)
1911 ));
1912 }
1913 }
1914 }
1915}
1916
1917impl CheckpointAggregator {
1918 fn new(
1919 tables: Arc<CheckpointStore>,
1920 epoch_store: Arc<AuthorityPerEpochStore>,
1921 notify: Arc<Notify>,
1922 output: Box<dyn CertifiedCheckpointOutput>,
1923 state: Arc<AuthorityState>,
1924 metrics: Arc<CheckpointMetrics>,
1925 ) -> Self {
1926 let current = None;
1927 Self {
1928 store: tables,
1929 epoch_store,
1930 notify,
1931 current,
1932 output,
1933 state,
1934 metrics,
1935 }
1936 }
1937
1938 async fn run(mut self) {
1944 info!("Starting CheckpointAggregator");
1945 loop {
1946 if let Err(e) = self.run_and_notify().await {
1947 error!(
1948 "Error while aggregating checkpoint, will retry in 1s: {:?}",
1949 e
1950 );
1951 self.metrics.checkpoint_errors.inc();
1952 tokio::time::sleep(Duration::from_secs(1)).await;
1953 continue;
1954 }
1955
1956 let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
1957 }
1958 }
1959
1960 async fn run_and_notify(&mut self) -> IotaResult {
1961 let summaries = self.run_inner()?;
1962 for summary in summaries {
1963 self.output.certified_checkpoint_created(&summary).await?;
1964 }
1965 Ok(())
1966 }
1967
1968 fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
1969 let _scope = monitored_scope("CheckpointAggregator");
1970 let mut result = vec![];
1971 'outer: loop {
1972 let next_to_certify = self.next_checkpoint_to_certify()?;
1973 let current = if let Some(current) = &mut self.current {
1974 if current.summary.sequence_number < next_to_certify {
1980 self.current = None;
1981 continue;
1982 }
1983 current
1984 } else {
1985 let Some(summary) = self
1986 .epoch_store
1987 .get_built_checkpoint_summary(next_to_certify)?
1988 else {
1989 return Ok(result);
1990 };
1991 self.current = Some(CheckpointSignatureAggregator {
1992 next_index: 0,
1993 digest: summary.digest(),
1994 summary,
1995 signatures_by_digest: MultiStakeAggregator::new(
1996 self.epoch_store.committee().clone(),
1997 ),
1998 store: self.store.clone(),
1999 state: self.state.clone(),
2000 metrics: self.metrics.clone(),
2001 });
2002 self.current.as_mut().unwrap()
2003 };
2004
2005 let epoch_tables = self
2006 .epoch_store
2007 .tables()
2008 .expect("should not run past end of epoch");
2009 let iter = epoch_tables
2010 .pending_checkpoint_signatures
2011 .safe_iter_with_bounds(
2012 Some((current.summary.sequence_number, current.next_index)),
2013 None,
2014 );
2015 for item in iter {
2016 let ((seq, index), data) = item?;
2017 if seq != current.summary.sequence_number {
2018 trace!(
2019 checkpoint_seq =? current.summary.sequence_number,
2020 "Not enough checkpoint signatures",
2021 );
2022 return Ok(result);
2024 }
2025 trace!(
2026 checkpoint_seq = current.summary.sequence_number,
2027 "Processing signature for checkpoint (digest: {:?}) from {:?}",
2028 current.summary.digest(),
2029 data.summary.auth_sig().authority.concise()
2030 );
2031 self.metrics
2032 .checkpoint_participation
2033 .with_label_values(&[&format!(
2034 "{:?}",
2035 data.summary.auth_sig().authority.concise()
2036 )])
2037 .inc();
2038 if let Ok(auth_signature) = current.try_aggregate(data) {
2039 debug!(
2040 checkpoint_seq = current.summary.sequence_number,
2041 "Successfully aggregated signatures for checkpoint (digest: {:?})",
2042 current.summary.digest(),
2043 );
2044 let summary = VerifiedCheckpoint::new_unchecked(
2045 CertifiedCheckpointSummary::new_from_data_and_sig(
2046 current.summary.clone(),
2047 auth_signature,
2048 ),
2049 );
2050
2051 self.store.insert_certified_checkpoint(&summary)?;
2052 self.metrics
2053 .last_certified_checkpoint
2054 .set(current.summary.sequence_number as i64);
2055 current
2056 .summary
2057 .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
2058 result.push(summary.into_inner());
2059 self.current = None;
2060 continue 'outer;
2061 } else {
2062 current.next_index = index + 1;
2063 }
2064 }
2065 break;
2066 }
2067 Ok(result)
2068 }
2069
2070 fn next_checkpoint_to_certify(&self) -> IotaResult<CheckpointSequenceNumber> {
2071 Ok(self
2072 .store
2073 .tables
2074 .certified_checkpoints
2075 .reversed_safe_iter_with_bounds(None, None)?
2076 .next()
2077 .transpose()?
2078 .map(|(seq, _)| seq + 1)
2079 .unwrap_or_default())
2080 }
2081}
2082
2083impl CheckpointSignatureAggregator {
2084 #[expect(clippy::result_unit_err)]
2085 pub fn try_aggregate(
2086 &mut self,
2087 data: CheckpointSignatureMessage,
2088 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2089 let their_digest = *data.summary.digest();
2090 let (_, signature) = data.summary.into_data_and_sig();
2091 let author = signature.authority;
2092 let envelope =
2093 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2094 match self.signatures_by_digest.insert(their_digest, envelope) {
2095 InsertResult::Failed {
2097 error:
2098 IotaError::StakeAggregatorRepeatedSigner {
2099 conflicting_sig: false,
2100 ..
2101 },
2102 } => Err(()),
2103 InsertResult::Failed { error } => {
2104 warn!(
2105 checkpoint_seq = self.summary.sequence_number,
2106 "Failed to aggregate new signature from validator {:?}: {:?}",
2107 author.concise(),
2108 error
2109 );
2110 self.check_for_split_brain();
2111 Err(())
2112 }
2113 InsertResult::QuorumReached(cert) => {
2114 if their_digest != self.digest {
2118 self.metrics.remote_checkpoint_forks.inc();
2119 warn!(
2120 checkpoint_seq = self.summary.sequence_number,
2121 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2122 author.concise(),
2123 their_digest,
2124 self.digest
2125 );
2126 return Err(());
2127 }
2128 Ok(cert)
2129 }
2130 InsertResult::NotEnoughVotes {
2131 bad_votes: _,
2132 bad_authorities: _,
2133 } => {
2134 self.check_for_split_brain();
2135 Err(())
2136 }
2137 }
2138 }
2139
2140 fn check_for_split_brain(&self) {
2145 debug!(
2146 checkpoint_seq = self.summary.sequence_number,
2147 "Checking for split brain condition"
2148 );
2149 if self.signatures_by_digest.quorum_unreachable() {
2150 let digests_by_stake_messages = self
2156 .signatures_by_digest
2157 .get_all_unique_values()
2158 .into_iter()
2159 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2160 .map(|(digest, (_authorities, total_stake))| {
2161 format!("{digest} (total stake: {total_stake})")
2162 })
2163 .collect::<Vec<String>>();
2164 error!(
2165 checkpoint_seq = self.summary.sequence_number,
2166 "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
2167 self.signatures_by_digest.uncommitted_stake(),
2168 digests_by_stake_messages,
2169 );
2170 self.metrics.split_brain_checkpoint_forks.inc();
2171
2172 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2173 let local_summary = self.summary.clone();
2174 let state = self.state.clone();
2175 let tables = self.store.clone();
2176
2177 tokio::spawn(async move {
2178 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2179 });
2180 }
2181 }
2182}
2183
2184async fn diagnose_split_brain(
2190 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2191 local_summary: CheckpointSummary,
2192 state: Arc<AuthorityState>,
2193 tables: Arc<CheckpointStore>,
2194) {
2195 debug!(
2196 checkpoint_seq = local_summary.sequence_number,
2197 "Running split brain diagnostics..."
2198 );
2199 let time = SystemTime::now();
2200 let digest_to_validator = all_unique_values
2202 .iter()
2203 .filter_map(|(digest, (validators, _))| {
2204 if *digest != local_summary.digest() {
2205 let random_validator = validators.choose(&mut get_rng()).unwrap();
2206 Some((*digest, *random_validator))
2207 } else {
2208 None
2209 }
2210 })
2211 .collect::<HashMap<_, _>>();
2212 if digest_to_validator.is_empty() {
2213 panic!(
2214 "Given split brain condition, there should be at \
2215 least one validator that disagrees with local signature"
2216 );
2217 }
2218
2219 let epoch_store = state.load_epoch_store_one_call_per_task();
2220 let committee = epoch_store
2221 .epoch_start_state()
2222 .get_iota_committee_with_network_metadata();
2223 let network_config = default_iota_network_config();
2224 let network_clients =
2225 make_network_authority_clients_with_network_config(&committee, &network_config);
2226
2227 let response_futures = digest_to_validator
2229 .values()
2230 .cloned()
2231 .map(|validator| {
2232 let client = network_clients
2233 .get(&validator)
2234 .expect("Failed to get network client");
2235 let request = CheckpointRequest {
2236 sequence_number: Some(local_summary.sequence_number),
2237 request_content: true,
2238 certified: false,
2239 };
2240 client.handle_checkpoint(request)
2241 })
2242 .collect::<Vec<_>>();
2243
2244 let digest_name_pair = digest_to_validator.iter();
2245 let response_data = futures::future::join_all(response_futures)
2246 .await
2247 .into_iter()
2248 .zip(digest_name_pair)
2249 .filter_map(|(response, (digest, name))| match response {
2250 Ok(response) => match response {
2251 CheckpointResponse {
2252 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2253 contents: Some(contents),
2254 } => Some((*name, *digest, summary, contents)),
2255 CheckpointResponse {
2256 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2257 contents: _,
2258 } => {
2259 panic!("Expected pending checkpoint, but got certified checkpoint");
2260 }
2261 CheckpointResponse {
2262 checkpoint: None,
2263 contents: _,
2264 } => {
2265 error!(
2266 "Summary for checkpoint {:?} not found on validator {:?}",
2267 local_summary.sequence_number, name
2268 );
2269 None
2270 }
2271 CheckpointResponse {
2272 checkpoint: _,
2273 contents: None,
2274 } => {
2275 error!(
2276 "Contents for checkpoint {:?} not found on validator {:?}",
2277 local_summary.sequence_number, name
2278 );
2279 None
2280 }
2281 },
2282 Err(e) => {
2283 error!(
2284 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2285 e
2286 );
2287 None
2288 }
2289 })
2290 .collect::<Vec<_>>();
2291
2292 let local_checkpoint_contents = tables
2293 .get_checkpoint_contents(&local_summary.content_digest)
2294 .unwrap_or_else(|_| {
2295 panic!(
2296 "Could not find checkpoint contents for digest {:?}",
2297 local_summary.digest()
2298 )
2299 })
2300 .unwrap_or_else(|| {
2301 panic!(
2302 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2303 local_summary.sequence_number,
2304 local_summary.digest()
2305 )
2306 });
2307 let local_contents_text = format!("{local_checkpoint_contents:?}");
2308
2309 let local_summary_text = format!("{local_summary:?}");
2310 let local_validator = state.name.concise();
2311 let diff_patches = response_data
2312 .iter()
2313 .map(|(name, other_digest, other_summary, contents)| {
2314 let other_contents_text = format!("{contents:?}");
2315 let other_summary_text = format!("{other_summary:?}");
2316 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2317 .enumerate_transactions(&local_summary)
2318 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2319 .unzip();
2320 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2321 .enumerate_transactions(other_summary)
2322 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2323 .unzip();
2324 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2325 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2326 let local_transactions_text = format!("{local_transactions:#?}");
2327 let other_transactions_text = format!("{other_transactions:#?}");
2328 let transactions_patch =
2329 create_patch(&local_transactions_text, &other_transactions_text);
2330 let local_effects_text = format!("{local_effects:#?}");
2331 let other_effects_text = format!("{other_effects:#?}");
2332 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2333 let seq_number = local_summary.sequence_number;
2334 let local_digest = local_summary.digest();
2335 let other_validator = name.concise();
2336 format!(
2337 "Checkpoint: {seq_number:?}\n\
2338 Local validator (original): {local_validator:?}, digest: {local_digest}\n\
2339 Other validator (modified): {other_validator:?}, digest: {other_digest}\n\n\
2340 Summary Diff: \n{summary_patch}\n\n\
2341 Contents Diff: \n{contents_patch}\n\n\
2342 Transactions Diff: \n{transactions_patch}\n\n\
2343 Effects Diff: \n{effects_patch}",
2344 )
2345 })
2346 .collect::<Vec<_>>()
2347 .join("\n\n\n");
2348
2349 let header = format!(
2350 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2351 Datetime: {time:?}"
2352 );
2353 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2354 let checkpoint_fork_dir = iota_common::tempdir().keep();
2355 let checkpoint_fork_file_path = checkpoint_fork_dir.join(Path::new("checkpoint_fork_dump.txt"));
2356 let mut file = File::create(checkpoint_fork_file_path).unwrap();
2357 write!(file, "{fork_logs_text}").unwrap();
2358 debug!("{}", fork_logs_text);
2359
2360 fail_point!("split_brain_reached");
2361}
2362
2363pub trait CheckpointServiceNotify {
2364 fn notify_checkpoint_signature(
2365 &self,
2366 epoch_store: &AuthorityPerEpochStore,
2367 info: &CheckpointSignatureMessage,
2368 ) -> IotaResult;
2369
2370 fn notify_checkpoint(&self) -> IotaResult;
2371}
2372
2373enum CheckpointServiceState {
2374 Unstarted(
2375 Box<(
2376 CheckpointBuilder,
2377 CheckpointAggregator,
2378 CheckpointStateHasher,
2379 )>,
2380 ),
2381 Started,
2382}
2383
2384impl CheckpointServiceState {
2385 fn take_unstarted(
2386 &mut self,
2387 ) -> (
2388 CheckpointBuilder,
2389 CheckpointAggregator,
2390 CheckpointStateHasher,
2391 ) {
2392 let mut state = CheckpointServiceState::Started;
2393 std::mem::swap(self, &mut state);
2394
2395 match state {
2396 CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1, tup.2),
2397 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2398 }
2399 }
2400}
2401
2402pub struct CheckpointService {
2403 tables: Arc<CheckpointStore>,
2404 notify_builder: Arc<Notify>,
2405 notify_aggregator: Arc<Notify>,
2406 last_signature_index: Mutex<u64>,
2407 highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
2409 highest_previously_built_seq: CheckpointSequenceNumber,
2412 metrics: Arc<CheckpointMetrics>,
2413 state: Mutex<CheckpointServiceState>,
2414}
2415
2416impl CheckpointService {
2417 pub fn build(
2421 state: Arc<AuthorityState>,
2422 checkpoint_store: Arc<CheckpointStore>,
2423 epoch_store: Arc<AuthorityPerEpochStore>,
2424 effects_store: Arc<dyn TransactionCacheRead>,
2425 global_state_hasher: Weak<GlobalStateHasher>,
2426 checkpoint_output: Box<dyn CheckpointOutput>,
2427 certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2428 metrics: Arc<CheckpointMetrics>,
2429 max_transactions_per_checkpoint: usize,
2430 max_checkpoint_size_bytes: usize,
2431 ) -> Arc<Self> {
2432 info!(
2433 "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2434 );
2435 let notify_builder = Arc::new(Notify::new());
2436 let notify_aggregator = Arc::new(Notify::new());
2437
2438 let highest_previously_built_seq = checkpoint_store
2440 .get_latest_locally_computed_checkpoint()
2441 .expect("failed to get latest locally computed checkpoint")
2442 .map(|s| s.sequence_number)
2443 .unwrap_or(0);
2444
2445 let highest_currently_built_seq =
2446 CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
2447 .expect("epoch should not have ended")
2448 .map(|(seq, _)| seq)
2449 .unwrap_or(0);
2450
2451 let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
2452
2453 let aggregator = CheckpointAggregator::new(
2454 checkpoint_store.clone(),
2455 epoch_store.clone(),
2456 notify_aggregator.clone(),
2457 certified_checkpoint_output,
2458 state.clone(),
2459 metrics.clone(),
2460 );
2461
2462 let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
2463
2464 let ckpt_state_hasher = CheckpointStateHasher::new(
2465 epoch_store.clone(),
2466 global_state_hasher.clone(),
2467 receive_from_builder,
2468 );
2469
2470 let builder = CheckpointBuilder::new(
2471 state,
2472 checkpoint_store.clone(),
2473 epoch_store.clone(),
2474 notify_builder.clone(),
2475 effects_store,
2476 global_state_hasher,
2477 send_to_hasher,
2478 checkpoint_output,
2479 notify_aggregator.clone(),
2480 highest_currently_built_seq_tx.clone(),
2481 metrics.clone(),
2482 max_transactions_per_checkpoint,
2483 max_checkpoint_size_bytes,
2484 );
2485
2486 let last_signature_index = epoch_store
2487 .get_last_checkpoint_signature_index()
2488 .expect("should not cross end of epoch");
2489 let last_signature_index = Mutex::new(last_signature_index);
2490
2491 Arc::new(Self {
2492 tables: checkpoint_store,
2493 notify_builder,
2494 notify_aggregator,
2495 last_signature_index,
2496 highest_currently_built_seq_tx,
2497 highest_previously_built_seq,
2498 metrics,
2499 state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
2500 builder,
2501 aggregator,
2502 ckpt_state_hasher,
2503 )))),
2504 })
2505 }
2506
2507 pub async fn spawn(&self) -> JoinSet<()> {
2516 let mut tasks = JoinSet::new();
2517
2518 let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();
2519 tasks.spawn(monitored_future!(builder.run()));
2520 tasks.spawn(monitored_future!(aggregator.run()));
2521 tasks.spawn(monitored_future!(state_hasher.run()));
2522
2523 if tokio::time::timeout(
2529 Duration::from_secs(120),
2530 self.wait_for_rebuilt_checkpoints(),
2531 )
2532 .await
2533 .is_err()
2534 {
2535 debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
2536 }
2537
2538 tasks
2539 }
2540}
2541
2542impl CheckpointService {
2543 pub async fn wait_for_rebuilt_checkpoints(&self) {
2550 let highest_previously_built_seq = self.highest_previously_built_seq;
2551 let mut rx = self.highest_currently_built_seq_tx.subscribe();
2552 let mut highest_currently_built_seq = *rx.borrow_and_update();
2553 info!(
2554 "Waiting for checkpoints to be rebuilt, previously built seq: \
2555 {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
2556 );
2557 loop {
2558 if highest_currently_built_seq >= highest_previously_built_seq {
2559 info!("Checkpoint rebuild complete");
2560 break;
2561 }
2562 rx.changed().await.unwrap();
2563 highest_currently_built_seq = *rx.borrow_and_update();
2564 }
2565 }
2566
2567 #[cfg(test)]
2568 fn write_and_notify_checkpoint_for_testing(
2569 &self,
2570 epoch_store: &AuthorityPerEpochStore,
2571 checkpoint: PendingCheckpoint,
2572 ) -> IotaResult {
2573 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
2574
2575 let mut output = ConsensusCommitOutput::new(0);
2576 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2577 output.set_default_commit_stats_for_testing();
2578 epoch_store.push_consensus_output_for_tests(output);
2579 self.notify_checkpoint()?;
2580 Ok(())
2581 }
2582}
2583
2584impl CheckpointServiceNotify for CheckpointService {
2585 fn notify_checkpoint_signature(
2586 &self,
2587 epoch_store: &AuthorityPerEpochStore,
2588 info: &CheckpointSignatureMessage,
2589 ) -> IotaResult {
2590 let sequence = info.summary.sequence_number;
2591 let signer = info.summary.auth_sig().authority.concise();
2592
2593 if let Some(highest_verified_checkpoint) = self
2594 .tables
2595 .get_highest_verified_checkpoint()?
2596 .map(|x| *x.sequence_number())
2597 {
2598 if sequence <= highest_verified_checkpoint {
2599 trace!(
2600 checkpoint_seq = sequence,
2601 "Ignore checkpoint signature from {} - already certified", signer,
2602 );
2603 self.metrics
2604 .last_ignored_checkpoint_signature_received
2605 .set(sequence as i64);
2606 return Ok(());
2607 }
2608 }
2609 trace!(
2610 checkpoint_seq = sequence,
2611 "Received checkpoint signature, digest {} from {}",
2612 info.summary.digest(),
2613 signer,
2614 );
2615 self.metrics
2616 .last_received_checkpoint_signatures
2617 .with_label_values(&[&signer.to_string()])
2618 .set(sequence as i64);
2619 let mut index = self.last_signature_index.lock();
2623 *index += 1;
2624 epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2625 self.notify_aggregator.notify_one();
2626 Ok(())
2627 }
2628
2629 fn notify_checkpoint(&self) -> IotaResult {
2630 self.notify_builder.notify_one();
2631 Ok(())
2632 }
2633}
2634
2635#[iota_macros::with_checked_arithmetic]
2636mod checked {
2637 use iota_sdk_types::GasCostSummary;
2638 use iota_types::effects::{TransactionEffects, TransactionEffectsAPI};
2639 use itertools::MultiUnzip;
2640
2641 #[expect(clippy::type_complexity)]
2642 pub fn new_gas_cost_summary_from_txn_effects<'a>(
2643 transactions: impl Iterator<Item = &'a TransactionEffects>,
2644 ) -> GasCostSummary {
2645 let (
2646 storage_costs,
2647 computation_costs,
2648 computation_costs_burned,
2649 storage_rebates,
2650 non_refundable_storage_fee,
2651 ): (Vec<u64>, Vec<u64>, Vec<u64>, Vec<u64>, Vec<u64>) = transactions
2652 .map(|e| {
2653 (
2654 e.gas_cost_summary().storage_cost,
2655 e.gas_cost_summary().computation_cost,
2656 e.gas_cost_summary().computation_cost_burned,
2657 e.gas_cost_summary().storage_rebate,
2658 e.gas_cost_summary().non_refundable_storage_fee,
2659 )
2660 })
2661 .multiunzip();
2662
2663 GasCostSummary::new(
2664 computation_costs.iter().sum(),
2665 computation_costs_burned.iter().sum(),
2666 storage_costs.iter().sum(),
2667 storage_rebates.iter().sum(),
2668 non_refundable_storage_fee.iter().sum(),
2669 )
2670 }
2671}
2672pub struct CheckpointServiceNoop {}
2674impl CheckpointServiceNotify for CheckpointServiceNoop {
2675 fn notify_checkpoint_signature(
2676 &self,
2677 _: &AuthorityPerEpochStore,
2678 _: &CheckpointSignatureMessage,
2679 ) -> IotaResult {
2680 Ok(())
2681 }
2682
2683 fn notify_checkpoint(&self) -> IotaResult {
2684 Ok(())
2685 }
2686}
2687
2688#[cfg(test)]
2689mod tests {
2690 use std::{
2691 collections::{BTreeMap, HashMap},
2692 ops::Deref,
2693 };
2694
2695 use futures::{FutureExt as _, future::BoxFuture};
2696 use iota_macros::sim_test;
2697 use iota_protocol_config::{Chain, ProtocolConfig};
2698 use iota_sdk_types::{Identifier, ObjectId, Owner, move_package::MovePackage};
2699 use iota_types::{
2700 base_types::{SequenceNumber, TransactionEffectsDigest},
2701 crypto::Signature,
2702 effects::{
2703 TransactionEffects, TransactionEffectsAPIForTesting, TransactionEffectsExt,
2704 TransactionEvents,
2705 },
2706 messages_checkpoint::SignedCheckpointSummary,
2707 object,
2708 transaction::{GenesisObject, VerifiedTransaction},
2709 };
2710 use tokio::sync::mpsc;
2711
2712 use super::*;
2713 use crate::authority::test_authority_builder::TestAuthorityBuilder;
2714
2715 #[sim_test]
2716 pub async fn checkpoint_builder_test() {
2717 telemetry_subscribers::init_for_testing();
2718
2719 let mut protocol_config =
2720 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
2721 protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
2722 let state = TestAuthorityBuilder::new()
2723 .with_protocol_config(protocol_config)
2724 .build()
2725 .await;
2726
2727 let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
2728 let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
2729 vec![GenesisObject::new(
2730 object::Data::Package(
2731 MovePackage::new(
2732 ObjectId::random(),
2733 SequenceNumber::default(),
2734 BTreeMap::from([(Identifier::new_unchecked("m"), vec![0u8; 40000])]),
2735 100_000,
2736 Vec::new(),
2739 BTreeMap::new(),
2742 )
2743 .unwrap(),
2744 ),
2745 Owner::Immutable,
2746 )],
2747 vec![],
2748 );
2749 for i in 0..15 {
2750 state
2751 .database_for_testing()
2752 .perpetual_tables
2753 .transactions
2754 .insert(&d(i), dummy_tx.serializable_ref())
2755 .unwrap();
2756 }
2757 for i in 15..20 {
2758 state
2759 .database_for_testing()
2760 .perpetual_tables
2761 .transactions
2762 .insert(&d(i), dummy_tx_with_data.serializable_ref())
2763 .unwrap();
2764 }
2765
2766 let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
2767 commit_cert_for_test(
2768 &mut store,
2769 state.clone(),
2770 d(1),
2771 vec![d(2), d(3)],
2772 GasCostSummary::new(11, 11, 12, 11, 1),
2773 );
2774 commit_cert_for_test(
2775 &mut store,
2776 state.clone(),
2777 d(2),
2778 vec![d(3), d(4)],
2779 GasCostSummary::new(21, 21, 22, 21, 1),
2780 );
2781 commit_cert_for_test(
2782 &mut store,
2783 state.clone(),
2784 d(3),
2785 vec![],
2786 GasCostSummary::new(31, 31, 32, 31, 1),
2787 );
2788 commit_cert_for_test(
2789 &mut store,
2790 state.clone(),
2791 d(4),
2792 vec![],
2793 GasCostSummary::new(41, 41, 42, 41, 1),
2794 );
2795 for i in [5, 6, 7, 10, 11, 12, 13] {
2796 commit_cert_for_test(
2797 &mut store,
2798 state.clone(),
2799 d(i),
2800 vec![],
2801 GasCostSummary::new(41, 41, 42, 41, 1),
2802 );
2803 }
2804 for i in [15, 16, 17] {
2805 commit_cert_for_test(
2806 &mut store,
2807 state.clone(),
2808 d(i),
2809 vec![],
2810 GasCostSummary::new(51, 51, 52, 51, 1),
2811 );
2812 }
2813 let all_digests: Vec<_> = store.keys().copied().collect();
2814 for digest in all_digests {
2815 let signature = Signature::Ed25519IotaSignature(Default::default()).into();
2816 state
2817 .epoch_store_for_testing()
2818 .test_insert_user_signature(digest, vec![signature]);
2819 }
2820
2821 let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
2822 let (certified_output, mut certified_result) =
2823 mpsc::channel::<CertifiedCheckpointSummary>(10);
2824 let store = Arc::new(store);
2825
2826 let tmp_dir = iota_common::tempdir();
2827 let checkpoint_store = CheckpointStore::new(tmp_dir.path());
2828 let epoch_store = state.epoch_store_for_testing();
2829
2830 let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
2831 state.get_global_state_hash_store().clone(),
2832 ));
2833
2834 let checkpoint_service = CheckpointService::build(
2835 state.clone(),
2836 checkpoint_store,
2837 epoch_store.clone(),
2838 store,
2839 Arc::downgrade(&global_state_hasher),
2840 Box::new(output),
2841 Box::new(certified_output),
2842 CheckpointMetrics::new_for_tests(),
2843 3,
2844 100_000,
2845 );
2846 let _tasks = checkpoint_service.spawn().await;
2847
2848 checkpoint_service
2849 .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
2850 .unwrap();
2851 checkpoint_service
2852 .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
2853 .unwrap();
2854 checkpoint_service
2855 .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
2856 .unwrap();
2857 checkpoint_service
2858 .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
2859 .unwrap();
2860 checkpoint_service
2861 .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
2862 .unwrap();
2863 checkpoint_service
2864 .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
2865 .unwrap();
2866
2867 let (c1c, c1s) = result.recv().await.unwrap();
2868 let (c2c, c2s) = result.recv().await.unwrap();
2869
2870 let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2871 let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2872 assert_eq!(c1t, vec![d(4)]);
2873 assert_eq!(c1s.previous_digest, None);
2874 assert_eq!(c1s.sequence_number, 0);
2875 assert_eq!(
2876 c1s.epoch_rolling_gas_cost_summary,
2877 GasCostSummary::new(41, 41, 42, 41, 1)
2878 );
2879
2880 assert_eq!(c2t, vec![d(3), d(2), d(1)]);
2881 assert_eq!(c2s.previous_digest, Some(c1s.digest()));
2882 assert_eq!(c2s.sequence_number, 1);
2883 assert_eq!(
2884 c2s.epoch_rolling_gas_cost_summary,
2885 GasCostSummary::new(104, 104, 108, 104, 4)
2886 );
2887
2888 let (c3c, c3s) = result.recv().await.unwrap();
2891 let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2892 let (c4c, c4s) = result.recv().await.unwrap();
2893 let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2894 assert_eq!(c3s.sequence_number, 2);
2895 assert_eq!(c3s.previous_digest, Some(c2s.digest()));
2896 assert_eq!(c4s.sequence_number, 3);
2897 assert_eq!(c4s.previous_digest, Some(c3s.digest()));
2898 assert_eq!(c3t, vec![d(10), d(11), d(12)]);
2899 assert_eq!(c4t, vec![d(13)]);
2900
2901 let (c5c, c5s) = result.recv().await.unwrap();
2904 let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2905 let (c6c, c6s) = result.recv().await.unwrap();
2906 let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2907 assert_eq!(c5s.sequence_number, 4);
2908 assert_eq!(c5s.previous_digest, Some(c4s.digest()));
2909 assert_eq!(c6s.sequence_number, 5);
2910 assert_eq!(c6s.previous_digest, Some(c5s.digest()));
2911 assert_eq!(c5t, vec![d(15), d(16)]);
2912 assert_eq!(c6t, vec![d(17)]);
2913
2914 let (c7c, c7s) = result.recv().await.unwrap();
2917 let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2918 assert_eq!(c7t, vec![d(5), d(6)]);
2919 assert_eq!(c7s.previous_digest, Some(c6s.digest()));
2920 assert_eq!(c7s.sequence_number, 6);
2921
2922 let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
2923 let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
2924
2925 checkpoint_service
2926 .notify_checkpoint_signature(
2927 &epoch_store,
2928 &CheckpointSignatureMessage { summary: c2ss },
2929 )
2930 .unwrap();
2931 checkpoint_service
2932 .notify_checkpoint_signature(
2933 &epoch_store,
2934 &CheckpointSignatureMessage { summary: c1ss },
2935 )
2936 .unwrap();
2937
2938 let c1sc = certified_result.recv().await.unwrap();
2939 let c2sc = certified_result.recv().await.unwrap();
2940 assert_eq!(c1sc.sequence_number, 0);
2941 assert_eq!(c2sc.sequence_number, 1);
2942 }
2943
2944 impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
2945 fn try_notify_read_executed_effects(
2946 &self,
2947 digests: &[TransactionDigest],
2948 ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
2949 std::future::ready(Ok(digests
2950 .iter()
2951 .map(|d| self.get(d).expect("effects not found").clone())
2952 .collect()))
2953 .boxed()
2954 }
2955
2956 fn try_notify_read_executed_effects_digests(
2957 &self,
2958 digests: &[TransactionDigest],
2959 ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
2960 std::future::ready(Ok(digests
2961 .iter()
2962 .map(|d| {
2963 self.get(d)
2964 .map(|fx| fx.digest())
2965 .expect("effects not found")
2966 })
2967 .collect()))
2968 .boxed()
2969 }
2970
2971 fn try_multi_get_executed_effects(
2972 &self,
2973 digests: &[TransactionDigest],
2974 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2975 Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
2976 }
2977
2978 fn try_multi_get_transaction_blocks(
2985 &self,
2986 _: &[TransactionDigest],
2987 ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
2988 unimplemented!()
2989 }
2990
2991 fn try_multi_get_executed_effects_digests(
2992 &self,
2993 _: &[TransactionDigest],
2994 ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
2995 unimplemented!()
2996 }
2997
2998 fn try_multi_get_effects(
2999 &self,
3000 _: &[TransactionEffectsDigest],
3001 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
3002 unimplemented!()
3003 }
3004
3005 fn try_multi_get_events(
3006 &self,
3007 _: &[TransactionDigest],
3008 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
3009 unimplemented!()
3010 }
3011 }
3012
3013 #[async_trait::async_trait]
3014 impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
3015 async fn checkpoint_created(
3016 &self,
3017 summary: &CheckpointSummary,
3018 contents: &CheckpointContents,
3019 _epoch_store: &Arc<AuthorityPerEpochStore>,
3020 _checkpoint_store: &Arc<CheckpointStore>,
3021 ) -> IotaResult {
3022 self.try_send((contents.clone(), summary.clone())).unwrap();
3023 Ok(())
3024 }
3025 }
3026
3027 #[async_trait::async_trait]
3028 impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
3029 async fn certified_checkpoint_created(
3030 &self,
3031 summary: &CertifiedCheckpointSummary,
3032 ) -> IotaResult {
3033 self.try_send(summary.clone()).unwrap();
3034 Ok(())
3035 }
3036 }
3037
3038 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3039 PendingCheckpoint::V1(PendingCheckpointContentsV1 {
3040 roots: t
3041 .into_iter()
3042 .map(|t| TransactionKey::Digest(d(t)))
3043 .collect(),
3044 details: PendingCheckpointInfo {
3045 timestamp_ms,
3046 last_of_epoch: false,
3047 checkpoint_height: i,
3048 },
3049 })
3050 }
3051
3052 fn d(i: u8) -> TransactionDigest {
3053 let mut bytes: [u8; 32] = Default::default();
3054 bytes[0] = i;
3055 TransactionDigest::new(bytes)
3056 }
3057
3058 fn e(
3059 transaction_digest: TransactionDigest,
3060 dependencies: Vec<TransactionDigest>,
3061 gas_cost_summary: GasCostSummary,
3062 ) -> TransactionEffects {
3063 let mut effects = TransactionEffects::new_empty_v1(transaction_digest);
3064 *effects.dependencies_mut_for_testing() = dependencies;
3065 *effects.gas_cost_summary_mut_for_testing() = gas_cost_summary;
3066 effects
3067 }
3068
3069 fn commit_cert_for_test(
3070 store: &mut HashMap<TransactionDigest, TransactionEffects>,
3071 state: Arc<AuthorityState>,
3072 digest: TransactionDigest,
3073 dependencies: Vec<TransactionDigest>,
3074 gas_cost_summary: GasCostSummary,
3075 ) {
3076 let epoch_store = state.epoch_store_for_testing();
3077 let effects = e(digest, dependencies, gas_cost_summary);
3078 store.insert(digest, effects);
3079 epoch_store
3080 .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
3081 .expect("Inserting cert fx and sigs should not fail");
3082 }
3083}