1mod causal_order;
6pub mod checkpoint_executor;
7mod checkpoint_output;
8mod metrics;
9
10use std::{
11 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
12 fs::File,
13 io::Write,
14 path::Path,
15 sync::{Arc, Weak},
16 time::{Duration, SystemTime},
17};
18
19use diffy::create_patch;
20use iota_common::{debug_fatal, fatal, sync::notify_read::NotifyRead};
21use iota_macros::fail_point;
22use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
23use iota_network::default_iota_network_config;
24use iota_protocol_config::ProtocolVersion;
25use iota_types::{
26 base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
27 committee::StakeUnit,
28 crypto::AuthorityStrongQuorumSignInfo,
29 digests::{CheckpointContentsDigest, CheckpointDigest},
30 effects::{TransactionEffects, TransactionEffectsAPI},
31 error::{IotaError, IotaResult},
32 event::SystemEpochInfoEvent,
33 executable_transaction::VerifiedExecutableTransaction,
34 gas::GasCostSummary,
35 iota_system_state::{
36 IotaSystemState, IotaSystemStateTrait,
37 epoch_start_iota_system_state::EpochStartSystemStateTrait,
38 },
39 message_envelope::Message,
40 messages_checkpoint::{
41 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
42 CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
43 CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
44 FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
45 VerifiedCheckpointContents,
46 },
47 messages_consensus::ConsensusTransactionKey,
48 signature::GenericSignature,
49 transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
50};
51use itertools::Itertools;
52use nonempty::NonEmpty;
53use parking_lot::Mutex;
54use rand::{rngs::OsRng, seq::SliceRandom};
55use serde::{Deserialize, Serialize};
56use tokio::{
57 sync::{Notify, watch},
58 task::JoinSet,
59 time::timeout,
60};
61use tracing::{debug, error, info, instrument, trace, warn};
62use typed_store::{
63 DBMapUtils, Map, TypedStoreError,
64 rocks::{DBMap, MetricConf},
65 traits::{TableSummary, TypedStoreDebug},
66};
67
68pub use crate::checkpoints::{
69 checkpoint_output::{
70 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
71 },
72 metrics::CheckpointMetrics,
73};
74use crate::{
75 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
76 authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
77 checkpoints::{
78 causal_order::CausalOrder,
79 checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
80 },
81 consensus_handler::SequencedConsensusTransactionKey,
82 execution_cache::TransactionCacheRead,
83 stake_aggregator::{InsertResult, MultiStakeAggregator},
84 state_accumulator::StateAccumulator,
85};
86
87pub type CheckpointHeight = u64;
88
89pub struct EpochStats {
90 pub checkpoint_count: u64,
91 pub transaction_count: u64,
92 pub total_gas_reward: u64,
93}
94
95#[derive(Clone, Debug, Serialize, Deserialize)]
96pub struct PendingCheckpointInfo {
97 pub timestamp_ms: CheckpointTimestamp,
98 pub last_of_epoch: bool,
99 pub checkpoint_height: CheckpointHeight,
100}
101
102#[derive(Clone, Debug, Serialize, Deserialize)]
103pub enum PendingCheckpoint {
104 V1(PendingCheckpointContentsV1),
106}
107
108#[derive(Clone, Debug, Serialize, Deserialize)]
109pub struct PendingCheckpointContentsV1 {
110 pub roots: Vec<TransactionKey>,
111 pub details: PendingCheckpointInfo,
112}
113
114impl PendingCheckpoint {
115 pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
116 match self {
117 PendingCheckpoint::V1(contents) => contents,
118 }
119 }
120
121 pub fn into_v1(self) -> PendingCheckpointContentsV1 {
122 match self {
123 PendingCheckpoint::V1(contents) => contents,
124 }
125 }
126
127 pub fn roots(&self) -> &Vec<TransactionKey> {
128 &self.as_v1().roots
129 }
130
131 pub fn details(&self) -> &PendingCheckpointInfo {
132 &self.as_v1().details
133 }
134
135 pub fn height(&self) -> CheckpointHeight {
136 self.details().checkpoint_height
137 }
138}
139
140#[derive(Clone, Debug, Serialize, Deserialize)]
141pub struct BuilderCheckpointSummary {
142 pub summary: CheckpointSummary,
143 pub checkpoint_height: Option<CheckpointHeight>,
145 pub position_in_commit: usize,
146}
147
148#[derive(DBMapUtils)]
149pub struct CheckpointStoreTables {
150 pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,
152
153 pub(crate) checkpoint_sequence_by_contents_digest:
155 DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,
156
157 full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
161
162 pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
164 pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,
166
167 pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
171
172 epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,
175
176 pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
179}
180
181impl CheckpointStoreTables {
182 pub fn new(path: &Path, metric_name: &'static str) -> Self {
183 Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
184 }
185 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
186 Self::get_read_only_handle(
187 path.to_path_buf(),
188 None,
189 None,
190 MetricConf::new("checkpoint_readonly"),
191 )
192 }
193}
194
195pub struct CheckpointStore {
196 pub(crate) tables: CheckpointStoreTables,
197 synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
198 executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
199}
200
201impl CheckpointStore {
202 pub fn new(path: &Path) -> Arc<Self> {
203 let tables = CheckpointStoreTables::new(path, "checkpoint");
204 Arc::new(Self {
205 tables,
206 synced_checkpoint_notify_read: NotifyRead::new(),
207 executed_checkpoint_notify_read: NotifyRead::new(),
208 })
209 }
210
211 pub fn new_for_tests() -> Arc<Self> {
212 let ckpt_dir = tempfile::tempdir().unwrap();
213 CheckpointStore::new(ckpt_dir.path())
214 }
215
216 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
217 let tables = CheckpointStoreTables::new(path, "db_checkpoint");
218 Arc::new(Self {
219 tables,
220 synced_checkpoint_notify_read: NotifyRead::new(),
221 executed_checkpoint_notify_read: NotifyRead::new(),
222 })
223 }
224
225 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
226 CheckpointStoreTables::open_readonly(path)
227 }
228
229 #[instrument(level = "info", skip_all)]
230 pub fn insert_genesis_checkpoint(
231 &self,
232 checkpoint: VerifiedCheckpoint,
233 contents: CheckpointContents,
234 epoch_store: &AuthorityPerEpochStore,
235 ) {
236 assert_eq!(
237 checkpoint.epoch(),
238 0,
239 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
240 );
241 assert_eq!(
242 *checkpoint.sequence_number(),
243 0,
244 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
245 );
246
247 if self
250 .get_checkpoint_by_digest(checkpoint.digest())
251 .unwrap()
252 .is_none()
253 {
254 if epoch_store.epoch() == checkpoint.epoch {
255 epoch_store
256 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
257 .unwrap();
258 } else {
259 debug!(
260 validator_epoch =% epoch_store.epoch(),
261 genesis_epoch =% checkpoint.epoch(),
262 "Not inserting checkpoint builder data for genesis checkpoint",
263 );
264 }
265 self.insert_checkpoint_contents(contents).unwrap();
266 self.insert_verified_checkpoint(&checkpoint).unwrap();
267 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
268 }
269 }
270
271 pub fn get_checkpoint_by_digest(
272 &self,
273 digest: &CheckpointDigest,
274 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
275 self.tables
276 .checkpoint_by_digest
277 .get(digest)
278 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
279 }
280
281 pub fn get_checkpoint_by_sequence_number(
282 &self,
283 sequence_number: CheckpointSequenceNumber,
284 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
285 self.tables
286 .certified_checkpoints
287 .get(&sequence_number)
288 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
289 }
290
291 pub fn get_locally_computed_checkpoint(
292 &self,
293 sequence_number: CheckpointSequenceNumber,
294 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
295 self.tables
296 .locally_computed_checkpoints
297 .get(&sequence_number)
298 }
299
300 pub fn get_sequence_number_by_contents_digest(
301 &self,
302 digest: &CheckpointContentsDigest,
303 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
304 self.tables
305 .checkpoint_sequence_by_contents_digest
306 .get(digest)
307 }
308
309 pub fn delete_contents_digest_sequence_number_mapping(
310 &self,
311 digest: &CheckpointContentsDigest,
312 ) -> Result<(), TypedStoreError> {
313 self.tables
314 .checkpoint_sequence_by_contents_digest
315 .remove(digest)
316 }
317
318 pub fn get_latest_certified_checkpoint(
319 &self,
320 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
321 Ok(self
322 .tables
323 .certified_checkpoints
324 .reversed_safe_iter_with_bounds(None, None)?
325 .next()
326 .transpose()?
327 .map(|(_, v)| v.into()))
328 }
329
330 pub fn get_latest_locally_computed_checkpoint(
331 &self,
332 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
333 Ok(self
334 .tables
335 .locally_computed_checkpoints
336 .reversed_safe_iter_with_bounds(None, None)?
337 .next()
338 .transpose()?
339 .map(|(_, v)| v))
340 }
341
342 pub fn multi_get_checkpoint_by_sequence_number(
343 &self,
344 sequence_numbers: &[CheckpointSequenceNumber],
345 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
346 let checkpoints = self
347 .tables
348 .certified_checkpoints
349 .multi_get(sequence_numbers)?
350 .into_iter()
351 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
352 .collect();
353
354 Ok(checkpoints)
355 }
356
357 pub fn multi_get_checkpoint_content(
358 &self,
359 contents_digest: &[CheckpointContentsDigest],
360 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
361 self.tables.checkpoint_content.multi_get(contents_digest)
362 }
363
364 pub fn get_highest_verified_checkpoint(
365 &self,
366 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
367 let highest_verified = if let Some(highest_verified) = self
368 .tables
369 .watermarks
370 .get(&CheckpointWatermark::HighestVerified)?
371 {
372 highest_verified
373 } else {
374 return Ok(None);
375 };
376 self.get_checkpoint_by_digest(&highest_verified.1)
377 }
378
379 pub fn get_highest_synced_checkpoint(
380 &self,
381 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
382 let highest_synced = if let Some(highest_synced) = self
383 .tables
384 .watermarks
385 .get(&CheckpointWatermark::HighestSynced)?
386 {
387 highest_synced
388 } else {
389 return Ok(None);
390 };
391 self.get_checkpoint_by_digest(&highest_synced.1)
392 }
393
394 pub fn get_highest_synced_checkpoint_seq_number(
395 &self,
396 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
397 if let Some(highest_synced) = self
398 .tables
399 .watermarks
400 .get(&CheckpointWatermark::HighestSynced)?
401 {
402 Ok(Some(highest_synced.0))
403 } else {
404 Ok(None)
405 }
406 }
407
408 pub fn get_highest_executed_checkpoint_seq_number(
409 &self,
410 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
411 if let Some(highest_executed) = self
412 .tables
413 .watermarks
414 .get(&CheckpointWatermark::HighestExecuted)?
415 {
416 Ok(Some(highest_executed.0))
417 } else {
418 Ok(None)
419 }
420 }
421
422 pub fn get_highest_executed_checkpoint(
423 &self,
424 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
425 let highest_executed = if let Some(highest_executed) = self
426 .tables
427 .watermarks
428 .get(&CheckpointWatermark::HighestExecuted)?
429 {
430 highest_executed
431 } else {
432 return Ok(None);
433 };
434 self.get_checkpoint_by_digest(&highest_executed.1)
435 }
436
437 pub fn get_highest_pruned_checkpoint_seq_number(
438 &self,
439 ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
440 Ok(self
441 .tables
442 .watermarks
443 .get(&CheckpointWatermark::HighestPruned)?
444 .unwrap_or_default()
445 .0)
446 }
447
448 pub fn get_checkpoint_contents(
449 &self,
450 digest: &CheckpointContentsDigest,
451 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
452 self.tables.checkpoint_content.get(digest)
453 }
454
455 pub fn get_full_checkpoint_contents_by_sequence_number(
456 &self,
457 seq: CheckpointSequenceNumber,
458 ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
459 self.tables.full_checkpoint_content.get(&seq)
460 }
461
462 fn prune_local_summaries(&self) -> IotaResult {
463 if let Some((last_local_summary, _)) = self
464 .tables
465 .locally_computed_checkpoints
466 .reversed_safe_iter_with_bounds(None, None)?
467 .next()
468 .transpose()?
469 {
470 let mut batch = self.tables.locally_computed_checkpoints.batch();
471 batch.schedule_delete_range(
472 &self.tables.locally_computed_checkpoints,
473 &0,
474 &last_local_summary,
475 )?;
476 batch.write()?;
477 info!("Pruned local summaries up to {:?}", last_local_summary);
478 }
479 Ok(())
480 }
481
482 #[instrument(level = "trace", skip_all)]
483 fn check_for_checkpoint_fork(
484 &self,
485 local_checkpoint: &CheckpointSummary,
486 verified_checkpoint: &VerifiedCheckpoint,
487 ) {
488 if local_checkpoint != verified_checkpoint.data() {
489 let verified_contents = self
490 .get_checkpoint_contents(&verified_checkpoint.content_digest)
491 .map(|opt_contents| {
492 opt_contents
493 .map(|contents| format!("{contents:?}"))
494 .unwrap_or_else(|| {
495 format!(
496 "Verified checkpoint contents not found, digest: {:?}",
497 verified_checkpoint.content_digest,
498 )
499 })
500 })
501 .map_err(|e| {
502 format!(
503 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
504 verified_checkpoint.content_digest, e
505 )
506 })
507 .unwrap_or_else(|err_msg| err_msg);
508
509 let local_contents = self
510 .get_checkpoint_contents(&local_checkpoint.content_digest)
511 .map(|opt_contents| {
512 opt_contents
513 .map(|contents| format!("{contents:?}"))
514 .unwrap_or_else(|| {
515 format!(
516 "Local checkpoint contents not found, digest: {:?}",
517 local_checkpoint.content_digest
518 )
519 })
520 })
521 .map_err(|e| {
522 format!(
523 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
524 local_checkpoint.content_digest, e
525 )
526 })
527 .unwrap_or_else(|err_msg| err_msg);
528
529 error!(
531 verified_checkpoint = ?verified_checkpoint.data(),
532 ?verified_contents,
533 ?local_checkpoint,
534 ?local_contents,
535 "Local checkpoint fork detected!",
536 );
537 fatal!(
538 "Local checkpoint fork detected for sequence number: {}",
539 local_checkpoint.sequence_number()
540 );
541 }
542 }
543
544 pub fn insert_certified_checkpoint(
550 &self,
551 checkpoint: &VerifiedCheckpoint,
552 ) -> Result<(), TypedStoreError> {
553 debug!(
554 checkpoint_seq = checkpoint.sequence_number(),
555 "Inserting certified checkpoint",
556 );
557 let mut batch = self.tables.certified_checkpoints.batch();
558 batch
559 .insert_batch(
560 &self.tables.certified_checkpoints,
561 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
562 )?
563 .insert_batch(
564 &self.tables.checkpoint_by_digest,
565 [(checkpoint.digest(), checkpoint.serializable_ref())],
566 )?;
567 if checkpoint.next_epoch_committee().is_some() {
568 batch.insert_batch(
569 &self.tables.epoch_last_checkpoint_map,
570 [(&checkpoint.epoch(), checkpoint.sequence_number())],
571 )?;
572 }
573 batch.write()?;
574
575 if let Some(local_checkpoint) = self
576 .tables
577 .locally_computed_checkpoints
578 .get(checkpoint.sequence_number())?
579 {
580 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
581 }
582
583 Ok(())
584 }
585
586 #[instrument(level = "trace", skip_all)]
589 pub fn insert_verified_checkpoint(
590 &self,
591 checkpoint: &VerifiedCheckpoint,
592 ) -> Result<(), TypedStoreError> {
593 self.insert_certified_checkpoint(checkpoint)?;
594 self.update_highest_verified_checkpoint(checkpoint)
595 }
596
597 pub fn update_highest_verified_checkpoint(
598 &self,
599 checkpoint: &VerifiedCheckpoint,
600 ) -> Result<(), TypedStoreError> {
601 if Some(*checkpoint.sequence_number())
602 > self
603 .get_highest_verified_checkpoint()?
604 .map(|x| *x.sequence_number())
605 {
606 debug!(
607 checkpoint_seq = checkpoint.sequence_number(),
608 "Updating highest verified checkpoint",
609 );
610 self.tables.watermarks.insert(
611 &CheckpointWatermark::HighestVerified,
612 &(*checkpoint.sequence_number(), *checkpoint.digest()),
613 )?;
614 }
615
616 Ok(())
617 }
618
619 pub fn update_highest_synced_checkpoint(
620 &self,
621 checkpoint: &VerifiedCheckpoint,
622 ) -> Result<(), TypedStoreError> {
623 let seq = *checkpoint.sequence_number();
624 debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
625 self.tables.watermarks.insert(
626 &CheckpointWatermark::HighestSynced,
627 &(seq, *checkpoint.digest()),
628 )?;
629 self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
630 Ok(())
631 }
632
633 async fn notify_read_checkpoint_watermark<F>(
634 &self,
635 notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
636 seq: CheckpointSequenceNumber,
637 get_watermark: F,
638 ) -> VerifiedCheckpoint
639 where
640 F: Fn() -> Option<CheckpointSequenceNumber>,
641 {
642 type ReadResult = Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError>;
643
644 notify_read
645 .read(&[seq], |seqs| {
646 let seq = seqs[0];
647 let Some(highest) = get_watermark() else {
648 return Ok(vec![None]) as ReadResult;
649 };
650 if highest < seq {
651 return Ok(vec![None]) as ReadResult;
652 }
653 let checkpoint = self
654 .get_checkpoint_by_sequence_number(seq)
655 .expect("db error")
656 .expect("checkpoint not found");
657 Ok(vec![Some(checkpoint)]) as ReadResult
658 })
659 .await
660 .unwrap()
661 .into_iter()
662 .next()
663 .unwrap()
664 }
665
666 pub async fn notify_read_synced_checkpoint(
667 &self,
668 seq: CheckpointSequenceNumber,
669 ) -> VerifiedCheckpoint {
670 self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
671 self.get_highest_synced_checkpoint_seq_number()
672 .expect("db error")
673 })
674 .await
675 }
676
677 pub async fn notify_read_executed_checkpoint(
678 &self,
679 seq: CheckpointSequenceNumber,
680 ) -> VerifiedCheckpoint {
681 self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
682 self.get_highest_executed_checkpoint_seq_number()
683 .expect("db error")
684 })
685 .await
686 }
687
688 pub fn update_highest_executed_checkpoint(
689 &self,
690 checkpoint: &VerifiedCheckpoint,
691 ) -> Result<(), TypedStoreError> {
692 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
693 if seq_number >= *checkpoint.sequence_number() {
694 return Ok(());
695 }
696 assert_eq!(
697 seq_number + 1,
698 *checkpoint.sequence_number(),
699 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
700 checkpoint.sequence_number(),
701 seq_number
702 );
703 }
704 let seq = *checkpoint.sequence_number();
705 debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
706 self.tables.watermarks.insert(
707 &CheckpointWatermark::HighestExecuted,
708 &(seq, *checkpoint.digest()),
709 )?;
710 self.executed_checkpoint_notify_read
711 .notify(&seq, checkpoint);
712 Ok(())
713 }
714
715 pub fn update_highest_pruned_checkpoint(
716 &self,
717 checkpoint: &VerifiedCheckpoint,
718 ) -> Result<(), TypedStoreError> {
719 self.tables.watermarks.insert(
720 &CheckpointWatermark::HighestPruned,
721 &(*checkpoint.sequence_number(), *checkpoint.digest()),
722 )
723 }
724
725 pub fn set_highest_executed_checkpoint_subtle(
731 &self,
732 checkpoint: &VerifiedCheckpoint,
733 ) -> Result<(), TypedStoreError> {
734 self.tables.watermarks.insert(
735 &CheckpointWatermark::HighestExecuted,
736 &(*checkpoint.sequence_number(), *checkpoint.digest()),
737 )
738 }
739
740 pub fn insert_checkpoint_contents(
741 &self,
742 contents: CheckpointContents,
743 ) -> Result<(), TypedStoreError> {
744 debug!(
745 checkpoint_seq = ?contents.digest(),
746 "Inserting checkpoint contents",
747 );
748 self.tables
749 .checkpoint_content
750 .insert(contents.digest(), &contents)
751 }
752
753 pub fn insert_verified_checkpoint_contents(
754 &self,
755 checkpoint: &VerifiedCheckpoint,
756 full_contents: VerifiedCheckpointContents,
757 ) -> Result<(), TypedStoreError> {
758 let mut batch = self.tables.full_checkpoint_content.batch();
759 batch.insert_batch(
760 &self.tables.checkpoint_sequence_by_contents_digest,
761 [(&checkpoint.content_digest, checkpoint.sequence_number())],
762 )?;
763 let full_contents = full_contents.into_inner();
764 batch.insert_batch(
765 &self.tables.full_checkpoint_content,
766 [(checkpoint.sequence_number(), &full_contents)],
767 )?;
768
769 let contents = full_contents.into_checkpoint_contents();
770 assert_eq!(&checkpoint.content_digest, contents.digest());
771
772 batch.insert_batch(
773 &self.tables.checkpoint_content,
774 [(contents.digest(), &contents)],
775 )?;
776
777 batch.write()
778 }
779
780 pub fn delete_full_checkpoint_contents(
781 &self,
782 seq: CheckpointSequenceNumber,
783 ) -> Result<(), TypedStoreError> {
784 self.tables.full_checkpoint_content.remove(&seq)
785 }
786
787 pub fn get_epoch_last_checkpoint(
788 &self,
789 epoch_id: EpochId,
790 ) -> IotaResult<Option<VerifiedCheckpoint>> {
791 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
792 let checkpoint = match seq {
793 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
794 None => None,
795 };
796 Ok(checkpoint)
797 }
798
799 pub fn insert_epoch_last_checkpoint(
800 &self,
801 epoch_id: EpochId,
802 checkpoint: &VerifiedCheckpoint,
803 ) -> IotaResult {
804 self.tables
805 .epoch_last_checkpoint_map
806 .insert(&epoch_id, checkpoint.sequence_number())?;
807 Ok(())
808 }
809
810 pub fn get_epoch_state_commitments(
811 &self,
812 epoch: EpochId,
813 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
814 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
815 checkpoint
816 .end_of_epoch_data
817 .as_ref()
818 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
819 .epoch_commitments
820 .clone()
821 });
822 Ok(commitments)
823 }
824
825 pub fn get_epoch_stats(
828 &self,
829 epoch: EpochId,
830 last_checkpoint: &CheckpointSummary,
831 ) -> Option<EpochStats> {
832 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
833 (0, 0)
834 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
835 (
836 checkpoint.sequence_number + 1,
837 checkpoint.network_total_transactions,
838 )
839 } else {
840 return None;
841 };
842 Some(EpochStats {
843 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
844 transaction_count: last_checkpoint.network_total_transactions
845 - prev_epoch_network_transactions,
846 total_gas_reward: last_checkpoint
847 .epoch_rolling_gas_cost_summary
848 .computation_cost,
849 })
850 }
851
852 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
853 self.tables
855 .checkpoint_content
856 .checkpoint_db(path)
857 .map_err(Into::into)
858 }
859
860 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
861 let mut wb = self.tables.watermarks.batch();
862 wb.delete_batch(
863 &self.tables.watermarks,
864 std::iter::once(CheckpointWatermark::HighestExecuted),
865 )?;
866 wb.write()?;
867 Ok(())
868 }
869
870 pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
871 self.delete_highest_executed_checkpoint_test_only()?;
872 self.tables.watermarks.rocksdb.flush()?;
873 Ok(())
874 }
875
876 #[instrument(level = "debug", skip_all)]
885 pub async fn reexecute_local_checkpoints(
886 &self,
887 state: &AuthorityState,
888 epoch_store: &AuthorityPerEpochStore,
889 ) {
890 let epoch = epoch_store.epoch();
891 let highest_executed = self
892 .get_highest_executed_checkpoint_seq_number()
893 .expect("get_highest_executed_checkpoint_seq_number should not fail")
894 .unwrap_or(0);
895
896 let Ok(Some(highest_built)) = self.get_latest_locally_computed_checkpoint() else {
897 info!("no locally built checkpoints to verify");
898 return;
899 };
900
901 info!(
902 "rexecuting locally computed checkpoints for crash recovery from {} to {}",
903 highest_executed, highest_built
904 );
905
906 for seq in highest_executed + 1..=*highest_built.sequence_number() {
907 info!(?seq, "Re-executing locally computed checkpoint");
908 let Some(checkpoint) = self
909 .get_locally_computed_checkpoint(seq)
910 .expect("get_locally_computed_checkpoint should not fail")
911 else {
912 panic!("locally computed checkpoint {seq:?} not found");
913 };
914
915 let Some(contents) = self
916 .get_checkpoint_contents(&checkpoint.content_digest)
917 .expect("get_checkpoint_contents should not fail")
918 else {
919 panic!(
920 "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
921 seq, checkpoint.content_digest
922 );
923 };
924
925 let cache = state.get_transaction_cache_reader();
926
927 let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
928 let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
929 let txns = cache
930 .try_multi_get_transaction_blocks(&tx_digests)
931 .expect("try_multi_get_transaction_blocks should not fail");
932
933 let txns: Vec<_> = itertools::izip!(txns, tx_digests, fx_digests)
934 .filter_map(|(tx, digest, fx)| {
935 if let Some(tx) = tx {
936 Some((tx, fx))
937 } else {
938 info!(
939 "transaction {:?} not found during checkpoint re-execution",
940 digest
941 );
942 None
943 }
944 })
945 .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
947 .map(|(tx, fx)| {
948 (
949 VerifiedExecutableTransaction::new_from_checkpoint(
950 (*tx).clone(),
951 epoch,
952 seq,
953 ),
954 fx,
955 )
956 })
957 .collect();
958
959 let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();
960
961 info!(
962 ?seq,
963 ?tx_digests,
964 "Re-executing transactions for locally built checkpoint"
965 );
966 state.enqueue_with_expected_effects_digest(txns, epoch_store);
969
970 let waiting_logger = tokio::task::spawn(async move {
974 let mut interval = tokio::time::interval(Duration::from_secs(1));
975 loop {
976 interval.tick().await;
977 warn!(?seq, "Still waiting for re-execution to complete");
978 }
979 });
980
981 cache
982 .try_notify_read_executed_effects_digests(&tx_digests)
983 .await
984 .expect("notify_read_executed_effects_digests should not fail");
985
986 waiting_logger.abort();
987 waiting_logger.await.ok();
988 info!(?seq, "Re-execution completed for locally built checkpoint");
989 }
990
991 info!("Re-execution of locally built checkpoints completed");
992 }
993}
994
995#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
996pub enum CheckpointWatermark {
997 HighestVerified,
998 HighestSynced,
999 HighestExecuted,
1000 HighestPruned,
1001}
1002
1003pub struct CheckpointBuilder {
1004 state: Arc<AuthorityState>,
1005 store: Arc<CheckpointStore>,
1006 epoch_store: Arc<AuthorityPerEpochStore>,
1007 notify: Arc<Notify>,
1008 notify_aggregator: Arc<Notify>,
1009 last_built: watch::Sender<CheckpointSequenceNumber>,
1010 effects_store: Arc<dyn TransactionCacheRead>,
1011 accumulator: Weak<StateAccumulator>,
1012 output: Box<dyn CheckpointOutput>,
1013 metrics: Arc<CheckpointMetrics>,
1014 max_transactions_per_checkpoint: usize,
1015 max_checkpoint_size_bytes: usize,
1016}
1017
1018pub struct CheckpointAggregator {
1019 store: Arc<CheckpointStore>,
1020 epoch_store: Arc<AuthorityPerEpochStore>,
1021 notify: Arc<Notify>,
1022 current: Option<CheckpointSignatureAggregator>,
1023 output: Box<dyn CertifiedCheckpointOutput>,
1024 state: Arc<AuthorityState>,
1025 metrics: Arc<CheckpointMetrics>,
1026}
1027
1028pub struct CheckpointSignatureAggregator {
1030 next_index: u64,
1031 summary: CheckpointSummary,
1032 digest: CheckpointDigest,
1033 signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
1035 store: Arc<CheckpointStore>,
1036 state: Arc<AuthorityState>,
1037 metrics: Arc<CheckpointMetrics>,
1038}
1039
1040impl CheckpointBuilder {
1041 fn new(
1042 state: Arc<AuthorityState>,
1043 store: Arc<CheckpointStore>,
1044 epoch_store: Arc<AuthorityPerEpochStore>,
1045 notify: Arc<Notify>,
1046 effects_store: Arc<dyn TransactionCacheRead>,
1047 accumulator: Weak<StateAccumulator>,
1048 output: Box<dyn CheckpointOutput>,
1049 notify_aggregator: Arc<Notify>,
1050 last_built: watch::Sender<CheckpointSequenceNumber>,
1051 metrics: Arc<CheckpointMetrics>,
1052 max_transactions_per_checkpoint: usize,
1053 max_checkpoint_size_bytes: usize,
1054 ) -> Self {
1055 Self {
1056 state,
1057 store,
1058 epoch_store,
1059 notify,
1060 effects_store,
1061 accumulator,
1062 output,
1063 notify_aggregator,
1064 last_built,
1065 metrics,
1066 max_transactions_per_checkpoint,
1067 max_checkpoint_size_bytes,
1068 }
1069 }
1070
1071 async fn run(mut self) {
1074 info!("Starting CheckpointBuilder");
1075 loop {
1076 self.maybe_build_checkpoints().await;
1077
1078 self.notify.notified().await;
1079 }
1080 }
1081
1082 async fn maybe_build_checkpoints(&mut self) {
1083 let _scope = monitored_scope("BuildCheckpoints");
1084
1085 let summary = self
1087 .epoch_store
1088 .last_built_checkpoint_builder_summary()
1089 .expect("epoch should not have ended");
1090 let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1091 let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1092
1093 let min_checkpoint_interval_ms = self
1094 .epoch_store
1095 .protocol_config()
1096 .min_checkpoint_interval_ms_as_option()
1097 .unwrap_or_default();
1098 let mut grouped_pending_checkpoints = Vec::new();
1099 let mut checkpoints_iter = self
1100 .epoch_store
1101 .get_pending_checkpoints(last_height)
1102 .expect("unexpected epoch store error")
1103 .into_iter()
1104 .peekable();
1105 while let Some((height, pending)) = checkpoints_iter.next() {
1106 let current_timestamp = pending.details().timestamp_ms;
1109 let can_build = match last_timestamp {
1110 Some(last_timestamp) => {
1111 current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1112 }
1113 None => true,
1114 } || checkpoints_iter
1117 .peek()
1118 .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1119 || pending.details().last_of_epoch;
1121 grouped_pending_checkpoints.push(pending);
1122 if !can_build {
1123 debug!(
1124 checkpoint_commit_height = height,
1125 ?last_timestamp,
1126 ?current_timestamp,
1127 "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1128 );
1129 continue;
1130 }
1131
1132 last_height = Some(height);
1134 last_timestamp = Some(current_timestamp);
1135 debug!(
1136 checkpoint_commit_height = height,
1137 "Making checkpoint at commit height"
1138 );
1139
1140 match self
1141 .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1142 .await
1143 {
1144 Ok(seq) => {
1145 self.last_built.send_if_modified(|cur| {
1146 if seq > *cur {
1148 *cur = seq;
1149 true
1150 } else {
1151 false
1152 }
1153 });
1154 }
1155 Err(e) => {
1156 error!("Error while making checkpoint, will retry in 1s: {:?}", e);
1157 tokio::time::sleep(Duration::from_secs(1)).await;
1158 self.metrics.checkpoint_errors.inc();
1159 return;
1160 }
1161 }
1162 tokio::task::yield_now().await;
1165 }
1166 debug!(
1167 "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1168 grouped_pending_checkpoints.len(),
1169 );
1170 }
1171
1172 #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1173 async fn make_checkpoint(
1174 &self,
1175 pendings: Vec<PendingCheckpoint>,
1176 ) -> anyhow::Result<CheckpointSequenceNumber> {
1177 let last_details = pendings.last().unwrap().details().clone();
1178
1179 let mut effects_in_current_checkpoint = BTreeSet::new();
1185
1186 let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
1189 for pending_checkpoint in pendings.into_iter() {
1190 let pending = pending_checkpoint.into_v1();
1191 let txn_in_checkpoint = self
1192 .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
1193 .await?;
1194 sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
1195 }
1196 let new_checkpoints = self
1197 .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
1198 .await?;
1199 let highest_sequence = *new_checkpoints.last().0.sequence_number();
1200 self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1201 .await?;
1202 Ok(highest_sequence)
1203 }
1204
1205 #[instrument(level = "debug", skip_all)]
1210 async fn resolve_checkpoint_transactions(
1211 &self,
1212 roots: Vec<TransactionKey>,
1213 effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
1214 ) -> IotaResult<Vec<TransactionEffects>> {
1215 self.metrics
1216 .checkpoint_roots_count
1217 .inc_by(roots.len() as u64);
1218
1219 let root_digests = self
1220 .epoch_store
1221 .notify_read_executed_digests(&roots)
1222 .in_monitored_scope("CheckpointNotifyDigests")
1223 .await?;
1224 let root_effects = self
1225 .effects_store
1226 .try_notify_read_executed_effects(&root_digests)
1227 .in_monitored_scope("CheckpointNotifyRead")
1228 .await?;
1229
1230 let _scope = monitored_scope("CheckpointBuilder");
1231
1232 let consensus_commit_prologue = {
1233 let consensus_commit_prologue = self
1237 .extract_consensus_commit_prologue(&root_digests, &root_effects)
1238 .await?;
1239
1240 if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1244 let unsorted_ccp = self.complete_checkpoint_effects(
1245 vec![ccp_effects.clone()],
1246 effects_in_current_checkpoint,
1247 )?;
1248
1249 if unsorted_ccp.len() != 1 {
1252 fatal!(
1253 "Expected 1 consensus commit prologue, got {:?}",
1254 unsorted_ccp
1255 .iter()
1256 .map(|e| e.transaction_digest())
1257 .collect::<Vec<_>>()
1258 );
1259 }
1260 assert_eq!(unsorted_ccp.len(), 1);
1261 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1262 }
1263 consensus_commit_prologue
1264 };
1265
1266 let unsorted =
1267 self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;
1268
1269 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1270 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1271 if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
1272 #[cfg(debug_assertions)]
1273 {
1274 for tx in unsorted.iter() {
1277 assert!(tx.transaction_digest() != &_ccp_digest);
1278 }
1279 }
1280 sorted.push(ccp_effects);
1281 }
1282 sorted.extend(CausalOrder::causal_sort(unsorted));
1283
1284 #[cfg(msim)]
1285 {
1286 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1288 }
1289
1290 Ok(sorted)
1291 }
1292
1293 async fn extract_consensus_commit_prologue(
1298 &self,
1299 root_digests: &[TransactionDigest],
1300 root_effects: &[TransactionEffects],
1301 ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1302 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1303 if root_digests.is_empty() {
1304 return Ok(None);
1305 }
1306
1307 let first_tx = self
1312 .state
1313 .get_transaction_cache_reader()
1314 .try_get_transaction_block(&root_digests[0])?
1315 .expect("Transaction block must exist");
1316
1317 Ok(match first_tx.transaction_data().kind() {
1318 TransactionKind::ConsensusCommitPrologueV1(_) => {
1319 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1320 Some((*first_tx.digest(), root_effects[0].clone()))
1321 }
1322 _ => None,
1323 })
1324 }
1325
1326 #[instrument(level = "debug", skip_all)]
1328 async fn write_checkpoints(
1329 &self,
1330 height: CheckpointHeight,
1331 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1332 ) -> IotaResult {
1333 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1334 let mut batch = self.store.tables.checkpoint_content.batch();
1335 let mut all_tx_digests =
1336 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1337
1338 for (summary, contents) in &new_checkpoints {
1339 debug!(
1340 checkpoint_commit_height = height,
1341 checkpoint_seq = summary.sequence_number,
1342 contents_digest = ?contents.digest(),
1343 "writing checkpoint",
1344 );
1345
1346 if let Some(previously_computed_summary) = self
1347 .store
1348 .tables
1349 .locally_computed_checkpoints
1350 .get(&summary.sequence_number)?
1351 {
1352 if previously_computed_summary != *summary {
1353 fatal!(
1355 "Checkpoint {} was previously built with a different result: {previously_computed_summary:?} vs {summary:?}",
1356 summary.sequence_number,
1357 );
1358 }
1359 }
1360
1361 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1362
1363 self.metrics
1364 .transactions_included_in_checkpoint
1365 .inc_by(contents.size() as u64);
1366 let sequence_number = summary.sequence_number;
1367 self.metrics
1368 .last_constructed_checkpoint
1369 .set(sequence_number as i64);
1370
1371 batch.insert_batch(
1372 &self.store.tables.checkpoint_content,
1373 [(contents.digest(), contents)],
1374 )?;
1375
1376 batch.insert_batch(
1377 &self.store.tables.locally_computed_checkpoints,
1378 [(sequence_number, summary)],
1379 )?;
1380 }
1381
1382 batch.write()?;
1383
1384 for (summary, contents) in &new_checkpoints {
1386 self.output
1387 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1388 .await?;
1389 }
1390
1391 for (local_checkpoint, _) in &new_checkpoints {
1392 if let Some(certified_checkpoint) = self
1393 .store
1394 .tables
1395 .certified_checkpoints
1396 .get(local_checkpoint.sequence_number())?
1397 {
1398 self.store
1399 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1400 }
1401 }
1402
1403 self.notify_aggregator.notify_one();
1404 self.epoch_store
1405 .process_constructed_checkpoint(height, new_checkpoints);
1406 Ok(())
1407 }
1408
1409 #[expect(clippy::type_complexity)]
1410 fn split_checkpoint_chunks(
1411 &self,
1412 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1413 signatures: Vec<Vec<GenericSignature>>,
1414 ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1415 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1416 let mut chunks = Vec::new();
1417 let mut chunk = Vec::new();
1418 let mut chunk_size: usize = 0;
1419 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1420 .into_iter()
1421 .zip(signatures.into_iter())
1422 {
1423 let size = transaction_size
1428 + bcs::serialized_size(&effects)?
1429 + bcs::serialized_size(&signatures)?;
1430 if chunk.len() == self.max_transactions_per_checkpoint
1431 || (chunk_size + size) > self.max_checkpoint_size_bytes
1432 {
1433 if chunk.is_empty() {
1434 warn!(
1436 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1437 self.max_checkpoint_size_bytes
1438 );
1439 } else {
1440 chunks.push(chunk);
1441 chunk = Vec::new();
1442 chunk_size = 0;
1443 }
1444 }
1445
1446 chunk.push((effects, signatures));
1447 chunk_size += size;
1448 }
1449
1450 if !chunk.is_empty() || chunks.is_empty() {
1451 chunks.push(chunk);
1456 }
1462 Ok(chunks)
1463 }
1464
1465 fn load_last_built_checkpoint_summary(
1468 epoch_store: &AuthorityPerEpochStore,
1469 store: &CheckpointStore,
1470 ) -> IotaResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1471 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1472 if last_checkpoint.is_none() {
1473 let epoch = epoch_store.epoch();
1474 if epoch > 0 {
1475 let previous_epoch = epoch - 1;
1476 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1477 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1478 if let Some((ref seq, _)) = last_checkpoint {
1479 debug!(
1480 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1481 );
1482 } else {
1483 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1486 }
1487 }
1488 }
1489 Ok(last_checkpoint)
1490 }
1491
1492 #[instrument(level = "debug", skip_all)]
1493 async fn create_checkpoints(
1494 &self,
1495 all_effects: Vec<TransactionEffects>,
1496 details: &PendingCheckpointInfo,
1497 ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1498 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1499 let total = all_effects.len();
1500 let mut last_checkpoint =
1501 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1502 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1503 debug!(
1504 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1505 checkpoint_timestamp = details.timestamp_ms,
1506 "Creating checkpoint(s) for {} transactions",
1507 all_effects.len(),
1508 );
1509
1510 let all_digests: Vec<_> = all_effects
1511 .iter()
1512 .map(|effect| *effect.transaction_digest())
1513 .collect();
1514 let transactions_and_sizes = self
1515 .state
1516 .get_transaction_cache_reader()
1517 .try_get_transactions_and_serialized_sizes(&all_digests)?;
1518 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1519 let mut transactions = Vec::with_capacity(all_effects.len());
1520 let mut transaction_keys = Vec::with_capacity(all_effects.len());
1521 let mut randomness_rounds = BTreeMap::new();
1522 {
1523 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1524 debug!(
1525 ?last_checkpoint_seq,
1526 "Waiting for {:?} certificates to appear in consensus",
1527 all_effects.len()
1528 );
1529
1530 for (effects, transaction_and_size) in all_effects
1531 .into_iter()
1532 .zip(transactions_and_sizes.into_iter())
1533 {
1534 let (transaction, size) = transaction_and_size
1535 .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
1536 match transaction.inner().transaction_data().kind() {
1537 TransactionKind::ConsensusCommitPrologueV1(_)
1538 | TransactionKind::AuthenticatorStateUpdateV1(_) => {
1539 }
1544 TransactionKind::RandomnessStateUpdate(rsu) => {
1545 randomness_rounds
1546 .insert(*effects.transaction_digest(), rsu.randomness_round);
1547 }
1548 _ => {
1549 transaction_keys.push(SequencedConsensusTransactionKey::External(
1552 ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
1553 ));
1554 }
1555 }
1556 transactions.push(transaction);
1557 all_effects_and_transaction_sizes.push((effects, size));
1558 }
1559
1560 self.epoch_store
1561 .consensus_messages_processed_notify(transaction_keys)
1562 .await?;
1563 }
1564
1565 let signatures = self
1566 .epoch_store
1567 .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1568 debug!(
1569 ?last_checkpoint_seq,
1570 "Received {} checkpoint user signatures from consensus",
1571 signatures.len()
1572 );
1573
1574 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1575 let chunks_count = chunks.len();
1576
1577 let mut checkpoints = Vec::with_capacity(chunks_count);
1578 debug!(
1579 ?last_checkpoint_seq,
1580 "Creating {} checkpoints with {} transactions", chunks_count, total,
1581 );
1582
1583 let epoch = self.epoch_store.epoch();
1584 for (index, transactions) in chunks.into_iter().enumerate() {
1585 let first_checkpoint_of_epoch = index == 0
1586 && last_checkpoint
1587 .as_ref()
1588 .map(|(_, c)| c.epoch != epoch)
1589 .unwrap_or(true);
1590 if first_checkpoint_of_epoch {
1591 self.epoch_store
1592 .record_epoch_first_checkpoint_creation_time_metric();
1593 }
1594 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1595
1596 let sequence_number = last_checkpoint
1597 .as_ref()
1598 .map(|(_, c)| c.sequence_number + 1)
1599 .unwrap_or_default();
1600 let mut timestamp_ms = details.timestamp_ms;
1601 if let Some((_, last_checkpoint)) = &last_checkpoint {
1602 if last_checkpoint.timestamp_ms > timestamp_ms {
1603 debug!(
1605 "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
1606 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
1607 );
1608 if self
1609 .epoch_store
1610 .protocol_config()
1611 .consensus_median_timestamp_with_checkpoint_enforcement()
1612 {
1613 timestamp_ms = last_checkpoint.timestamp_ms;
1614 }
1615 }
1616 }
1617
1618 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1619 let epoch_rolling_gas_cost_summary =
1620 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1621
1622 let end_of_epoch_data = if last_checkpoint_of_epoch {
1623 let (system_state_obj, system_epoch_info_event) = self
1624 .augment_epoch_last_checkpoint(
1625 &epoch_rolling_gas_cost_summary,
1626 timestamp_ms,
1627 &mut effects,
1628 &mut signatures,
1629 sequence_number,
1630 )
1631 .await?;
1632
1633 let epoch_supply_change =
1641 system_epoch_info_event.map_or(0, |event| event.supply_change());
1642
1643 let committee = system_state_obj
1644 .get_current_epoch_committee()
1645 .committee()
1646 .clone();
1647
1648 let root_state_digest = {
1651 let state_acc = self
1652 .accumulator
1653 .upgrade()
1654 .expect("No checkpoints should be getting built after local configuration");
1655 let acc = state_acc.accumulate_checkpoint(
1656 &effects,
1657 sequence_number,
1658 &self.epoch_store,
1659 )?;
1660
1661 state_acc
1662 .wait_for_previous_running_root(&self.epoch_store, sequence_number)
1663 .await?;
1664
1665 state_acc.accumulate_running_root(
1666 &self.epoch_store,
1667 sequence_number,
1668 Some(acc),
1669 )?;
1670 state_acc
1671 .digest_epoch(self.epoch_store.clone(), sequence_number)
1672 .await?
1673 };
1674 self.metrics.highest_accumulated_epoch.set(epoch as i64);
1675 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1676
1677 let epoch_commitments = vec![root_state_digest.into()];
1678
1679 Some(EndOfEpochData {
1680 next_epoch_committee: committee.voting_rights,
1681 next_epoch_protocol_version: ProtocolVersion::new(
1682 system_state_obj.protocol_version(),
1683 ),
1684 epoch_commitments,
1685 epoch_supply_change,
1686 })
1687 } else {
1688 None
1689 };
1690 let contents = CheckpointContents::new_with_digests_and_signatures(
1691 effects.iter().map(TransactionEffects::execution_digests),
1692 signatures,
1693 );
1694
1695 let num_txns = contents.size() as u64;
1696
1697 let network_total_transactions = last_checkpoint
1698 .as_ref()
1699 .map(|(_, c)| c.network_total_transactions + num_txns)
1700 .unwrap_or(num_txns);
1701
1702 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1703
1704 let matching_randomness_rounds: Vec<_> = effects
1705 .iter()
1706 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1707 .copied()
1708 .collect();
1709
1710 let summary = CheckpointSummary::new(
1711 self.epoch_store.protocol_config(),
1712 epoch,
1713 sequence_number,
1714 network_total_transactions,
1715 &contents,
1716 previous_digest,
1717 epoch_rolling_gas_cost_summary,
1718 end_of_epoch_data,
1719 timestamp_ms,
1720 matching_randomness_rounds,
1721 );
1722 summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1723 if last_checkpoint_of_epoch {
1724 info!(
1725 checkpoint_seq = sequence_number,
1726 "creating last checkpoint of epoch {}", epoch
1727 );
1728 if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
1729 self.epoch_store
1730 .report_epoch_metrics_at_last_checkpoint(stats);
1731 }
1732 }
1733 last_checkpoint = Some((sequence_number, summary.clone()));
1734 checkpoints.push((summary, contents));
1735 }
1736
1737 Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1738 }
1739
1740 fn get_epoch_total_gas_cost(
1741 &self,
1742 last_checkpoint: Option<&CheckpointSummary>,
1743 cur_checkpoint_effects: &[TransactionEffects],
1744 ) -> GasCostSummary {
1745 let (previous_epoch, previous_gas_costs) = last_checkpoint
1746 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1747 .unwrap_or_default();
1748 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1749 if previous_epoch == self.epoch_store.epoch() {
1750 GasCostSummary::new(
1752 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1753 previous_gas_costs.computation_cost_burned
1754 + current_gas_costs.computation_cost_burned,
1755 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1756 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1757 previous_gas_costs.non_refundable_storage_fee
1758 + current_gas_costs.non_refundable_storage_fee,
1759 )
1760 } else {
1761 current_gas_costs
1762 }
1763 }
1764
1765 #[instrument(level = "error", skip_all)]
1768 async fn augment_epoch_last_checkpoint(
1769 &self,
1770 epoch_total_gas_cost: &GasCostSummary,
1771 epoch_start_timestamp_ms: CheckpointTimestamp,
1772 checkpoint_effects: &mut Vec<TransactionEffects>,
1773 signatures: &mut Vec<Vec<GenericSignature>>,
1774 checkpoint: CheckpointSequenceNumber,
1775 ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1776 let (system_state, system_epoch_info_event, effects) = self
1777 .state
1778 .create_and_execute_advance_epoch_tx(
1779 &self.epoch_store,
1780 epoch_total_gas_cost,
1781 checkpoint,
1782 epoch_start_timestamp_ms,
1783 )
1784 .await?;
1785 checkpoint_effects.push(effects);
1786 signatures.push(vec![]);
1787 Ok((system_state, system_epoch_info_event))
1788 }
1789
1790 #[instrument(level = "debug", skip_all)]
1799 fn complete_checkpoint_effects(
1800 &self,
1801 mut roots: Vec<TransactionEffects>,
1802 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
1803 ) -> IotaResult<Vec<TransactionEffects>> {
1804 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
1805 let mut results = vec![];
1806 let mut seen = HashSet::new();
1807 loop {
1808 let mut pending = HashSet::new();
1809
1810 let transactions_included = self
1811 .epoch_store
1812 .builder_included_transactions_in_checkpoint(
1813 roots.iter().map(|e| e.transaction_digest()),
1814 )?;
1815
1816 for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
1817 let digest = effect.transaction_digest();
1818 seen.insert(*digest);
1821
1822 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
1824 continue;
1825 }
1826
1827 if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
1829 continue;
1830 }
1831
1832 let existing_effects = self
1833 .epoch_store
1834 .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;
1835
1836 for (dependency, effects_signature_exists) in
1837 effect.dependencies().iter().zip(existing_effects.iter())
1838 {
1839 if !effects_signature_exists {
1844 continue;
1845 }
1846 if seen.insert(*dependency) {
1847 pending.insert(*dependency);
1848 }
1849 }
1850 results.push(effect);
1851 }
1852 if pending.is_empty() {
1853 break;
1854 }
1855 let pending = pending.into_iter().collect::<Vec<_>>();
1856 let effects = self
1857 .effects_store
1858 .try_multi_get_executed_effects(&pending)?;
1859 let effects = effects
1860 .into_iter()
1861 .zip(pending)
1862 .map(|(opt, digest)| match opt {
1863 Some(x) => x,
1864 None => panic!(
1865 "Can not find effect for transaction {digest:?}, however transaction that depend on it was already executed"
1866 ),
1867 })
1868 .collect::<Vec<_>>();
1869 roots = effects;
1870 }
1871
1872 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
1873 Ok(results)
1874 }
1875
1876 #[cfg(msim)]
1879 fn expensive_consensus_commit_prologue_invariants_check(
1880 &self,
1881 root_digests: &[TransactionDigest],
1882 sorted: &[TransactionEffects],
1883 ) {
1884 let root_txs = self
1886 .state
1887 .get_transaction_cache_reader()
1888 .multi_get_transaction_blocks(root_digests);
1889 let ccps = root_txs
1890 .iter()
1891 .filter_map(|tx| {
1892 tx.as_ref().filter(|tx| {
1893 matches!(
1894 tx.transaction_data().kind(),
1895 TransactionKind::ConsensusCommitPrologueV1(_)
1896 )
1897 })
1898 })
1899 .collect::<Vec<_>>();
1900
1901 assert!(ccps.len() <= 1);
1904
1905 let txs = self
1907 .state
1908 .get_transaction_cache_reader()
1909 .multi_get_transaction_blocks(
1910 &sorted
1911 .iter()
1912 .map(|tx| *tx.transaction_digest())
1913 .collect::<Vec<_>>(),
1914 );
1915
1916 if ccps.is_empty() {
1917 for tx in txs.iter().flatten() {
1921 assert!(!matches!(
1922 tx.transaction_data().kind(),
1923 TransactionKind::ConsensusCommitPrologueV1(_)
1924 ));
1925 }
1926 } else {
1927 assert!(matches!(
1930 txs[0].as_ref().unwrap().transaction_data().kind(),
1931 TransactionKind::ConsensusCommitPrologueV1(_)
1932 ));
1933
1934 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1935
1936 for tx in txs.iter().skip(1).flatten() {
1937 assert!(!matches!(
1938 tx.transaction_data().kind(),
1939 TransactionKind::ConsensusCommitPrologueV1(_)
1940 ));
1941 }
1942 }
1943 }
1944}
1945
1946impl CheckpointAggregator {
1947 fn new(
1948 tables: Arc<CheckpointStore>,
1949 epoch_store: Arc<AuthorityPerEpochStore>,
1950 notify: Arc<Notify>,
1951 output: Box<dyn CertifiedCheckpointOutput>,
1952 state: Arc<AuthorityState>,
1953 metrics: Arc<CheckpointMetrics>,
1954 ) -> Self {
1955 let current = None;
1956 Self {
1957 store: tables,
1958 epoch_store,
1959 notify,
1960 current,
1961 output,
1962 state,
1963 metrics,
1964 }
1965 }
1966
1967 async fn run(mut self) {
1973 info!("Starting CheckpointAggregator");
1974 loop {
1975 if let Err(e) = self.run_and_notify().await {
1976 error!(
1977 "Error while aggregating checkpoint, will retry in 1s: {:?}",
1978 e
1979 );
1980 self.metrics.checkpoint_errors.inc();
1981 tokio::time::sleep(Duration::from_secs(1)).await;
1982 continue;
1983 }
1984
1985 let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
1986 }
1987 }
1988
1989 async fn run_and_notify(&mut self) -> IotaResult {
1990 let summaries = self.run_inner()?;
1991 for summary in summaries {
1992 self.output.certified_checkpoint_created(&summary).await?;
1993 }
1994 Ok(())
1995 }
1996
1997 fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
1998 let _scope = monitored_scope("CheckpointAggregator");
1999 let mut result = vec![];
2000 'outer: loop {
2001 let next_to_certify = self.next_checkpoint_to_certify()?;
2002 let current = if let Some(current) = &mut self.current {
2003 if current.summary.sequence_number < next_to_certify {
2009 self.current = None;
2010 continue;
2011 }
2012 current
2013 } else {
2014 let Some(summary) = self
2015 .epoch_store
2016 .get_built_checkpoint_summary(next_to_certify)?
2017 else {
2018 return Ok(result);
2019 };
2020 self.current = Some(CheckpointSignatureAggregator {
2021 next_index: 0,
2022 digest: summary.digest(),
2023 summary,
2024 signatures_by_digest: MultiStakeAggregator::new(
2025 self.epoch_store.committee().clone(),
2026 ),
2027 store: self.store.clone(),
2028 state: self.state.clone(),
2029 metrics: self.metrics.clone(),
2030 });
2031 self.current.as_mut().unwrap()
2032 };
2033
2034 let epoch_tables = self
2035 .epoch_store
2036 .tables()
2037 .expect("should not run past end of epoch");
2038 let iter = epoch_tables
2039 .pending_checkpoint_signatures
2040 .safe_iter_with_bounds(
2041 Some((current.summary.sequence_number, current.next_index)),
2042 None,
2043 );
2044 for item in iter {
2045 let ((seq, index), data) = item?;
2046 if seq != current.summary.sequence_number {
2047 trace!(
2048 checkpoint_seq =? current.summary.sequence_number,
2049 "Not enough checkpoint signatures",
2050 );
2051 return Ok(result);
2053 }
2054 trace!(
2055 checkpoint_seq = current.summary.sequence_number,
2056 "Processing signature for checkpoint (digest: {:?}) from {:?}",
2057 current.summary.digest(),
2058 data.summary.auth_sig().authority.concise()
2059 );
2060 self.metrics
2061 .checkpoint_participation
2062 .with_label_values(&[&format!(
2063 "{:?}",
2064 data.summary.auth_sig().authority.concise()
2065 )])
2066 .inc();
2067 if let Ok(auth_signature) = current.try_aggregate(data) {
2068 debug!(
2069 checkpoint_seq = current.summary.sequence_number,
2070 "Successfully aggregated signatures for checkpoint (digest: {:?})",
2071 current.summary.digest(),
2072 );
2073 let summary = VerifiedCheckpoint::new_unchecked(
2074 CertifiedCheckpointSummary::new_from_data_and_sig(
2075 current.summary.clone(),
2076 auth_signature,
2077 ),
2078 );
2079
2080 self.store.insert_certified_checkpoint(&summary)?;
2081 self.metrics
2082 .last_certified_checkpoint
2083 .set(current.summary.sequence_number as i64);
2084 current
2085 .summary
2086 .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
2087 result.push(summary.into_inner());
2088 self.current = None;
2089 continue 'outer;
2090 } else {
2091 current.next_index = index + 1;
2092 }
2093 }
2094 break;
2095 }
2096 Ok(result)
2097 }
2098
2099 fn next_checkpoint_to_certify(&self) -> IotaResult<CheckpointSequenceNumber> {
2100 Ok(self
2101 .store
2102 .tables
2103 .certified_checkpoints
2104 .reversed_safe_iter_with_bounds(None, None)?
2105 .next()
2106 .transpose()?
2107 .map(|(seq, _)| seq + 1)
2108 .unwrap_or_default())
2109 }
2110}
2111
2112impl CheckpointSignatureAggregator {
2113 #[expect(clippy::result_unit_err)]
2114 pub fn try_aggregate(
2115 &mut self,
2116 data: CheckpointSignatureMessage,
2117 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2118 let their_digest = *data.summary.digest();
2119 let (_, signature) = data.summary.into_data_and_sig();
2120 let author = signature.authority;
2121 let envelope =
2122 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2123 match self.signatures_by_digest.insert(their_digest, envelope) {
2124 InsertResult::Failed {
2126 error:
2127 IotaError::StakeAggregatorRepeatedSigner {
2128 conflicting_sig: false,
2129 ..
2130 },
2131 } => Err(()),
2132 InsertResult::Failed { error } => {
2133 warn!(
2134 checkpoint_seq = self.summary.sequence_number,
2135 "Failed to aggregate new signature from validator {:?}: {:?}",
2136 author.concise(),
2137 error
2138 );
2139 self.check_for_split_brain();
2140 Err(())
2141 }
2142 InsertResult::QuorumReached(cert) => {
2143 if their_digest != self.digest {
2147 self.metrics.remote_checkpoint_forks.inc();
2148 warn!(
2149 checkpoint_seq = self.summary.sequence_number,
2150 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2151 author.concise(),
2152 their_digest,
2153 self.digest
2154 );
2155 return Err(());
2156 }
2157 Ok(cert)
2158 }
2159 InsertResult::NotEnoughVotes {
2160 bad_votes: _,
2161 bad_authorities: _,
2162 } => {
2163 self.check_for_split_brain();
2164 Err(())
2165 }
2166 }
2167 }
2168
2169 fn check_for_split_brain(&self) {
2174 debug!(
2175 checkpoint_seq = self.summary.sequence_number,
2176 "Checking for split brain condition"
2177 );
2178 if self.signatures_by_digest.quorum_unreachable() {
2179 let digests_by_stake_messages = self
2185 .signatures_by_digest
2186 .get_all_unique_values()
2187 .into_iter()
2188 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2189 .map(|(digest, (_authorities, total_stake))| {
2190 format!("{digest:?} (total stake: {total_stake})")
2191 })
2192 .collect::<Vec<String>>();
2193 error!(
2194 checkpoint_seq = self.summary.sequence_number,
2195 "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
2196 self.signatures_by_digest.uncommitted_stake(),
2197 digests_by_stake_messages,
2198 );
2199 self.metrics.split_brain_checkpoint_forks.inc();
2200
2201 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2202 let local_summary = self.summary.clone();
2203 let state = self.state.clone();
2204 let tables = self.store.clone();
2205
2206 tokio::spawn(async move {
2207 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2208 });
2209 }
2210 }
2211}
2212
2213async fn diagnose_split_brain(
2219 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2220 local_summary: CheckpointSummary,
2221 state: Arc<AuthorityState>,
2222 tables: Arc<CheckpointStore>,
2223) {
2224 debug!(
2225 checkpoint_seq = local_summary.sequence_number,
2226 "Running split brain diagnostics..."
2227 );
2228 let time = SystemTime::now();
2229 let digest_to_validator = all_unique_values
2231 .iter()
2232 .filter_map(|(digest, (validators, _))| {
2233 if *digest != local_summary.digest() {
2234 let random_validator = validators.choose(&mut OsRng).unwrap();
2235 Some((*digest, *random_validator))
2236 } else {
2237 None
2238 }
2239 })
2240 .collect::<HashMap<_, _>>();
2241 if digest_to_validator.is_empty() {
2242 panic!(
2243 "Given split brain condition, there should be at \
2244 least one validator that disagrees with local signature"
2245 );
2246 }
2247
2248 let epoch_store = state.load_epoch_store_one_call_per_task();
2249 let committee = epoch_store
2250 .epoch_start_state()
2251 .get_iota_committee_with_network_metadata();
2252 let network_config = default_iota_network_config();
2253 let network_clients =
2254 make_network_authority_clients_with_network_config(&committee, &network_config);
2255
2256 let response_futures = digest_to_validator
2258 .values()
2259 .cloned()
2260 .map(|validator| {
2261 let client = network_clients
2262 .get(&validator)
2263 .expect("Failed to get network client");
2264 let request = CheckpointRequest {
2265 sequence_number: Some(local_summary.sequence_number),
2266 request_content: true,
2267 certified: false,
2268 };
2269 client.handle_checkpoint(request)
2270 })
2271 .collect::<Vec<_>>();
2272
2273 let digest_name_pair = digest_to_validator.iter();
2274 let response_data = futures::future::join_all(response_futures)
2275 .await
2276 .into_iter()
2277 .zip(digest_name_pair)
2278 .filter_map(|(response, (digest, name))| match response {
2279 Ok(response) => match response {
2280 CheckpointResponse {
2281 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2282 contents: Some(contents),
2283 } => Some((*name, *digest, summary, contents)),
2284 CheckpointResponse {
2285 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2286 contents: _,
2287 } => {
2288 panic!("Expected pending checkpoint, but got certified checkpoint");
2289 }
2290 CheckpointResponse {
2291 checkpoint: None,
2292 contents: _,
2293 } => {
2294 error!(
2295 "Summary for checkpoint {:?} not found on validator {:?}",
2296 local_summary.sequence_number, name
2297 );
2298 None
2299 }
2300 CheckpointResponse {
2301 checkpoint: _,
2302 contents: None,
2303 } => {
2304 error!(
2305 "Contents for checkpoint {:?} not found on validator {:?}",
2306 local_summary.sequence_number, name
2307 );
2308 None
2309 }
2310 },
2311 Err(e) => {
2312 error!(
2313 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2314 e
2315 );
2316 None
2317 }
2318 })
2319 .collect::<Vec<_>>();
2320
2321 let local_checkpoint_contents = tables
2322 .get_checkpoint_contents(&local_summary.content_digest)
2323 .unwrap_or_else(|_| {
2324 panic!(
2325 "Could not find checkpoint contents for digest {:?}",
2326 local_summary.digest()
2327 )
2328 })
2329 .unwrap_or_else(|| {
2330 panic!(
2331 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2332 local_summary.sequence_number,
2333 local_summary.digest()
2334 )
2335 });
2336 let local_contents_text = format!("{local_checkpoint_contents:?}");
2337
2338 let local_summary_text = format!("{local_summary:?}");
2339 let local_validator = state.name.concise();
2340 let diff_patches = response_data
2341 .iter()
2342 .map(|(name, other_digest, other_summary, contents)| {
2343 let other_contents_text = format!("{contents:?}");
2344 let other_summary_text = format!("{other_summary:?}");
2345 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2346 .enumerate_transactions(&local_summary)
2347 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2348 .unzip();
2349 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2350 .enumerate_transactions(other_summary)
2351 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2352 .unzip();
2353 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2354 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2355 let local_transactions_text = format!("{local_transactions:#?}");
2356 let other_transactions_text = format!("{other_transactions:#?}");
2357 let transactions_patch =
2358 create_patch(&local_transactions_text, &other_transactions_text);
2359 let local_effects_text = format!("{local_effects:#?}");
2360 let other_effects_text = format!("{other_effects:#?}");
2361 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2362 let seq_number = local_summary.sequence_number;
2363 let local_digest = local_summary.digest();
2364 let other_validator = name.concise();
2365 format!(
2366 "Checkpoint: {seq_number:?}\n\
2367 Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2368 Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2369 Summary Diff: \n{summary_patch}\n\n\
2370 Contents Diff: \n{contents_patch}\n\n\
2371 Transactions Diff: \n{transactions_patch}\n\n\
2372 Effects Diff: \n{effects_patch}",
2373 )
2374 })
2375 .collect::<Vec<_>>()
2376 .join("\n\n\n");
2377
2378 let header = format!(
2379 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2380 Datetime: {time:?}"
2381 );
2382 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2383 let path = tempfile::tempdir()
2384 .expect("Failed to create tempdir")
2385 .keep()
2386 .join(Path::new("checkpoint_fork_dump.txt"));
2387 let mut file = File::create(path).unwrap();
2388 write!(file, "{fork_logs_text}").unwrap();
2389 debug!("{}", fork_logs_text);
2390
2391 fail_point!("split_brain_reached");
2392}
2393
2394pub trait CheckpointServiceNotify {
2395 fn notify_checkpoint_signature(
2396 &self,
2397 epoch_store: &AuthorityPerEpochStore,
2398 info: &CheckpointSignatureMessage,
2399 ) -> IotaResult;
2400
2401 fn notify_checkpoint(&self) -> IotaResult;
2402}
2403
2404enum CheckpointServiceState {
2405 Unstarted(Box<(CheckpointBuilder, CheckpointAggregator)>),
2406 Started,
2407}
2408
2409impl CheckpointServiceState {
2410 fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
2411 let mut state = CheckpointServiceState::Started;
2412 std::mem::swap(self, &mut state);
2413
2414 match state {
2415 CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1),
2416 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2417 }
2418 }
2419}
2420
2421pub struct CheckpointService {
2422 tables: Arc<CheckpointStore>,
2423 notify_builder: Arc<Notify>,
2424 notify_aggregator: Arc<Notify>,
2425 last_signature_index: Mutex<u64>,
2426 highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
2428 highest_previously_built_seq: CheckpointSequenceNumber,
2431 metrics: Arc<CheckpointMetrics>,
2432 state: Mutex<CheckpointServiceState>,
2433}
2434
2435impl CheckpointService {
2436 pub fn build(
2440 state: Arc<AuthorityState>,
2441 checkpoint_store: Arc<CheckpointStore>,
2442 epoch_store: Arc<AuthorityPerEpochStore>,
2443 effects_store: Arc<dyn TransactionCacheRead>,
2444 accumulator: Weak<StateAccumulator>,
2445 checkpoint_output: Box<dyn CheckpointOutput>,
2446 certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2447 metrics: Arc<CheckpointMetrics>,
2448 max_transactions_per_checkpoint: usize,
2449 max_checkpoint_size_bytes: usize,
2450 ) -> Arc<Self> {
2451 info!(
2452 "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2453 );
2454 let notify_builder = Arc::new(Notify::new());
2455 let notify_aggregator = Arc::new(Notify::new());
2456
2457 let highest_previously_built_seq = checkpoint_store
2459 .get_latest_locally_computed_checkpoint()
2460 .expect("failed to get latest locally computed checkpoint")
2461 .map(|s| s.sequence_number)
2462 .unwrap_or(0);
2463
2464 let highest_currently_built_seq =
2465 CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
2466 .expect("epoch should not have ended")
2467 .map(|(seq, _)| seq)
2468 .unwrap_or(0);
2469
2470 let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
2471
2472 let aggregator = CheckpointAggregator::new(
2473 checkpoint_store.clone(),
2474 epoch_store.clone(),
2475 notify_aggregator.clone(),
2476 certified_checkpoint_output,
2477 state.clone(),
2478 metrics.clone(),
2479 );
2480
2481 let builder = CheckpointBuilder::new(
2482 state.clone(),
2483 checkpoint_store.clone(),
2484 epoch_store.clone(),
2485 notify_builder.clone(),
2486 effects_store,
2487 accumulator,
2488 checkpoint_output,
2489 notify_aggregator.clone(),
2490 highest_currently_built_seq_tx.clone(),
2491 metrics.clone(),
2492 max_transactions_per_checkpoint,
2493 max_checkpoint_size_bytes,
2494 );
2495
2496 let last_signature_index = epoch_store
2497 .get_last_checkpoint_signature_index()
2498 .expect("should not cross end of epoch");
2499 let last_signature_index = Mutex::new(last_signature_index);
2500
2501 Arc::new(Self {
2502 tables: checkpoint_store,
2503 notify_builder,
2504 notify_aggregator,
2505 last_signature_index,
2506 highest_currently_built_seq_tx,
2507 highest_previously_built_seq,
2508 metrics,
2509 state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
2510 builder, aggregator,
2511 )))),
2512 })
2513 }
2514
2515 pub async fn spawn(&self) -> JoinSet<()> {
2524 let mut tasks = JoinSet::new();
2525
2526 let (builder, aggregator) = self.state.lock().take_unstarted();
2527 tasks.spawn(monitored_future!(builder.run()));
2528 tasks.spawn(monitored_future!(aggregator.run()));
2529
2530 if tokio::time::timeout(
2536 Duration::from_secs(120),
2537 self.wait_for_rebuilt_checkpoints(),
2538 )
2539 .await
2540 .is_err()
2541 {
2542 debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
2543 }
2544
2545 tasks
2546 }
2547}
2548
2549impl CheckpointService {
2550 pub async fn wait_for_rebuilt_checkpoints(&self) {
2557 let highest_previously_built_seq = self.highest_previously_built_seq;
2558 let mut rx = self.highest_currently_built_seq_tx.subscribe();
2559 let mut highest_currently_built_seq = *rx.borrow_and_update();
2560 info!(
2561 "Waiting for checkpoints to be rebuilt, previously built seq: \
2562 {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
2563 );
2564 loop {
2565 if highest_currently_built_seq >= highest_previously_built_seq {
2566 info!("Checkpoint rebuild complete");
2567 break;
2568 }
2569 rx.changed().await.unwrap();
2570 highest_currently_built_seq = *rx.borrow_and_update();
2571 }
2572 }
2573
2574 #[cfg(test)]
2575 fn write_and_notify_checkpoint_for_testing(
2576 &self,
2577 epoch_store: &AuthorityPerEpochStore,
2578 checkpoint: PendingCheckpoint,
2579 ) -> IotaResult {
2580 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
2581
2582 let mut output = ConsensusCommitOutput::new(0);
2583 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2584 output.set_default_commit_stats_for_testing();
2585 epoch_store.push_consensus_output_for_tests(output);
2586 self.notify_checkpoint()?;
2587 Ok(())
2588 }
2589}
2590
2591impl CheckpointServiceNotify for CheckpointService {
2592 fn notify_checkpoint_signature(
2593 &self,
2594 epoch_store: &AuthorityPerEpochStore,
2595 info: &CheckpointSignatureMessage,
2596 ) -> IotaResult {
2597 let sequence = info.summary.sequence_number;
2598 let signer = info.summary.auth_sig().authority.concise();
2599
2600 if let Some(highest_verified_checkpoint) = self
2601 .tables
2602 .get_highest_verified_checkpoint()?
2603 .map(|x| *x.sequence_number())
2604 {
2605 if sequence <= highest_verified_checkpoint {
2606 trace!(
2607 checkpoint_seq = sequence,
2608 "Ignore checkpoint signature from {} - already certified", signer,
2609 );
2610 self.metrics
2611 .last_ignored_checkpoint_signature_received
2612 .set(sequence as i64);
2613 return Ok(());
2614 }
2615 }
2616 trace!(
2617 checkpoint_seq = sequence,
2618 "Received checkpoint signature, digest {} from {}",
2619 info.summary.digest(),
2620 signer,
2621 );
2622 self.metrics
2623 .last_received_checkpoint_signatures
2624 .with_label_values(&[&signer.to_string()])
2625 .set(sequence as i64);
2626 let mut index = self.last_signature_index.lock();
2630 *index += 1;
2631 epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2632 self.notify_aggregator.notify_one();
2633 Ok(())
2634 }
2635
2636 fn notify_checkpoint(&self) -> IotaResult {
2637 self.notify_builder.notify_one();
2638 Ok(())
2639 }
2640}
2641
2642pub struct CheckpointServiceNoop {}
2644impl CheckpointServiceNotify for CheckpointServiceNoop {
2645 fn notify_checkpoint_signature(
2646 &self,
2647 _: &AuthorityPerEpochStore,
2648 _: &CheckpointSignatureMessage,
2649 ) -> IotaResult {
2650 Ok(())
2651 }
2652
2653 fn notify_checkpoint(&self) -> IotaResult {
2654 Ok(())
2655 }
2656}
2657
2658#[cfg(test)]
2659mod tests {
2660 use std::{
2661 collections::{BTreeMap, HashMap},
2662 ops::Deref,
2663 };
2664
2665 use futures::{FutureExt as _, future::BoxFuture};
2666 use iota_macros::sim_test;
2667 use iota_protocol_config::{Chain, ProtocolConfig};
2668 use iota_types::{
2669 base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
2670 crypto::Signature,
2671 digests::TransactionEventsDigest,
2672 effects::{TransactionEffects, TransactionEvents},
2673 messages_checkpoint::SignedCheckpointSummary,
2674 move_package::MovePackage,
2675 object,
2676 transaction::{GenesisObject, VerifiedTransaction},
2677 };
2678 use tokio::sync::mpsc;
2679
2680 use super::*;
2681 use crate::authority::test_authority_builder::TestAuthorityBuilder;
2682
2683 #[sim_test]
2684 pub async fn checkpoint_builder_test() {
2685 telemetry_subscribers::init_for_testing();
2686
2687 let mut protocol_config =
2688 ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
2689 protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
2690 let state = TestAuthorityBuilder::new()
2691 .with_protocol_config(protocol_config)
2692 .build()
2693 .await;
2694
2695 let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
2696 let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
2697 vec![GenesisObject::RawObject {
2698 data: object::Data::Package(
2699 MovePackage::new(
2700 ObjectID::random(),
2701 SequenceNumber::new(),
2702 BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
2703 100_000,
2704 Vec::new(),
2707 BTreeMap::new(),
2710 )
2711 .unwrap(),
2712 ),
2713 owner: object::Owner::Immutable,
2714 }],
2715 vec![],
2716 );
2717 for i in 0..15 {
2718 state
2719 .database_for_testing()
2720 .perpetual_tables
2721 .transactions
2722 .insert(&d(i), dummy_tx.serializable_ref())
2723 .unwrap();
2724 }
2725 for i in 15..20 {
2726 state
2727 .database_for_testing()
2728 .perpetual_tables
2729 .transactions
2730 .insert(&d(i), dummy_tx_with_data.serializable_ref())
2731 .unwrap();
2732 }
2733
2734 let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
2735 commit_cert_for_test(
2736 &mut store,
2737 state.clone(),
2738 d(1),
2739 vec![d(2), d(3)],
2740 GasCostSummary::new(11, 11, 12, 11, 1),
2741 );
2742 commit_cert_for_test(
2743 &mut store,
2744 state.clone(),
2745 d(2),
2746 vec![d(3), d(4)],
2747 GasCostSummary::new(21, 21, 22, 21, 1),
2748 );
2749 commit_cert_for_test(
2750 &mut store,
2751 state.clone(),
2752 d(3),
2753 vec![],
2754 GasCostSummary::new(31, 31, 32, 31, 1),
2755 );
2756 commit_cert_for_test(
2757 &mut store,
2758 state.clone(),
2759 d(4),
2760 vec![],
2761 GasCostSummary::new(41, 41, 42, 41, 1),
2762 );
2763 for i in [5, 6, 7, 10, 11, 12, 13] {
2764 commit_cert_for_test(
2765 &mut store,
2766 state.clone(),
2767 d(i),
2768 vec![],
2769 GasCostSummary::new(41, 41, 42, 41, 1),
2770 );
2771 }
2772 for i in [15, 16, 17] {
2773 commit_cert_for_test(
2774 &mut store,
2775 state.clone(),
2776 d(i),
2777 vec![],
2778 GasCostSummary::new(51, 51, 52, 51, 1),
2779 );
2780 }
2781 let all_digests: Vec<_> = store.keys().copied().collect();
2782 for digest in all_digests {
2783 let signature = Signature::Ed25519IotaSignature(Default::default()).into();
2784 state
2785 .epoch_store_for_testing()
2786 .test_insert_user_signature(digest, vec![signature]);
2787 }
2788
2789 let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
2790 let (certified_output, mut certified_result) =
2791 mpsc::channel::<CertifiedCheckpointSummary>(10);
2792 let store = Arc::new(store);
2793
2794 let ckpt_dir = tempfile::tempdir().unwrap();
2795 let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
2796 let epoch_store = state.epoch_store_for_testing();
2797
2798 let accumulator = Arc::new(StateAccumulator::new_for_tests(
2799 state.get_accumulator_store().clone(),
2800 ));
2801
2802 let checkpoint_service = CheckpointService::build(
2803 state.clone(),
2804 checkpoint_store,
2805 epoch_store.clone(),
2806 store,
2807 Arc::downgrade(&accumulator),
2808 Box::new(output),
2809 Box::new(certified_output),
2810 CheckpointMetrics::new_for_tests(),
2811 3,
2812 100_000,
2813 );
2814 let _tasks = checkpoint_service.spawn().await;
2815
2816 checkpoint_service
2817 .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
2818 .unwrap();
2819 checkpoint_service
2820 .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
2821 .unwrap();
2822 checkpoint_service
2823 .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
2824 .unwrap();
2825 checkpoint_service
2826 .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
2827 .unwrap();
2828 checkpoint_service
2829 .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
2830 .unwrap();
2831 checkpoint_service
2832 .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
2833 .unwrap();
2834
2835 let (c1c, c1s) = result.recv().await.unwrap();
2836 let (c2c, c2s) = result.recv().await.unwrap();
2837
2838 let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2839 let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2840 assert_eq!(c1t, vec![d(4)]);
2841 assert_eq!(c1s.previous_digest, None);
2842 assert_eq!(c1s.sequence_number, 0);
2843 assert_eq!(
2844 c1s.epoch_rolling_gas_cost_summary,
2845 GasCostSummary::new(41, 41, 42, 41, 1)
2846 );
2847
2848 assert_eq!(c2t, vec![d(3), d(2), d(1)]);
2849 assert_eq!(c2s.previous_digest, Some(c1s.digest()));
2850 assert_eq!(c2s.sequence_number, 1);
2851 assert_eq!(
2852 c2s.epoch_rolling_gas_cost_summary,
2853 GasCostSummary::new(104, 104, 108, 104, 4)
2854 );
2855
2856 let (c3c, c3s) = result.recv().await.unwrap();
2859 let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2860 let (c4c, c4s) = result.recv().await.unwrap();
2861 let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2862 assert_eq!(c3s.sequence_number, 2);
2863 assert_eq!(c3s.previous_digest, Some(c2s.digest()));
2864 assert_eq!(c4s.sequence_number, 3);
2865 assert_eq!(c4s.previous_digest, Some(c3s.digest()));
2866 assert_eq!(c3t, vec![d(10), d(11), d(12)]);
2867 assert_eq!(c4t, vec![d(13)]);
2868
2869 let (c5c, c5s) = result.recv().await.unwrap();
2872 let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2873 let (c6c, c6s) = result.recv().await.unwrap();
2874 let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2875 assert_eq!(c5s.sequence_number, 4);
2876 assert_eq!(c5s.previous_digest, Some(c4s.digest()));
2877 assert_eq!(c6s.sequence_number, 5);
2878 assert_eq!(c6s.previous_digest, Some(c5s.digest()));
2879 assert_eq!(c5t, vec![d(15), d(16)]);
2880 assert_eq!(c6t, vec![d(17)]);
2881
2882 let (c7c, c7s) = result.recv().await.unwrap();
2885 let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2886 assert_eq!(c7t, vec![d(5), d(6)]);
2887 assert_eq!(c7s.previous_digest, Some(c6s.digest()));
2888 assert_eq!(c7s.sequence_number, 6);
2889
2890 let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
2891 let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
2892
2893 checkpoint_service
2894 .notify_checkpoint_signature(
2895 &epoch_store,
2896 &CheckpointSignatureMessage { summary: c2ss },
2897 )
2898 .unwrap();
2899 checkpoint_service
2900 .notify_checkpoint_signature(
2901 &epoch_store,
2902 &CheckpointSignatureMessage { summary: c1ss },
2903 )
2904 .unwrap();
2905
2906 let c1sc = certified_result.recv().await.unwrap();
2907 let c2sc = certified_result.recv().await.unwrap();
2908 assert_eq!(c1sc.sequence_number, 0);
2909 assert_eq!(c2sc.sequence_number, 1);
2910 }
2911
2912 impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
2913 fn try_notify_read_executed_effects(
2914 &self,
2915 digests: &[TransactionDigest],
2916 ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
2917 std::future::ready(Ok(digests
2918 .iter()
2919 .map(|d| self.get(d).expect("effects not found").clone())
2920 .collect()))
2921 .boxed()
2922 }
2923
2924 fn try_notify_read_executed_effects_digests(
2925 &self,
2926 digests: &[TransactionDigest],
2927 ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
2928 std::future::ready(Ok(digests
2929 .iter()
2930 .map(|d| {
2931 self.get(d)
2932 .map(|fx| fx.digest())
2933 .expect("effects not found")
2934 })
2935 .collect()))
2936 .boxed()
2937 }
2938
2939 fn try_multi_get_executed_effects(
2940 &self,
2941 digests: &[TransactionDigest],
2942 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2943 Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
2944 }
2945
2946 fn try_multi_get_transaction_blocks(
2953 &self,
2954 _: &[TransactionDigest],
2955 ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
2956 unimplemented!()
2957 }
2958
2959 fn try_multi_get_executed_effects_digests(
2960 &self,
2961 _: &[TransactionDigest],
2962 ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
2963 unimplemented!()
2964 }
2965
2966 fn try_multi_get_effects(
2967 &self,
2968 _: &[TransactionEffectsDigest],
2969 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2970 unimplemented!()
2971 }
2972
2973 fn try_multi_get_events(
2974 &self,
2975 _: &[TransactionEventsDigest],
2976 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
2977 unimplemented!()
2978 }
2979 }
2980
2981 #[async_trait::async_trait]
2982 impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
2983 async fn checkpoint_created(
2984 &self,
2985 summary: &CheckpointSummary,
2986 contents: &CheckpointContents,
2987 _epoch_store: &Arc<AuthorityPerEpochStore>,
2988 _checkpoint_store: &Arc<CheckpointStore>,
2989 ) -> IotaResult {
2990 self.try_send((contents.clone(), summary.clone())).unwrap();
2991 Ok(())
2992 }
2993 }
2994
2995 #[async_trait::async_trait]
2996 impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
2997 async fn certified_checkpoint_created(
2998 &self,
2999 summary: &CertifiedCheckpointSummary,
3000 ) -> IotaResult {
3001 self.try_send(summary.clone()).unwrap();
3002 Ok(())
3003 }
3004 }
3005
3006 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3007 PendingCheckpoint::V1(PendingCheckpointContentsV1 {
3008 roots: t
3009 .into_iter()
3010 .map(|t| TransactionKey::Digest(d(t)))
3011 .collect(),
3012 details: PendingCheckpointInfo {
3013 timestamp_ms,
3014 last_of_epoch: false,
3015 checkpoint_height: i,
3016 },
3017 })
3018 }
3019
3020 fn d(i: u8) -> TransactionDigest {
3021 let mut bytes: [u8; 32] = Default::default();
3022 bytes[0] = i;
3023 TransactionDigest::new(bytes)
3024 }
3025
3026 fn e(
3027 transaction_digest: TransactionDigest,
3028 dependencies: Vec<TransactionDigest>,
3029 gas_used: GasCostSummary,
3030 ) -> TransactionEffects {
3031 let mut effects = TransactionEffects::default();
3032 *effects.transaction_digest_mut_for_testing() = transaction_digest;
3033 *effects.dependencies_mut_for_testing() = dependencies;
3034 *effects.gas_cost_summary_mut_for_testing() = gas_used;
3035 effects
3036 }
3037
3038 fn commit_cert_for_test(
3039 store: &mut HashMap<TransactionDigest, TransactionEffects>,
3040 state: Arc<AuthorityState>,
3041 digest: TransactionDigest,
3042 dependencies: Vec<TransactionDigest>,
3043 gas_used: GasCostSummary,
3044 ) {
3045 let epoch_store = state.epoch_store_for_testing();
3046 let effects = e(digest, dependencies, gas_used);
3047 store.insert(digest, effects.clone());
3048 epoch_store
3049 .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
3050 .expect("Inserting cert fx and sigs should not fail");
3051 }
3052}