1mod causal_order;
6pub mod checkpoint_executor;
7mod checkpoint_output;
8mod metrics;
9
10use std::{
11 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
12 fs::File,
13 io::Write,
14 path::Path,
15 sync::{Arc, Weak},
16 time::{Duration, SystemTime},
17};
18
19use diffy::create_patch;
20use iota_common::{debug_fatal, fatal, sync::notify_read::NotifyRead};
21use iota_macros::fail_point;
22use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
23use iota_network::default_iota_network_config;
24use iota_protocol_config::ProtocolVersion;
25use iota_types::{
26 base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
27 committee::StakeUnit,
28 crypto::AuthorityStrongQuorumSignInfo,
29 digests::{CheckpointContentsDigest, CheckpointDigest},
30 effects::{TransactionEffects, TransactionEffectsAPI},
31 error::{IotaError, IotaResult},
32 event::SystemEpochInfoEvent,
33 executable_transaction::VerifiedExecutableTransaction,
34 gas::GasCostSummary,
35 iota_system_state::{
36 IotaSystemState, IotaSystemStateTrait,
37 epoch_start_iota_system_state::EpochStartSystemStateTrait,
38 },
39 message_envelope::Message,
40 messages_checkpoint::{
41 CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
42 CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
43 CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
44 FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
45 VerifiedCheckpointContents,
46 },
47 messages_consensus::ConsensusTransactionKey,
48 signature::GenericSignature,
49 transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
50};
51use itertools::Itertools;
52use nonempty::NonEmpty;
53use parking_lot::Mutex;
54use rand::{rngs::OsRng, seq::SliceRandom};
55use serde::{Deserialize, Serialize};
56use tokio::{
57 sync::{Notify, watch},
58 task::JoinSet,
59 time::timeout,
60};
61use tracing::{debug, error, info, instrument, trace, warn};
62use typed_store::{
63 DBMapUtils, Map, TypedStoreError,
64 rocks::{DBMap, MetricConf},
65 traits::{TableSummary, TypedStoreDebug},
66};
67
68pub use crate::checkpoints::{
69 checkpoint_output::{
70 LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
71 },
72 metrics::CheckpointMetrics,
73};
74use crate::{
75 authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
76 authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
77 checkpoints::{
78 causal_order::CausalOrder,
79 checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
80 },
81 consensus_handler::SequencedConsensusTransactionKey,
82 execution_cache::TransactionCacheRead,
83 stake_aggregator::{InsertResult, MultiStakeAggregator},
84 state_accumulator::StateAccumulator,
85};
86
/// Height associated with a pending checkpoint, as carried in
/// [`PendingCheckpointInfo::checkpoint_height`].
pub type CheckpointHeight = u64;
88
/// Aggregate per-epoch numbers, as produced by `CheckpointStore::get_epoch_stats`.
///
/// This is plain data (three `u64`s), so it derives the usual value-type
/// traits to make it loggable, comparable and cheap to pass around.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EpochStats {
    /// Number of checkpoints in the epoch.
    pub checkpoint_count: u64,
    /// Number of transactions in the epoch.
    pub transaction_count: u64,
    /// Computation cost taken from the last checkpoint's rolling gas cost
    /// summary for the epoch.
    pub total_gas_reward: u64,
}
94
/// Metadata attached to a pending checkpoint.
///
/// The type is (de)serialized, so field order is presumably part of the
/// stored format — confirm before reordering.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
    /// Timestamp of the pending checkpoint; the builder compares it against
    /// the previous checkpoint's timestamp plus the minimum checkpoint interval.
    pub timestamp_ms: CheckpointTimestamp,
    /// Whether this pending checkpoint is the last one of the epoch.
    pub last_of_epoch: bool,
    /// Height of this pending checkpoint.
    pub checkpoint_height: CheckpointHeight,
}
101
/// Versioned container for a pending checkpoint. Only the `V1` layout exists
/// at present.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PendingCheckpoint {
    /// First (and currently only) version of the pending checkpoint contents.
    V1(PendingCheckpointContentsV1),
}
107
/// Version 1 payload of a [`PendingCheckpoint`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointContentsV1 {
    /// Root transaction keys; the builder resolves these to executed
    /// transaction digests when building a checkpoint.
    pub roots: Vec<TransactionKey>,
    /// Timestamp, end-of-epoch flag and height for this pending checkpoint.
    pub details: PendingCheckpointInfo,
}
113
114impl PendingCheckpoint {
115 pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
116 match self {
117 PendingCheckpoint::V1(contents) => contents,
118 }
119 }
120
121 pub fn into_v1(self) -> PendingCheckpointContentsV1 {
122 match self {
123 PendingCheckpoint::V1(contents) => contents,
124 }
125 }
126
127 pub fn roots(&self) -> &Vec<TransactionKey> {
128 &self.as_v1().roots
129 }
130
131 pub fn details(&self) -> &PendingCheckpointInfo {
132 &self.as_v1().details
133 }
134
135 pub fn height(&self) -> CheckpointHeight {
136 self.details().checkpoint_height
137 }
138}
139
/// A checkpoint summary as tracked by the checkpoint builder, together with
/// bookkeeping about where it came from.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    /// The checkpoint summary itself.
    pub summary: CheckpointSummary,
    /// Height at which this checkpoint was built, if any. The builder reads
    /// this back to decide which pending checkpoints are still unprocessed.
    pub checkpoint_height: Option<CheckpointHeight>,
    // NOTE(review): presumably the index of this checkpoint among the
    // checkpoints produced from the same commit — confirm against the writer.
    pub position_in_commit: usize,
}
147
/// Column families backing the [`CheckpointStore`].
#[derive(DBMapUtils)]
pub struct CheckpointStoreTables {
    /// Checkpoint contents keyed by their contents digest.
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Sequence number of the checkpoint that a contents digest belongs to.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Full checkpoint contents keyed by checkpoint sequence number.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Certified checkpoints keyed by sequence number.
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// The same certified checkpoints keyed by checkpoint digest.
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Checkpoint summaries computed locally, keyed by sequence number. They
    /// are compared against certified checkpoints to detect forks.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// Sequence number of the last checkpoint of each epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks (highest verified / synced / executed / pruned), each stored
    /// as a `(sequence number, digest)` pair.
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}
180
181impl CheckpointStoreTables {
182 pub fn new(path: &Path, metric_name: &'static str) -> Self {
183 Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
184 }
185 pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
186 Self::get_read_only_handle(
187 path.to_path_buf(),
188 None,
189 None,
190 MetricConf::new("checkpoint_readonly"),
191 )
192 }
193}
194
/// Persistent store for checkpoints, their contents and the checkpoint
/// watermarks, plus in-memory notifiers for watermark advances.
pub struct CheckpointStore {
    /// The underlying database tables.
    pub(crate) tables: CheckpointStoreTables,
    /// Notified whenever the highest-synced watermark is updated.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    /// Notified whenever the highest-executed watermark is advanced through
    /// `update_highest_executed_checkpoint`.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
200
201impl CheckpointStore {
202 pub fn new(path: &Path) -> Arc<Self> {
203 let tables = CheckpointStoreTables::new(path, "checkpoint");
204 Arc::new(Self {
205 tables,
206 synced_checkpoint_notify_read: NotifyRead::new(),
207 executed_checkpoint_notify_read: NotifyRead::new(),
208 })
209 }
210
211 pub fn new_for_tests() -> Arc<Self> {
212 let ckpt_dir = tempfile::tempdir().unwrap();
213 CheckpointStore::new(ckpt_dir.path())
214 }
215
216 pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
217 let tables = CheckpointStoreTables::new(path, "db_checkpoint");
218 Arc::new(Self {
219 tables,
220 synced_checkpoint_notify_read: NotifyRead::new(),
221 executed_checkpoint_notify_read: NotifyRead::new(),
222 })
223 }
224
    /// Returns a read-only handle to the checkpoint tables stored at `path`.
    /// Thin wrapper around [`CheckpointStoreTables::open_readonly`].
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }
228
229 #[instrument(level = "info", skip_all)]
230 pub fn insert_genesis_checkpoint(
231 &self,
232 checkpoint: VerifiedCheckpoint,
233 contents: CheckpointContents,
234 epoch_store: &AuthorityPerEpochStore,
235 ) {
236 assert_eq!(
237 checkpoint.epoch(),
238 0,
239 "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
240 );
241 assert_eq!(
242 *checkpoint.sequence_number(),
243 0,
244 "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
245 );
246
247 if self
250 .get_checkpoint_by_digest(checkpoint.digest())
251 .unwrap()
252 .is_none()
253 {
254 if epoch_store.epoch() == checkpoint.epoch {
255 epoch_store
256 .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
257 .unwrap();
258 } else {
259 debug!(
260 validator_epoch =% epoch_store.epoch(),
261 genesis_epoch =% checkpoint.epoch(),
262 "Not inserting checkpoint builder data for genesis checkpoint",
263 );
264 }
265 self.insert_checkpoint_contents(contents).unwrap();
266 self.insert_verified_checkpoint(&checkpoint).unwrap();
267 self.update_highest_synced_checkpoint(&checkpoint).unwrap();
268 }
269 }
270
271 pub fn get_checkpoint_by_digest(
272 &self,
273 digest: &CheckpointDigest,
274 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
275 self.tables
276 .checkpoint_by_digest
277 .get(digest)
278 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
279 }
280
281 pub fn get_checkpoint_by_sequence_number(
282 &self,
283 sequence_number: CheckpointSequenceNumber,
284 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
285 self.tables
286 .certified_checkpoints
287 .get(&sequence_number)
288 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
289 }
290
291 pub fn get_locally_computed_checkpoint(
292 &self,
293 sequence_number: CheckpointSequenceNumber,
294 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
295 self.tables
296 .locally_computed_checkpoints
297 .get(&sequence_number)
298 }
299
300 pub fn get_sequence_number_by_contents_digest(
301 &self,
302 digest: &CheckpointContentsDigest,
303 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
304 self.tables
305 .checkpoint_sequence_by_contents_digest
306 .get(digest)
307 }
308
309 pub fn delete_contents_digest_sequence_number_mapping(
310 &self,
311 digest: &CheckpointContentsDigest,
312 ) -> Result<(), TypedStoreError> {
313 self.tables
314 .checkpoint_sequence_by_contents_digest
315 .remove(digest)
316 }
317
318 pub fn get_latest_certified_checkpoint(
319 &self,
320 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
321 Ok(self
322 .tables
323 .certified_checkpoints
324 .reversed_safe_iter_with_bounds(None, None)?
325 .next()
326 .transpose()?
327 .map(|(_, v)| v.into()))
328 }
329
330 pub fn get_latest_locally_computed_checkpoint(
331 &self,
332 ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
333 Ok(self
334 .tables
335 .locally_computed_checkpoints
336 .reversed_safe_iter_with_bounds(None, None)?
337 .next()
338 .transpose()?
339 .map(|(_, v)| v))
340 }
341
342 pub fn multi_get_checkpoint_by_sequence_number(
343 &self,
344 sequence_numbers: &[CheckpointSequenceNumber],
345 ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
346 let checkpoints = self
347 .tables
348 .certified_checkpoints
349 .multi_get(sequence_numbers)?
350 .into_iter()
351 .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
352 .collect();
353
354 Ok(checkpoints)
355 }
356
357 pub fn multi_get_checkpoint_content(
358 &self,
359 contents_digest: &[CheckpointContentsDigest],
360 ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
361 self.tables.checkpoint_content.multi_get(contents_digest)
362 }
363
364 pub fn get_highest_verified_checkpoint(
365 &self,
366 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
367 let highest_verified = if let Some(highest_verified) = self
368 .tables
369 .watermarks
370 .get(&CheckpointWatermark::HighestVerified)?
371 {
372 highest_verified
373 } else {
374 return Ok(None);
375 };
376 self.get_checkpoint_by_digest(&highest_verified.1)
377 }
378
379 pub fn get_highest_synced_checkpoint(
380 &self,
381 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
382 let highest_synced = if let Some(highest_synced) = self
383 .tables
384 .watermarks
385 .get(&CheckpointWatermark::HighestSynced)?
386 {
387 highest_synced
388 } else {
389 return Ok(None);
390 };
391 self.get_checkpoint_by_digest(&highest_synced.1)
392 }
393
394 pub fn get_highest_synced_checkpoint_seq_number(
395 &self,
396 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
397 if let Some(highest_synced) = self
398 .tables
399 .watermarks
400 .get(&CheckpointWatermark::HighestSynced)?
401 {
402 Ok(Some(highest_synced.0))
403 } else {
404 Ok(None)
405 }
406 }
407
408 pub fn get_highest_executed_checkpoint_seq_number(
409 &self,
410 ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
411 if let Some(highest_executed) = self
412 .tables
413 .watermarks
414 .get(&CheckpointWatermark::HighestExecuted)?
415 {
416 Ok(Some(highest_executed.0))
417 } else {
418 Ok(None)
419 }
420 }
421
422 pub fn get_highest_executed_checkpoint(
423 &self,
424 ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
425 let highest_executed = if let Some(highest_executed) = self
426 .tables
427 .watermarks
428 .get(&CheckpointWatermark::HighestExecuted)?
429 {
430 highest_executed
431 } else {
432 return Ok(None);
433 };
434 self.get_checkpoint_by_digest(&highest_executed.1)
435 }
436
437 pub fn get_highest_pruned_checkpoint_seq_number(
438 &self,
439 ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
440 Ok(self
441 .tables
442 .watermarks
443 .get(&CheckpointWatermark::HighestPruned)?
444 .unwrap_or_default()
445 .0)
446 }
447
448 pub fn get_checkpoint_contents(
449 &self,
450 digest: &CheckpointContentsDigest,
451 ) -> Result<Option<CheckpointContents>, TypedStoreError> {
452 self.tables.checkpoint_content.get(digest)
453 }
454
455 pub fn get_full_checkpoint_contents_by_sequence_number(
456 &self,
457 seq: CheckpointSequenceNumber,
458 ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
459 self.tables.full_checkpoint_content.get(&seq)
460 }
461
462 fn prune_local_summaries(&self) -> IotaResult {
463 if let Some((last_local_summary, _)) = self
464 .tables
465 .locally_computed_checkpoints
466 .reversed_safe_iter_with_bounds(None, None)?
467 .next()
468 .transpose()?
469 {
470 let mut batch = self.tables.locally_computed_checkpoints.batch();
471 batch.schedule_delete_range(
472 &self.tables.locally_computed_checkpoints,
473 &0,
474 &last_local_summary,
475 )?;
476 batch.write()?;
477 info!("Pruned local summaries up to {:?}", last_local_summary);
478 }
479 Ok(())
480 }
481
482 fn check_for_checkpoint_fork(
483 &self,
484 local_checkpoint: &CheckpointSummary,
485 verified_checkpoint: &VerifiedCheckpoint,
486 ) {
487 if local_checkpoint != verified_checkpoint.data() {
488 let verified_contents = self
489 .get_checkpoint_contents(&verified_checkpoint.content_digest)
490 .map(|opt_contents| {
491 opt_contents
492 .map(|contents| format!("{contents:?}"))
493 .unwrap_or_else(|| {
494 format!(
495 "Verified checkpoint contents not found, digest: {:?}",
496 verified_checkpoint.content_digest,
497 )
498 })
499 })
500 .map_err(|e| {
501 format!(
502 "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
503 verified_checkpoint.content_digest, e
504 )
505 })
506 .unwrap_or_else(|err_msg| err_msg);
507
508 let local_contents = self
509 .get_checkpoint_contents(&local_checkpoint.content_digest)
510 .map(|opt_contents| {
511 opt_contents
512 .map(|contents| format!("{contents:?}"))
513 .unwrap_or_else(|| {
514 format!(
515 "Local checkpoint contents not found, digest: {:?}",
516 local_checkpoint.content_digest
517 )
518 })
519 })
520 .map_err(|e| {
521 format!(
522 "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
523 local_checkpoint.content_digest, e
524 )
525 })
526 .unwrap_or_else(|err_msg| err_msg);
527
528 error!(
530 verified_checkpoint = ?verified_checkpoint.data(),
531 ?verified_contents,
532 ?local_checkpoint,
533 ?local_contents,
534 "Local checkpoint fork detected!",
535 );
536 fatal!(
537 "Local checkpoint fork detected for sequence number: {}",
538 local_checkpoint.sequence_number()
539 );
540 }
541 }
542
543 pub fn insert_certified_checkpoint(
549 &self,
550 checkpoint: &VerifiedCheckpoint,
551 ) -> Result<(), TypedStoreError> {
552 debug!(
553 checkpoint_seq = checkpoint.sequence_number(),
554 "Inserting certified checkpoint",
555 );
556 let mut batch = self.tables.certified_checkpoints.batch();
557 batch
558 .insert_batch(
559 &self.tables.certified_checkpoints,
560 [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
561 )?
562 .insert_batch(
563 &self.tables.checkpoint_by_digest,
564 [(checkpoint.digest(), checkpoint.serializable_ref())],
565 )?;
566 if checkpoint.next_epoch_committee().is_some() {
567 batch.insert_batch(
568 &self.tables.epoch_last_checkpoint_map,
569 [(&checkpoint.epoch(), checkpoint.sequence_number())],
570 )?;
571 }
572 batch.write()?;
573
574 if let Some(local_checkpoint) = self
575 .tables
576 .locally_computed_checkpoints
577 .get(checkpoint.sequence_number())?
578 {
579 self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
580 }
581
582 Ok(())
583 }
584
585 #[instrument(level = "debug", skip_all)]
588 pub fn insert_verified_checkpoint(
589 &self,
590 checkpoint: &VerifiedCheckpoint,
591 ) -> Result<(), TypedStoreError> {
592 self.insert_certified_checkpoint(checkpoint)?;
593 self.update_highest_verified_checkpoint(checkpoint)
594 }
595
596 pub fn update_highest_verified_checkpoint(
597 &self,
598 checkpoint: &VerifiedCheckpoint,
599 ) -> Result<(), TypedStoreError> {
600 if Some(*checkpoint.sequence_number())
601 > self
602 .get_highest_verified_checkpoint()?
603 .map(|x| *x.sequence_number())
604 {
605 debug!(
606 checkpoint_seq = checkpoint.sequence_number(),
607 "Updating highest verified checkpoint",
608 );
609 self.tables.watermarks.insert(
610 &CheckpointWatermark::HighestVerified,
611 &(*checkpoint.sequence_number(), *checkpoint.digest()),
612 )?;
613 }
614
615 Ok(())
616 }
617
618 pub fn update_highest_synced_checkpoint(
619 &self,
620 checkpoint: &VerifiedCheckpoint,
621 ) -> Result<(), TypedStoreError> {
622 let seq = *checkpoint.sequence_number();
623 debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
624 self.tables.watermarks.insert(
625 &CheckpointWatermark::HighestSynced,
626 &(seq, *checkpoint.digest()),
627 )?;
628 self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
629 Ok(())
630 }
631
632 async fn notify_read_checkpoint_watermark<F>(
633 &self,
634 notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
635 seq: CheckpointSequenceNumber,
636 get_watermark: F,
637 ) -> VerifiedCheckpoint
638 where
639 F: Fn() -> Option<CheckpointSequenceNumber>,
640 {
641 type ReadResult = Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError>;
642
643 notify_read
644 .read(&[seq], |seqs| {
645 let seq = seqs[0];
646 let Some(highest) = get_watermark() else {
647 return Ok(vec![None]) as ReadResult;
648 };
649 if highest < seq {
650 return Ok(vec![None]) as ReadResult;
651 }
652 let checkpoint = self
653 .get_checkpoint_by_sequence_number(seq)
654 .expect("db error")
655 .expect("checkpoint not found");
656 Ok(vec![Some(checkpoint)]) as ReadResult
657 })
658 .await
659 .unwrap()
660 .into_iter()
661 .next()
662 .unwrap()
663 }
664
665 pub async fn notify_read_synced_checkpoint(
666 &self,
667 seq: CheckpointSequenceNumber,
668 ) -> VerifiedCheckpoint {
669 self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
670 self.get_highest_synced_checkpoint_seq_number()
671 .expect("db error")
672 })
673 .await
674 }
675
676 pub async fn notify_read_executed_checkpoint(
677 &self,
678 seq: CheckpointSequenceNumber,
679 ) -> VerifiedCheckpoint {
680 self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
681 self.get_highest_executed_checkpoint_seq_number()
682 .expect("db error")
683 })
684 .await
685 }
686
687 pub fn update_highest_executed_checkpoint(
688 &self,
689 checkpoint: &VerifiedCheckpoint,
690 ) -> Result<(), TypedStoreError> {
691 if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
692 if seq_number >= *checkpoint.sequence_number() {
693 return Ok(());
694 }
695 assert_eq!(
696 seq_number + 1,
697 *checkpoint.sequence_number(),
698 "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
699 checkpoint.sequence_number(),
700 seq_number
701 );
702 }
703 let seq = *checkpoint.sequence_number();
704 debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
705 self.tables.watermarks.insert(
706 &CheckpointWatermark::HighestExecuted,
707 &(seq, *checkpoint.digest()),
708 )?;
709 self.executed_checkpoint_notify_read
710 .notify(&seq, checkpoint);
711 Ok(())
712 }
713
714 pub fn update_highest_pruned_checkpoint(
715 &self,
716 checkpoint: &VerifiedCheckpoint,
717 ) -> Result<(), TypedStoreError> {
718 self.tables.watermarks.insert(
719 &CheckpointWatermark::HighestPruned,
720 &(*checkpoint.sequence_number(), *checkpoint.digest()),
721 )
722 }
723
724 pub fn set_highest_executed_checkpoint_subtle(
730 &self,
731 checkpoint: &VerifiedCheckpoint,
732 ) -> Result<(), TypedStoreError> {
733 self.tables.watermarks.insert(
734 &CheckpointWatermark::HighestExecuted,
735 &(*checkpoint.sequence_number(), *checkpoint.digest()),
736 )
737 }
738
739 pub fn insert_checkpoint_contents(
740 &self,
741 contents: CheckpointContents,
742 ) -> Result<(), TypedStoreError> {
743 debug!(
744 checkpoint_seq = ?contents.digest(),
745 "Inserting checkpoint contents",
746 );
747 self.tables
748 .checkpoint_content
749 .insert(contents.digest(), &contents)
750 }
751
752 pub fn insert_verified_checkpoint_contents(
753 &self,
754 checkpoint: &VerifiedCheckpoint,
755 full_contents: VerifiedCheckpointContents,
756 ) -> Result<(), TypedStoreError> {
757 let mut batch = self.tables.full_checkpoint_content.batch();
758 batch.insert_batch(
759 &self.tables.checkpoint_sequence_by_contents_digest,
760 [(&checkpoint.content_digest, checkpoint.sequence_number())],
761 )?;
762 let full_contents = full_contents.into_inner();
763 batch.insert_batch(
764 &self.tables.full_checkpoint_content,
765 [(checkpoint.sequence_number(), &full_contents)],
766 )?;
767
768 let contents = full_contents.into_checkpoint_contents();
769 assert_eq!(&checkpoint.content_digest, contents.digest());
770
771 batch.insert_batch(
772 &self.tables.checkpoint_content,
773 [(contents.digest(), &contents)],
774 )?;
775
776 batch.write()
777 }
778
779 pub fn delete_full_checkpoint_contents(
780 &self,
781 seq: CheckpointSequenceNumber,
782 ) -> Result<(), TypedStoreError> {
783 self.tables.full_checkpoint_content.remove(&seq)
784 }
785
786 pub fn get_epoch_last_checkpoint(
787 &self,
788 epoch_id: EpochId,
789 ) -> IotaResult<Option<VerifiedCheckpoint>> {
790 let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
791 let checkpoint = match seq {
792 Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
793 None => None,
794 };
795 Ok(checkpoint)
796 }
797
798 pub fn insert_epoch_last_checkpoint(
799 &self,
800 epoch_id: EpochId,
801 checkpoint: &VerifiedCheckpoint,
802 ) -> IotaResult {
803 self.tables
804 .epoch_last_checkpoint_map
805 .insert(&epoch_id, checkpoint.sequence_number())?;
806 Ok(())
807 }
808
809 pub fn get_epoch_state_commitments(
810 &self,
811 epoch: EpochId,
812 ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
813 let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
814 checkpoint
815 .end_of_epoch_data
816 .as_ref()
817 .expect("Last checkpoint of epoch expected to have EndOfEpochData")
818 .epoch_commitments
819 .clone()
820 });
821 Ok(commitments)
822 }
823
824 pub fn get_epoch_stats(
827 &self,
828 epoch: EpochId,
829 last_checkpoint: &CheckpointSummary,
830 ) -> Option<EpochStats> {
831 let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
832 (0, 0)
833 } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
834 (
835 checkpoint.sequence_number + 1,
836 checkpoint.network_total_transactions,
837 )
838 } else {
839 return None;
840 };
841 Some(EpochStats {
842 checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
843 transaction_count: last_checkpoint.network_total_transactions
844 - prev_epoch_network_transactions,
845 total_gas_reward: last_checkpoint
846 .epoch_rolling_gas_cost_summary
847 .computation_cost,
848 })
849 }
850
851 pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
852 self.tables
854 .checkpoint_content
855 .checkpoint_db(path)
856 .map_err(Into::into)
857 }
858
859 pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
860 let mut wb = self.tables.watermarks.batch();
861 wb.delete_batch(
862 &self.tables.watermarks,
863 std::iter::once(CheckpointWatermark::HighestExecuted),
864 )?;
865 wb.write()?;
866 Ok(())
867 }
868
869 pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
870 self.delete_highest_executed_checkpoint_test_only()?;
871 self.tables.watermarks.rocksdb.flush()?;
872 Ok(())
873 }
874
875 #[instrument(level = "debug", skip_all)]
884 pub async fn reexecute_local_checkpoints(
885 &self,
886 state: &AuthorityState,
887 epoch_store: &AuthorityPerEpochStore,
888 ) {
889 let epoch = epoch_store.epoch();
890 let highest_executed = self
891 .get_highest_executed_checkpoint_seq_number()
892 .expect("get_highest_executed_checkpoint_seq_number should not fail")
893 .unwrap_or(0);
894
895 let Ok(Some(highest_built)) = self.get_latest_locally_computed_checkpoint() else {
896 info!("no locally built checkpoints to verify");
897 return;
898 };
899
900 info!(
901 "rexecuting locally computed checkpoints for crash recovery from {} to {}",
902 highest_executed, highest_built
903 );
904
905 for seq in highest_executed + 1..=*highest_built.sequence_number() {
906 info!(?seq, "Re-executing locally computed checkpoint");
907 let Some(checkpoint) = self
908 .get_locally_computed_checkpoint(seq)
909 .expect("get_locally_computed_checkpoint should not fail")
910 else {
911 panic!("locally computed checkpoint {seq:?} not found");
912 };
913
914 let Some(contents) = self
915 .get_checkpoint_contents(&checkpoint.content_digest)
916 .expect("get_checkpoint_contents should not fail")
917 else {
918 panic!(
919 "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
920 seq, checkpoint.content_digest
921 );
922 };
923
924 let cache = state.get_transaction_cache_reader();
925
926 let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
927 let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
928 let txns = cache
929 .try_multi_get_transaction_blocks(&tx_digests)
930 .expect("try_multi_get_transaction_blocks should not fail");
931
932 let txns: Vec<_> = itertools::izip!(txns, tx_digests, fx_digests)
933 .filter_map(|(tx, digest, fx)| {
934 if let Some(tx) = tx {
935 Some((tx, fx))
936 } else {
937 info!(
938 "transaction {:?} not found during checkpoint re-execution",
939 digest
940 );
941 None
942 }
943 })
944 .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
946 .map(|(tx, fx)| {
947 (
948 VerifiedExecutableTransaction::new_from_checkpoint(
949 (*tx).clone(),
950 epoch,
951 seq,
952 ),
953 fx,
954 )
955 })
956 .collect();
957
958 let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();
959
960 info!(
961 ?seq,
962 ?tx_digests,
963 "Re-executing transactions for locally built checkpoint"
964 );
965 state.enqueue_with_expected_effects_digest(txns, epoch_store);
968
969 let waiting_logger = tokio::task::spawn(async move {
973 let mut interval = tokio::time::interval(Duration::from_secs(1));
974 loop {
975 interval.tick().await;
976 warn!(?seq, "Still waiting for re-execution to complete");
977 }
978 });
979
980 cache
981 .try_notify_read_executed_effects_digests(&tx_digests)
982 .await
983 .expect("notify_read_executed_effects_digests should not fail");
984
985 waiting_logger.abort();
986 waiting_logger.await.ok();
987 info!(?seq, "Re-execution completed for locally built checkpoint");
988 }
989
990 info!("Re-execution of locally built checkpoints completed");
991 }
992}
993
/// Keys of the watermarks table. Each key maps to the
/// `(sequence number, digest)` of the corresponding checkpoint.
// NOTE(review): this enum is serialized as a DB key, so variant order is
// presumably part of the on-disk format — confirm before reordering.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Highest checkpoint inserted via `insert_verified_checkpoint`.
    HighestVerified,
    /// Highest checkpoint marked as synced.
    HighestSynced,
    /// Highest checkpoint marked as executed.
    HighestExecuted,
    /// Highest checkpoint marked as pruned.
    HighestPruned,
}
1001
/// Builds checkpoints from the pending checkpoints recorded in the epoch store.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    /// Source of pending checkpoints and of the last built checkpoint summary.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Awaited by the run loop between build attempts.
    notify: Arc<Notify>,
    // NOTE(review): presumably used to wake the checkpoint aggregator after a
    // checkpoint is written — confirm at the use site.
    notify_aggregator: Arc<Notify>,
    /// Publishes the highest checkpoint sequence number built so far.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    /// Used to wait for the executed effects of checkpoint root transactions.
    effects_store: Arc<dyn TransactionCacheRead>,
    accumulator: Weak<StateAccumulator>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    // NOTE(review): the two limits below presumably cap how many transactions
    // and bytes go into a single checkpoint — confirm in `create_checkpoints`.
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}
1016
/// State for aggregating checkpoint signatures into certified checkpoints.
// NOTE(review): behaviour is implemented outside this view; the field notes
// below are inferred from the types — confirm.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    /// Aggregation state for the checkpoint currently being certified, if any.
    current: Option<CheckpointSignatureAggregator>,
    /// Sink for certified checkpoints.
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1026
/// Signature aggregation state for a single checkpoint summary.
pub struct CheckpointSignatureAggregator {
    // NOTE(review): presumably the index of the next pending signature
    // message to consume — confirm at the use site.
    next_index: u64,
    /// The checkpoint summary that signatures are being collected for.
    summary: CheckpointSummary,
    /// Digest associated with `summary` (presumably its own digest — confirm).
    digest: CheckpointDigest,
    /// Stake aggregation of received signatures, grouped by the checkpoint
    /// digest that each signature is over.
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
1038
1039impl CheckpointBuilder {
1040 fn new(
1041 state: Arc<AuthorityState>,
1042 store: Arc<CheckpointStore>,
1043 epoch_store: Arc<AuthorityPerEpochStore>,
1044 notify: Arc<Notify>,
1045 effects_store: Arc<dyn TransactionCacheRead>,
1046 accumulator: Weak<StateAccumulator>,
1047 output: Box<dyn CheckpointOutput>,
1048 notify_aggregator: Arc<Notify>,
1049 last_built: watch::Sender<CheckpointSequenceNumber>,
1050 metrics: Arc<CheckpointMetrics>,
1051 max_transactions_per_checkpoint: usize,
1052 max_checkpoint_size_bytes: usize,
1053 ) -> Self {
1054 Self {
1055 state,
1056 store,
1057 epoch_store,
1058 notify,
1059 effects_store,
1060 accumulator,
1061 output,
1062 notify_aggregator,
1063 last_built,
1064 metrics,
1065 max_transactions_per_checkpoint,
1066 max_checkpoint_size_bytes,
1067 }
1068 }
1069
    /// Main loop of the builder: attempts to build checkpoints, then waits
    /// for the next notification before trying again. The loop has no exit.
    async fn run(mut self) {
        info!("Starting CheckpointBuilder");
        loop {
            // Build first, so that work already pending at startup is
            // processed before the first wait.
            self.maybe_build_checkpoints().await;

            self.notify.notified().await;
        }
    }
1080
    /// Builds as many checkpoints as the currently pending checkpoints allow.
    ///
    /// Pending checkpoints past the last built height are read from the epoch
    /// store and accumulated into a group. A group is turned into a
    /// checkpoint via `make_checkpoint` as soon as one of these holds:
    /// - no checkpoint timestamp is known yet, or the minimum checkpoint
    ///   interval has elapsed since the last one;
    /// - the next pending checkpoint is the last of the epoch;
    /// - the current pending checkpoint is the last of the epoch.
    ///
    /// On a `make_checkpoint` error, the error is logged, the task sleeps for
    /// one second, the error metric is incremented and the function returns.
    /// Any group still open when the iterator ends is dropped here.
    ///
    /// # Panics
    ///
    /// Panics if the epoch store cannot provide the last built summary or the
    /// pending checkpoints.
    async fn maybe_build_checkpoints(&mut self) {
        let _scope = monitored_scope("BuildCheckpoints");

        // Resume from the height and timestamp of the last built checkpoint.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        // Defaults to 0 when the protocol config does not set the option.
        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        // Peekable, because the decision below looks one entry ahead.
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            let current_timestamp = pending.details().timestamp_ms;
            // Build now if the interval has elapsed, or if this entry or the
            // next one closes the epoch.
            let can_build = match last_timestamp {
                Some(last_timestamp) => {
                    current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                }
                None => true,
            } || checkpoints_iter
                .peek()
                .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                || pending.details().last_of_epoch;
            // The entry joins the current group whether or not we build now.
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height = height,
                "Making checkpoint at commit height"
            );

            // `mem::take` hands over the group and leaves an empty one behind.
            match self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await
            {
                Ok(seq) => {
                    // Publish the new sequence number only if it moves forward.
                    self.last_built.send_if_modified(|cur| {
                        if seq > *cur {
                            *cur = seq;
                            true
                        } else {
                            false
                        }
                    });
                }
                Err(e) => {
                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    return;
                }
            }
            // Yield to the scheduler between checkpoints.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );
    }
1170
1171 #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1172 async fn make_checkpoint(
1173 &self,
1174 pendings: Vec<PendingCheckpoint>,
1175 ) -> anyhow::Result<CheckpointSequenceNumber> {
1176 let last_details = pendings.last().unwrap().details().clone();
1177
1178 let mut effects_in_current_checkpoint = BTreeSet::new();
1184
1185 let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
1188 for pending_checkpoint in pendings.into_iter() {
1189 let pending = pending_checkpoint.into_v1();
1190 let txn_in_checkpoint = self
1191 .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
1192 .await?;
1193 sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
1194 }
1195 let new_checkpoints = self
1196 .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
1197 .await?;
1198 let highest_sequence = *new_checkpoints.last().0.sequence_number();
1199 self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1200 .await?;
1201 Ok(highest_sequence)
1202 }
1203
1204 #[instrument(level = "debug", skip_all)]
1209 async fn resolve_checkpoint_transactions(
1210 &self,
1211 roots: Vec<TransactionKey>,
1212 effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
1213 ) -> IotaResult<Vec<TransactionEffects>> {
1214 self.metrics
1215 .checkpoint_roots_count
1216 .inc_by(roots.len() as u64);
1217
1218 let root_digests = self
1219 .epoch_store
1220 .notify_read_executed_digests(&roots)
1221 .in_monitored_scope("CheckpointNotifyDigests")
1222 .await?;
1223 let root_effects = self
1224 .effects_store
1225 .try_notify_read_executed_effects(&root_digests)
1226 .in_monitored_scope("CheckpointNotifyRead")
1227 .await?;
1228
1229 let _scope = monitored_scope("CheckpointBuilder");
1230
1231 let consensus_commit_prologue = {
1232 let consensus_commit_prologue = self
1236 .extract_consensus_commit_prologue(&root_digests, &root_effects)
1237 .await?;
1238
1239 if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1243 let unsorted_ccp = self.complete_checkpoint_effects(
1244 vec![ccp_effects.clone()],
1245 effects_in_current_checkpoint,
1246 )?;
1247
1248 if unsorted_ccp.len() != 1 {
1251 fatal!(
1252 "Expected 1 consensus commit prologue, got {:?}",
1253 unsorted_ccp
1254 .iter()
1255 .map(|e| e.transaction_digest())
1256 .collect::<Vec<_>>()
1257 );
1258 }
1259 assert_eq!(unsorted_ccp.len(), 1);
1260 assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1261 }
1262 consensus_commit_prologue
1263 };
1264
1265 let unsorted =
1266 self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;
1267
1268 let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1269 let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1270 if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
1271 #[cfg(debug_assertions)]
1272 {
1273 for tx in unsorted.iter() {
1276 assert!(tx.transaction_digest() != &_ccp_digest);
1277 }
1278 }
1279 sorted.push(ccp_effects);
1280 }
1281 sorted.extend(CausalOrder::causal_sort(unsorted));
1282
1283 #[cfg(msim)]
1284 {
1285 self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1287 }
1288
1289 Ok(sorted)
1290 }
1291
1292 async fn extract_consensus_commit_prologue(
1297 &self,
1298 root_digests: &[TransactionDigest],
1299 root_effects: &[TransactionEffects],
1300 ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1301 let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1302 if root_digests.is_empty() {
1303 return Ok(None);
1304 }
1305
1306 let first_tx = self
1311 .state
1312 .get_transaction_cache_reader()
1313 .try_get_transaction_block(&root_digests[0])?
1314 .expect("Transaction block must exist");
1315
1316 Ok(match first_tx.transaction_data().kind() {
1317 TransactionKind::ConsensusCommitPrologueV1(_) => {
1318 assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1319 Some((*first_tx.digest(), root_effects[0].clone()))
1320 }
1321 _ => None,
1322 })
1323 }
1324
1325 #[instrument(level = "debug", skip_all)]
1327 async fn write_checkpoints(
1328 &self,
1329 height: CheckpointHeight,
1330 new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1331 ) -> IotaResult {
1332 let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1333 let mut batch = self.store.tables.checkpoint_content.batch();
1334 let mut all_tx_digests =
1335 Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1336
1337 for (summary, contents) in &new_checkpoints {
1338 debug!(
1339 checkpoint_commit_height = height,
1340 checkpoint_seq = summary.sequence_number,
1341 contents_digest = ?contents.digest(),
1342 "writing checkpoint",
1343 );
1344
1345 if let Some(previously_computed_summary) = self
1346 .store
1347 .tables
1348 .locally_computed_checkpoints
1349 .get(&summary.sequence_number)?
1350 {
1351 if previously_computed_summary != *summary {
1352 fatal!(
1354 "Checkpoint {} was previously built with a different result: {previously_computed_summary:?} vs {summary:?}",
1355 summary.sequence_number,
1356 );
1357 }
1358 }
1359
1360 all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1361
1362 self.metrics
1363 .transactions_included_in_checkpoint
1364 .inc_by(contents.size() as u64);
1365 let sequence_number = summary.sequence_number;
1366 self.metrics
1367 .last_constructed_checkpoint
1368 .set(sequence_number as i64);
1369
1370 batch.insert_batch(
1371 &self.store.tables.checkpoint_content,
1372 [(contents.digest(), contents)],
1373 )?;
1374
1375 batch.insert_batch(
1376 &self.store.tables.locally_computed_checkpoints,
1377 [(sequence_number, summary)],
1378 )?;
1379 }
1380
1381 batch.write()?;
1382
1383 for (summary, contents) in &new_checkpoints {
1385 self.output
1386 .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1387 .await?;
1388 }
1389
1390 for (local_checkpoint, _) in &new_checkpoints {
1391 if let Some(certified_checkpoint) = self
1392 .store
1393 .tables
1394 .certified_checkpoints
1395 .get(local_checkpoint.sequence_number())?
1396 {
1397 self.store
1398 .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1399 }
1400 }
1401
1402 self.notify_aggregator.notify_one();
1403 self.epoch_store
1404 .process_constructed_checkpoint(height, new_checkpoints);
1405 Ok(())
1406 }
1407
1408 #[expect(clippy::type_complexity)]
1409 fn split_checkpoint_chunks(
1410 &self,
1411 effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1412 signatures: Vec<Vec<GenericSignature>>,
1413 ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1414 let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1415 let mut chunks = Vec::new();
1416 let mut chunk = Vec::new();
1417 let mut chunk_size: usize = 0;
1418 for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1419 .into_iter()
1420 .zip(signatures.into_iter())
1421 {
1422 let size = transaction_size
1427 + bcs::serialized_size(&effects)?
1428 + bcs::serialized_size(&signatures)?;
1429 if chunk.len() == self.max_transactions_per_checkpoint
1430 || (chunk_size + size) > self.max_checkpoint_size_bytes
1431 {
1432 if chunk.is_empty() {
1433 warn!(
1435 "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1436 self.max_checkpoint_size_bytes
1437 );
1438 } else {
1439 chunks.push(chunk);
1440 chunk = Vec::new();
1441 chunk_size = 0;
1442 }
1443 }
1444
1445 chunk.push((effects, signatures));
1446 chunk_size += size;
1447 }
1448
1449 if !chunk.is_empty() || chunks.is_empty() {
1450 chunks.push(chunk);
1455 }
1461 Ok(chunks)
1462 }
1463
1464 fn load_last_built_checkpoint_summary(
1467 epoch_store: &AuthorityPerEpochStore,
1468 store: &CheckpointStore,
1469 ) -> IotaResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1470 let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1471 if last_checkpoint.is_none() {
1472 let epoch = epoch_store.epoch();
1473 if epoch > 0 {
1474 let previous_epoch = epoch - 1;
1475 let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1476 last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1477 if let Some((ref seq, _)) = last_checkpoint {
1478 debug!(
1479 "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1480 );
1481 } else {
1482 panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1485 }
1486 }
1487 }
1488 Ok(last_checkpoint)
1489 }
1490
1491 #[instrument(level = "debug", skip_all)]
1492 async fn create_checkpoints(
1493 &self,
1494 all_effects: Vec<TransactionEffects>,
1495 details: &PendingCheckpointInfo,
1496 ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1497 let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1498 let total = all_effects.len();
1499 let mut last_checkpoint =
1500 Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1501 let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1502 debug!(
1503 next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1504 checkpoint_timestamp = details.timestamp_ms,
1505 "Creating checkpoint(s) for {} transactions",
1506 all_effects.len(),
1507 );
1508
1509 let all_digests: Vec<_> = all_effects
1510 .iter()
1511 .map(|effect| *effect.transaction_digest())
1512 .collect();
1513 let transactions_and_sizes = self
1514 .state
1515 .get_transaction_cache_reader()
1516 .try_get_transactions_and_serialized_sizes(&all_digests)?;
1517 let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1518 let mut transactions = Vec::with_capacity(all_effects.len());
1519 let mut transaction_keys = Vec::with_capacity(all_effects.len());
1520 let mut randomness_rounds = BTreeMap::new();
1521 {
1522 let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1523 debug!(
1524 ?last_checkpoint_seq,
1525 "Waiting for {:?} certificates to appear in consensus",
1526 all_effects.len()
1527 );
1528
1529 for (effects, transaction_and_size) in all_effects
1530 .into_iter()
1531 .zip(transactions_and_sizes.into_iter())
1532 {
1533 let (transaction, size) = transaction_and_size
1534 .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
1535 match transaction.inner().transaction_data().kind() {
1536 TransactionKind::ConsensusCommitPrologueV1(_)
1537 | TransactionKind::AuthenticatorStateUpdateV1(_) => {
1538 }
1543 TransactionKind::RandomnessStateUpdate(rsu) => {
1544 randomness_rounds
1545 .insert(*effects.transaction_digest(), rsu.randomness_round);
1546 }
1547 _ => {
1548 transaction_keys.push(SequencedConsensusTransactionKey::External(
1551 ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
1552 ));
1553 }
1554 }
1555 transactions.push(transaction);
1556 all_effects_and_transaction_sizes.push((effects, size));
1557 }
1558
1559 self.epoch_store
1560 .consensus_messages_processed_notify(transaction_keys)
1561 .await?;
1562 }
1563
1564 let signatures = self
1565 .epoch_store
1566 .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1567 debug!(
1568 ?last_checkpoint_seq,
1569 "Received {} checkpoint user signatures from consensus",
1570 signatures.len()
1571 );
1572
1573 let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1574 let chunks_count = chunks.len();
1575
1576 let mut checkpoints = Vec::with_capacity(chunks_count);
1577 debug!(
1578 ?last_checkpoint_seq,
1579 "Creating {} checkpoints with {} transactions", chunks_count, total,
1580 );
1581
1582 let epoch = self.epoch_store.epoch();
1583 for (index, transactions) in chunks.into_iter().enumerate() {
1584 let first_checkpoint_of_epoch = index == 0
1585 && last_checkpoint
1586 .as_ref()
1587 .map(|(_, c)| c.epoch != epoch)
1588 .unwrap_or(true);
1589 if first_checkpoint_of_epoch {
1590 self.epoch_store
1591 .record_epoch_first_checkpoint_creation_time_metric();
1592 }
1593 let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1594
1595 let sequence_number = last_checkpoint
1596 .as_ref()
1597 .map(|(_, c)| c.sequence_number + 1)
1598 .unwrap_or_default();
1599 let timestamp_ms = details.timestamp_ms;
1600 if let Some((_, last_checkpoint)) = &last_checkpoint {
1601 if last_checkpoint.timestamp_ms > timestamp_ms {
1602 error!(
1603 "Unexpected decrease of checkpoint timestamp, sequence: {}, previous: {}, current: {}",
1604 sequence_number, last_checkpoint.timestamp_ms, timestamp_ms
1605 );
1606 }
1607 }
1608
1609 let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1610 let epoch_rolling_gas_cost_summary =
1611 self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1612
1613 let end_of_epoch_data = if last_checkpoint_of_epoch {
1614 let (system_state_obj, system_epoch_info_event) = self
1615 .augment_epoch_last_checkpoint(
1616 &epoch_rolling_gas_cost_summary,
1617 timestamp_ms,
1618 &mut effects,
1619 &mut signatures,
1620 sequence_number,
1621 )
1622 .await?;
1623
1624 let epoch_supply_change =
1632 system_epoch_info_event.map_or(0, |event| event.supply_change());
1633
1634 let committee = system_state_obj
1635 .get_current_epoch_committee()
1636 .committee()
1637 .clone();
1638
1639 let root_state_digest = {
1642 let state_acc = self
1643 .accumulator
1644 .upgrade()
1645 .expect("No checkpoints should be getting built after local configuration");
1646 let acc = state_acc.accumulate_checkpoint(
1647 &effects,
1648 sequence_number,
1649 &self.epoch_store,
1650 )?;
1651
1652 state_acc
1653 .wait_for_previous_running_root(&self.epoch_store, sequence_number)
1654 .await?;
1655
1656 state_acc.accumulate_running_root(
1657 &self.epoch_store,
1658 sequence_number,
1659 Some(acc),
1660 )?;
1661 state_acc
1662 .digest_epoch(self.epoch_store.clone(), sequence_number)
1663 .await?
1664 };
1665 self.metrics.highest_accumulated_epoch.set(epoch as i64);
1666 info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1667
1668 let epoch_commitments = vec![root_state_digest.into()];
1669
1670 Some(EndOfEpochData {
1671 next_epoch_committee: committee.voting_rights,
1672 next_epoch_protocol_version: ProtocolVersion::new(
1673 system_state_obj.protocol_version(),
1674 ),
1675 epoch_commitments,
1676 epoch_supply_change,
1677 })
1678 } else {
1679 None
1680 };
1681 let contents = CheckpointContents::new_with_digests_and_signatures(
1682 effects.iter().map(TransactionEffects::execution_digests),
1683 signatures,
1684 );
1685
1686 let num_txns = contents.size() as u64;
1687
1688 let network_total_transactions = last_checkpoint
1689 .as_ref()
1690 .map(|(_, c)| c.network_total_transactions + num_txns)
1691 .unwrap_or(num_txns);
1692
1693 let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1694
1695 let matching_randomness_rounds: Vec<_> = effects
1696 .iter()
1697 .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1698 .copied()
1699 .collect();
1700
1701 let summary = CheckpointSummary::new(
1702 self.epoch_store.protocol_config(),
1703 epoch,
1704 sequence_number,
1705 network_total_transactions,
1706 &contents,
1707 previous_digest,
1708 epoch_rolling_gas_cost_summary,
1709 end_of_epoch_data,
1710 timestamp_ms,
1711 matching_randomness_rounds,
1712 );
1713 summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1714 if last_checkpoint_of_epoch {
1715 info!(
1716 checkpoint_seq = sequence_number,
1717 "creating last checkpoint of epoch {}", epoch
1718 );
1719 if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
1720 self.epoch_store
1721 .report_epoch_metrics_at_last_checkpoint(stats);
1722 }
1723 }
1724 last_checkpoint = Some((sequence_number, summary.clone()));
1725 checkpoints.push((summary, contents));
1726 }
1727
1728 Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1729 }
1730
1731 fn get_epoch_total_gas_cost(
1732 &self,
1733 last_checkpoint: Option<&CheckpointSummary>,
1734 cur_checkpoint_effects: &[TransactionEffects],
1735 ) -> GasCostSummary {
1736 let (previous_epoch, previous_gas_costs) = last_checkpoint
1737 .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1738 .unwrap_or_default();
1739 let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1740 if previous_epoch == self.epoch_store.epoch() {
1741 GasCostSummary::new(
1743 previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1744 previous_gas_costs.computation_cost_burned
1745 + current_gas_costs.computation_cost_burned,
1746 previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1747 previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1748 previous_gas_costs.non_refundable_storage_fee
1749 + current_gas_costs.non_refundable_storage_fee,
1750 )
1751 } else {
1752 current_gas_costs
1753 }
1754 }
1755
1756 #[instrument(level = "error", skip_all)]
1759 async fn augment_epoch_last_checkpoint(
1760 &self,
1761 epoch_total_gas_cost: &GasCostSummary,
1762 epoch_start_timestamp_ms: CheckpointTimestamp,
1763 checkpoint_effects: &mut Vec<TransactionEffects>,
1764 signatures: &mut Vec<Vec<GenericSignature>>,
1765 checkpoint: CheckpointSequenceNumber,
1766 ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1767 let (system_state, system_epoch_info_event, effects) = self
1768 .state
1769 .create_and_execute_advance_epoch_tx(
1770 &self.epoch_store,
1771 epoch_total_gas_cost,
1772 checkpoint,
1773 epoch_start_timestamp_ms,
1774 )
1775 .await?;
1776 checkpoint_effects.push(effects);
1777 signatures.push(vec![]);
1778 Ok((system_state, system_epoch_info_event))
1779 }
1780
1781 #[instrument(level = "debug", skip_all)]
1790 fn complete_checkpoint_effects(
1791 &self,
1792 mut roots: Vec<TransactionEffects>,
1793 existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
1794 ) -> IotaResult<Vec<TransactionEffects>> {
1795 let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
1796 let mut results = vec![];
1797 let mut seen = HashSet::new();
1798 loop {
1799 let mut pending = HashSet::new();
1800
1801 let transactions_included = self
1802 .epoch_store
1803 .builder_included_transactions_in_checkpoint(
1804 roots.iter().map(|e| e.transaction_digest()),
1805 )?;
1806
1807 for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
1808 let digest = effect.transaction_digest();
1809 seen.insert(*digest);
1812
1813 if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
1815 continue;
1816 }
1817
1818 if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
1820 continue;
1821 }
1822
1823 let existing_effects = self
1824 .epoch_store
1825 .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;
1826
1827 for (dependency, effects_signature_exists) in
1828 effect.dependencies().iter().zip(existing_effects.iter())
1829 {
1830 if !effects_signature_exists {
1835 continue;
1836 }
1837 if seen.insert(*dependency) {
1838 pending.insert(*dependency);
1839 }
1840 }
1841 results.push(effect);
1842 }
1843 if pending.is_empty() {
1844 break;
1845 }
1846 let pending = pending.into_iter().collect::<Vec<_>>();
1847 let effects = self
1848 .effects_store
1849 .try_multi_get_executed_effects(&pending)?;
1850 let effects = effects
1851 .into_iter()
1852 .zip(pending)
1853 .map(|(opt, digest)| match opt {
1854 Some(x) => x,
1855 None => panic!(
1856 "Can not find effect for transaction {digest:?}, however transaction that depend on it was already executed"
1857 ),
1858 })
1859 .collect::<Vec<_>>();
1860 roots = effects;
1861 }
1862
1863 existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
1864 Ok(results)
1865 }
1866
1867 #[cfg(msim)]
1870 fn expensive_consensus_commit_prologue_invariants_check(
1871 &self,
1872 root_digests: &[TransactionDigest],
1873 sorted: &[TransactionEffects],
1874 ) {
1875 let root_txs = self
1877 .state
1878 .get_transaction_cache_reader()
1879 .multi_get_transaction_blocks(root_digests);
1880 let ccps = root_txs
1881 .iter()
1882 .filter_map(|tx| {
1883 tx.as_ref().filter(|tx| {
1884 matches!(
1885 tx.transaction_data().kind(),
1886 TransactionKind::ConsensusCommitPrologueV1(_)
1887 )
1888 })
1889 })
1890 .collect::<Vec<_>>();
1891
1892 assert!(ccps.len() <= 1);
1895
1896 let txs = self
1898 .state
1899 .get_transaction_cache_reader()
1900 .multi_get_transaction_blocks(
1901 &sorted
1902 .iter()
1903 .map(|tx| *tx.transaction_digest())
1904 .collect::<Vec<_>>(),
1905 );
1906
1907 if ccps.is_empty() {
1908 for tx in txs.iter().flatten() {
1912 assert!(!matches!(
1913 tx.transaction_data().kind(),
1914 TransactionKind::ConsensusCommitPrologueV1(_)
1915 ));
1916 }
1917 } else {
1918 assert!(matches!(
1921 txs[0].as_ref().unwrap().transaction_data().kind(),
1922 TransactionKind::ConsensusCommitPrologueV1(_)
1923 ));
1924
1925 assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1926
1927 for tx in txs.iter().skip(1).flatten() {
1928 assert!(!matches!(
1929 tx.transaction_data().kind(),
1930 TransactionKind::ConsensusCommitPrologueV1(_)
1931 ));
1932 }
1933 }
1934 }
1935}
1936
impl CheckpointAggregator {
    /// Creates an aggregator with no checkpoint currently being aggregated.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        let current = None;
        Self {
            store: tables,
            epoch_store,
            notify,
            current,
            output,
            state,
            metrics,
        }
    }

    /// Runs the aggregation loop; this function never returns.
    ///
    /// Each iteration calls `run_and_notify`. On error it logs, increments the
    /// `checkpoint_errors` metric, sleeps for one second and retries. On
    /// success it waits for a notification, but for at most one second, before
    /// the next iteration.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // The timeout result is ignored: the loop proceeds either way.
            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
        }
    }

    /// Aggregates whatever is currently possible and reports every newly
    /// certified checkpoint to `self.output`.
    async fn run_and_notify(&mut self) -> IotaResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Processes pending checkpoint signatures and returns the checkpoints
    /// that became certified during this call, in the order they were
    /// certified.
    ///
    /// Returns early with the results collected so far when the next
    /// checkpoint to certify has not been built yet, or when the stored
    /// signatures for it are exhausted.
    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            let current = if let Some(current) = &mut self.current {
                // The in-progress aggregation is for a sequence number below
                // the next one to certify: drop it and start over.
                if current.summary.sequence_number < next_to_certify {
                    self.current = None;
                    continue;
                }
                current
            } else {
                // Nothing in progress: start aggregating the next checkpoint,
                // provided it has already been built.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    next_index: 0,
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            let epoch_tables = self
                .epoch_store
                .tables()
                .expect("should not run past end of epoch");
            // Start at the first signature index that has not been processed
            // yet for the current checkpoint.
            let iter = epoch_tables
                .pending_checkpoint_signatures
                .safe_iter_with_bounds(
                    Some((current.summary.sequence_number, current.next_index)),
                    None,
                );
            for item in iter {
                let ((seq, index), data) = item?;
                // The iterator has moved on to another checkpoint's signatures,
                // so there is nothing more for the current one.
                if seq != current.summary.sequence_number {
                    trace!(
                        checkpoint_seq =? current.summary.sequence_number,
                        "Not enough checkpoint signatures",
                    );
                    return Ok(result);
                }
                trace!(
                    checkpoint_seq = current.summary.sequence_number,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = current.summary.sequence_number,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics
                        .last_certified_checkpoint
                        .set(current.summary.sequence_number as i64);
                    current
                        .summary
                        .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
                    result.push(summary.into_inner());
                    // Certified: clear the state and move on to the next
                    // checkpoint.
                    self.current = None;
                    continue 'outer;
                } else {
                    // Not certified yet: remember where to resume next time.
                    current.next_index = index + 1;
                }
            }
            break;
        }
        Ok(result)
    }

    /// Returns the sequence number of the next checkpoint to certify: one more
    /// than the first key yielded by the reversed iteration over
    /// `certified_checkpoints` (presumably the highest certified sequence
    /// number; confirm against the table's key ordering), or the default value
    /// when the table yields nothing.
    fn next_checkpoint_to_certify(&self) -> IotaResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
2102
2103impl CheckpointSignatureAggregator {
2104 #[expect(clippy::result_unit_err)]
2105 pub fn try_aggregate(
2106 &mut self,
2107 data: CheckpointSignatureMessage,
2108 ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2109 let their_digest = *data.summary.digest();
2110 let (_, signature) = data.summary.into_data_and_sig();
2111 let author = signature.authority;
2112 let envelope =
2113 SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2114 match self.signatures_by_digest.insert(their_digest, envelope) {
2115 InsertResult::Failed {
2117 error:
2118 IotaError::StakeAggregatorRepeatedSigner {
2119 conflicting_sig: false,
2120 ..
2121 },
2122 } => Err(()),
2123 InsertResult::Failed { error } => {
2124 warn!(
2125 checkpoint_seq = self.summary.sequence_number,
2126 "Failed to aggregate new signature from validator {:?}: {:?}",
2127 author.concise(),
2128 error
2129 );
2130 self.check_for_split_brain();
2131 Err(())
2132 }
2133 InsertResult::QuorumReached(cert) => {
2134 if their_digest != self.digest {
2138 self.metrics.remote_checkpoint_forks.inc();
2139 warn!(
2140 checkpoint_seq = self.summary.sequence_number,
2141 "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2142 author.concise(),
2143 their_digest,
2144 self.digest
2145 );
2146 return Err(());
2147 }
2148 Ok(cert)
2149 }
2150 InsertResult::NotEnoughVotes {
2151 bad_votes: _,
2152 bad_authorities: _,
2153 } => {
2154 self.check_for_split_brain();
2155 Err(())
2156 }
2157 }
2158 }
2159
2160 fn check_for_split_brain(&self) {
2165 debug!(
2166 checkpoint_seq = self.summary.sequence_number,
2167 "Checking for split brain condition"
2168 );
2169 if self.signatures_by_digest.quorum_unreachable() {
2170 let digests_by_stake_messages = self
2176 .signatures_by_digest
2177 .get_all_unique_values()
2178 .into_iter()
2179 .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2180 .map(|(digest, (_authorities, total_stake))| {
2181 format!("{digest:?} (total stake: {total_stake})")
2182 })
2183 .collect::<Vec<String>>();
2184 error!(
2185 checkpoint_seq = self.summary.sequence_number,
2186 "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
2187 self.signatures_by_digest.uncommitted_stake(),
2188 digests_by_stake_messages,
2189 );
2190 self.metrics.split_brain_checkpoint_forks.inc();
2191
2192 let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2193 let local_summary = self.summary.clone();
2194 let state = self.state.clone();
2195 let tables = self.store.clone();
2196
2197 tokio::spawn(async move {
2198 diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2199 });
2200 }
2201 }
2202}
2203
2204async fn diagnose_split_brain(
2210 all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2211 local_summary: CheckpointSummary,
2212 state: Arc<AuthorityState>,
2213 tables: Arc<CheckpointStore>,
2214) {
2215 debug!(
2216 checkpoint_seq = local_summary.sequence_number,
2217 "Running split brain diagnostics..."
2218 );
2219 let time = SystemTime::now();
2220 let digest_to_validator = all_unique_values
2222 .iter()
2223 .filter_map(|(digest, (validators, _))| {
2224 if *digest != local_summary.digest() {
2225 let random_validator = validators.choose(&mut OsRng).unwrap();
2226 Some((*digest, *random_validator))
2227 } else {
2228 None
2229 }
2230 })
2231 .collect::<HashMap<_, _>>();
2232 if digest_to_validator.is_empty() {
2233 panic!(
2234 "Given split brain condition, there should be at \
2235 least one validator that disagrees with local signature"
2236 );
2237 }
2238
2239 let epoch_store = state.load_epoch_store_one_call_per_task();
2240 let committee = epoch_store
2241 .epoch_start_state()
2242 .get_iota_committee_with_network_metadata();
2243 let network_config = default_iota_network_config();
2244 let network_clients =
2245 make_network_authority_clients_with_network_config(&committee, &network_config);
2246
2247 let response_futures = digest_to_validator
2249 .values()
2250 .cloned()
2251 .map(|validator| {
2252 let client = network_clients
2253 .get(&validator)
2254 .expect("Failed to get network client");
2255 let request = CheckpointRequest {
2256 sequence_number: Some(local_summary.sequence_number),
2257 request_content: true,
2258 certified: false,
2259 };
2260 client.handle_checkpoint(request)
2261 })
2262 .collect::<Vec<_>>();
2263
2264 let digest_name_pair = digest_to_validator.iter();
2265 let response_data = futures::future::join_all(response_futures)
2266 .await
2267 .into_iter()
2268 .zip(digest_name_pair)
2269 .filter_map(|(response, (digest, name))| match response {
2270 Ok(response) => match response {
2271 CheckpointResponse {
2272 checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2273 contents: Some(contents),
2274 } => Some((*name, *digest, summary, contents)),
2275 CheckpointResponse {
2276 checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2277 contents: _,
2278 } => {
2279 panic!("Expected pending checkpoint, but got certified checkpoint");
2280 }
2281 CheckpointResponse {
2282 checkpoint: None,
2283 contents: _,
2284 } => {
2285 error!(
2286 "Summary for checkpoint {:?} not found on validator {:?}",
2287 local_summary.sequence_number, name
2288 );
2289 None
2290 }
2291 CheckpointResponse {
2292 checkpoint: _,
2293 contents: None,
2294 } => {
2295 error!(
2296 "Contents for checkpoint {:?} not found on validator {:?}",
2297 local_summary.sequence_number, name
2298 );
2299 None
2300 }
2301 },
2302 Err(e) => {
2303 error!(
2304 "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2305 e
2306 );
2307 None
2308 }
2309 })
2310 .collect::<Vec<_>>();
2311
2312 let local_checkpoint_contents = tables
2313 .get_checkpoint_contents(&local_summary.content_digest)
2314 .unwrap_or_else(|_| {
2315 panic!(
2316 "Could not find checkpoint contents for digest {:?}",
2317 local_summary.digest()
2318 )
2319 })
2320 .unwrap_or_else(|| {
2321 panic!(
2322 "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2323 local_summary.sequence_number,
2324 local_summary.digest()
2325 )
2326 });
2327 let local_contents_text = format!("{local_checkpoint_contents:?}");
2328
2329 let local_summary_text = format!("{local_summary:?}");
2330 let local_validator = state.name.concise();
2331 let diff_patches = response_data
2332 .iter()
2333 .map(|(name, other_digest, other_summary, contents)| {
2334 let other_contents_text = format!("{contents:?}");
2335 let other_summary_text = format!("{other_summary:?}");
2336 let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2337 .enumerate_transactions(&local_summary)
2338 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2339 .unzip();
2340 let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2341 .enumerate_transactions(other_summary)
2342 .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2343 .unzip();
2344 let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2345 let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2346 let local_transactions_text = format!("{local_transactions:#?}");
2347 let other_transactions_text = format!("{other_transactions:#?}");
2348 let transactions_patch =
2349 create_patch(&local_transactions_text, &other_transactions_text);
2350 let local_effects_text = format!("{local_effects:#?}");
2351 let other_effects_text = format!("{other_effects:#?}");
2352 let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2353 let seq_number = local_summary.sequence_number;
2354 let local_digest = local_summary.digest();
2355 let other_validator = name.concise();
2356 format!(
2357 "Checkpoint: {seq_number:?}\n\
2358 Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2359 Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2360 Summary Diff: \n{summary_patch}\n\n\
2361 Contents Diff: \n{contents_patch}\n\n\
2362 Transactions Diff: \n{transactions_patch}\n\n\
2363 Effects Diff: \n{effects_patch}",
2364 )
2365 })
2366 .collect::<Vec<_>>()
2367 .join("\n\n\n");
2368
2369 let header = format!(
2370 "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2371 Datetime: {time:?}"
2372 );
2373 let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2374 let path = tempfile::tempdir()
2375 .expect("Failed to create tempdir")
2376 .keep()
2377 .join(Path::new("checkpoint_fork_dump.txt"));
2378 let mut file = File::create(path).unwrap();
2379 write!(file, "{fork_logs_text}").unwrap();
2380 debug!("{}", fork_logs_text);
2381
2382 fail_point!("split_brain_reached");
2383}
2384
/// Notification interface used to feed events into a checkpoint service.
///
/// Two implementations are visible in this file: `CheckpointService`, which
/// records signatures and wakes its background tasks, and
/// `CheckpointServiceNoop`, which accepts every notification and does nothing.
pub trait CheckpointServiceNotify {
    /// Reports a checkpoint signature message (`info`) observed for the epoch
    /// represented by `epoch_store`.
    ///
    /// Returns an error if the implementation fails to process the signature.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    /// Signals that there may be new checkpoint work to pick up.
    fn notify_checkpoint(&self) -> IotaResult;
}
2394
/// Lifecycle state of a `CheckpointService`.
enum CheckpointServiceState {
    /// The builder and aggregator have been constructed but not yet handed
    /// over to background tasks. The pair is boxed so the enum itself stays
    /// small.
    Unstarted(Box<(CheckpointBuilder, CheckpointAggregator)>),
    /// The builder and aggregator have already been taken out of the state.
    Started,
}
2399
2400impl CheckpointServiceState {
2401 fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
2402 let mut state = CheckpointServiceState::Started;
2403 std::mem::swap(self, &mut state);
2404
2405 match state {
2406 CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1),
2407 CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2408 }
2409 }
2410}
2411
/// Owns the checkpoint builder and aggregator until they are spawned, and
/// afterwards serves as the handle through which they are notified.
pub struct CheckpointService {
    /// Checkpoint store; consulted for the highest verified checkpoint when a
    /// signature notification arrives.
    tables: Arc<CheckpointStore>,
    /// Signalled by `notify_checkpoint`; a clone is handed to the builder.
    notify_builder: Arc<Notify>,
    /// Signalled after a checkpoint signature has been recorded; clones are
    /// handed to both the aggregator and the builder.
    notify_aggregator: Arc<Notify>,
    /// Index assigned to the most recently inserted checkpoint signature.
    /// Initialised from the epoch store and incremented under the lock for
    /// every signature that is recorded.
    last_signature_index: Mutex<u64>,
    /// Watch channel carrying the highest checkpoint sequence number built so
    /// far; a clone of the sender is handed to the builder.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    /// Sequence number of the latest locally computed checkpoint found in the
    /// checkpoint store when the service was built (0 if there was none).
    highest_previously_built_seq: CheckpointSequenceNumber,
    /// Checkpoint metrics; shared with the builder and the aggregator.
    metrics: Arc<CheckpointMetrics>,
    /// Holds the builder and aggregator until `spawn` takes them.
    state: Mutex<CheckpointServiceState>,
}
2425
2426impl CheckpointService {
2427 pub fn build(
2431 state: Arc<AuthorityState>,
2432 checkpoint_store: Arc<CheckpointStore>,
2433 epoch_store: Arc<AuthorityPerEpochStore>,
2434 effects_store: Arc<dyn TransactionCacheRead>,
2435 accumulator: Weak<StateAccumulator>,
2436 checkpoint_output: Box<dyn CheckpointOutput>,
2437 certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2438 metrics: Arc<CheckpointMetrics>,
2439 max_transactions_per_checkpoint: usize,
2440 max_checkpoint_size_bytes: usize,
2441 ) -> Arc<Self> {
2442 info!(
2443 "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2444 );
2445 let notify_builder = Arc::new(Notify::new());
2446 let notify_aggregator = Arc::new(Notify::new());
2447
2448 let highest_previously_built_seq = checkpoint_store
2450 .get_latest_locally_computed_checkpoint()
2451 .expect("failed to get latest locally computed checkpoint")
2452 .map(|s| s.sequence_number)
2453 .unwrap_or(0);
2454
2455 let highest_currently_built_seq =
2456 CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
2457 .expect("epoch should not have ended")
2458 .map(|(seq, _)| seq)
2459 .unwrap_or(0);
2460
2461 let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
2462
2463 let aggregator = CheckpointAggregator::new(
2464 checkpoint_store.clone(),
2465 epoch_store.clone(),
2466 notify_aggregator.clone(),
2467 certified_checkpoint_output,
2468 state.clone(),
2469 metrics.clone(),
2470 );
2471
2472 let builder = CheckpointBuilder::new(
2473 state.clone(),
2474 checkpoint_store.clone(),
2475 epoch_store.clone(),
2476 notify_builder.clone(),
2477 effects_store,
2478 accumulator,
2479 checkpoint_output,
2480 notify_aggregator.clone(),
2481 highest_currently_built_seq_tx.clone(),
2482 metrics.clone(),
2483 max_transactions_per_checkpoint,
2484 max_checkpoint_size_bytes,
2485 );
2486
2487 let last_signature_index = epoch_store
2488 .get_last_checkpoint_signature_index()
2489 .expect("should not cross end of epoch");
2490 let last_signature_index = Mutex::new(last_signature_index);
2491
2492 Arc::new(Self {
2493 tables: checkpoint_store,
2494 notify_builder,
2495 notify_aggregator,
2496 last_signature_index,
2497 highest_currently_built_seq_tx,
2498 highest_previously_built_seq,
2499 metrics,
2500 state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
2501 builder, aggregator,
2502 )))),
2503 })
2504 }
2505
2506 pub async fn spawn(&self) -> JoinSet<()> {
2515 let mut tasks = JoinSet::new();
2516
2517 let (builder, aggregator) = self.state.lock().take_unstarted();
2518 tasks.spawn(monitored_future!(builder.run()));
2519 tasks.spawn(monitored_future!(aggregator.run()));
2520
2521 if tokio::time::timeout(
2527 Duration::from_secs(120),
2528 self.wait_for_rebuilt_checkpoints(),
2529 )
2530 .await
2531 .is_err()
2532 {
2533 debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
2534 }
2535
2536 tasks
2537 }
2538}
2539
2540impl CheckpointService {
2541 pub async fn wait_for_rebuilt_checkpoints(&self) {
2548 let highest_previously_built_seq = self.highest_previously_built_seq;
2549 let mut rx = self.highest_currently_built_seq_tx.subscribe();
2550 let mut highest_currently_built_seq = *rx.borrow_and_update();
2551 info!(
2552 "Waiting for checkpoints to be rebuilt, previously built seq: \
2553 {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
2554 );
2555 loop {
2556 if highest_currently_built_seq >= highest_previously_built_seq {
2557 info!("Checkpoint rebuild complete");
2558 break;
2559 }
2560 rx.changed().await.unwrap();
2561 highest_currently_built_seq = *rx.borrow_and_update();
2562 }
2563 }
2564
2565 #[cfg(test)]
2566 fn write_and_notify_checkpoint_for_testing(
2567 &self,
2568 epoch_store: &AuthorityPerEpochStore,
2569 checkpoint: PendingCheckpoint,
2570 ) -> IotaResult {
2571 use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
2572
2573 let mut output = ConsensusCommitOutput::new();
2574 epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2575 output.set_default_commit_stats_for_testing();
2576 epoch_store.push_consensus_output_for_tests(output);
2577 self.notify_checkpoint()?;
2578 Ok(())
2579 }
2580}
2581
2582impl CheckpointServiceNotify for CheckpointService {
2583 fn notify_checkpoint_signature(
2584 &self,
2585 epoch_store: &AuthorityPerEpochStore,
2586 info: &CheckpointSignatureMessage,
2587 ) -> IotaResult {
2588 let sequence = info.summary.sequence_number;
2589 let signer = info.summary.auth_sig().authority.concise();
2590
2591 if let Some(highest_verified_checkpoint) = self
2592 .tables
2593 .get_highest_verified_checkpoint()?
2594 .map(|x| *x.sequence_number())
2595 {
2596 if sequence <= highest_verified_checkpoint {
2597 trace!(
2598 checkpoint_seq = sequence,
2599 "Ignore checkpoint signature from {} - already certified", signer,
2600 );
2601 self.metrics
2602 .last_ignored_checkpoint_signature_received
2603 .set(sequence as i64);
2604 return Ok(());
2605 }
2606 }
2607 trace!(
2608 checkpoint_seq = sequence,
2609 "Received checkpoint signature, digest {} from {}",
2610 info.summary.digest(),
2611 signer,
2612 );
2613 self.metrics
2614 .last_received_checkpoint_signatures
2615 .with_label_values(&[&signer.to_string()])
2616 .set(sequence as i64);
2617 let mut index = self.last_signature_index.lock();
2621 *index += 1;
2622 epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2623 self.notify_aggregator.notify_one();
2624 Ok(())
2625 }
2626
2627 fn notify_checkpoint(&self) -> IotaResult {
2628 self.notify_builder.notify_one();
2629 Ok(())
2630 }
2631}
2632
/// A `CheckpointServiceNotify` implementation that discards every
/// notification and always reports success.
pub struct CheckpointServiceNoop {}
impl CheckpointServiceNotify for CheckpointServiceNoop {
    /// Ignores the signature and returns `Ok(())`.
    fn notify_checkpoint_signature(
        &self,
        _: &AuthorityPerEpochStore,
        _: &CheckpointSignatureMessage,
    ) -> IotaResult {
        Ok(())
    }

    /// Does nothing and returns `Ok(())`.
    fn notify_checkpoint(&self) -> IotaResult {
        Ok(())
    }
}
2648
2649#[cfg(test)]
2650mod tests {
2651 use std::{
2652 collections::{BTreeMap, HashMap},
2653 ops::Deref,
2654 };
2655
2656 use futures::{FutureExt as _, future::BoxFuture};
2657 use iota_macros::sim_test;
2658 use iota_protocol_config::{Chain, ProtocolConfig};
2659 use iota_types::{
2660 base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
2661 crypto::Signature,
2662 digests::TransactionEventsDigest,
2663 effects::{TransactionEffects, TransactionEvents},
2664 messages_checkpoint::SignedCheckpointSummary,
2665 move_package::MovePackage,
2666 object,
2667 transaction::{GenesisObject, VerifiedTransaction},
2668 };
2669 use tokio::sync::mpsc;
2670
2671 use super::*;
2672 use crate::authority::test_authority_builder::TestAuthorityBuilder;
2673
    /// End-to-end test of the checkpoint builder and aggregator: feeds six
    /// pending checkpoints into a `CheckpointService` and checks the contents,
    /// ordering and chaining of the seven checkpoints that come out, then
    /// checks that signed summaries are certified in sequence order.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // Two kinds of stored transactions: an empty genesis transaction, and
        // one carrying a package whose single module name is "1" left-padded
        // with zeros to 40000 characters, which makes the transaction large.
        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
            vec![GenesisObject::RawObject {
                data: object::Data::Package(
                    MovePackage::new(
                        ObjectID::random(),
                        SequenceNumber::new(),
                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
                        100_000,
                        Vec::new(),
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        // Digests d(0)..d(14) map to the small transaction.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        // Digests d(15)..d(19) map to the large transaction.
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Effects served to the builder. d(1) depends on d(2) and d(3);
        // d(2) depends on d(3) and d(4); everything else has no dependencies.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        // Register a default user signature for every transaction with effects.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        // The channel senders act as the checkpoint outputs (see the trait
        // impls for `mpsc::Sender` below); the receivers are read by the test.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        // Limits: at most 3 transactions and 100_000 bytes per checkpoint.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        let _tasks = checkpoint_service.spawn().await;

        // Six pending checkpoints at heights 0..=5.
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        // First checkpoint: just d(4), no predecessor.
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        // Second checkpoint: roots d(1), d(3) pull in the dependency d(2);
        // d(4) is not repeated because it is already in the first checkpoint.
        // The rolling gas summary is the sum over d(4), d(3), d(2) and d(1).
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

        // The pending checkpoint with four roots yields two checkpoints,
        // matching the limit of 3 transactions per checkpoint.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // The three large transactions are split 2 + 1, presumably by the
        // 100_000-byte checkpoint size limit (TODO confirm against the builder).
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // The last two pending checkpoints (roots d(5) and d(6)) come out as
        // one checkpoint; presumably the one at timestamp 4001 falls inside
        // the 100 ms minimum interval after 4000 and is merged with the next
        // (TODO confirm against the builder).
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        // Signatures are delivered out of order (second checkpoint first) ...
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        // ... but certified checkpoints are still emitted in sequence order.
        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
2902
2903 impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
2904 fn try_notify_read_executed_effects(
2905 &self,
2906 digests: &[TransactionDigest],
2907 ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
2908 std::future::ready(Ok(digests
2909 .iter()
2910 .map(|d| self.get(d).expect("effects not found").clone())
2911 .collect()))
2912 .boxed()
2913 }
2914
2915 fn try_notify_read_executed_effects_digests(
2916 &self,
2917 digests: &[TransactionDigest],
2918 ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
2919 std::future::ready(Ok(digests
2920 .iter()
2921 .map(|d| {
2922 self.get(d)
2923 .map(|fx| fx.digest())
2924 .expect("effects not found")
2925 })
2926 .collect()))
2927 .boxed()
2928 }
2929
2930 fn try_multi_get_executed_effects(
2931 &self,
2932 digests: &[TransactionDigest],
2933 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2934 Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
2935 }
2936
2937 fn try_multi_get_transaction_blocks(
2944 &self,
2945 _: &[TransactionDigest],
2946 ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
2947 unimplemented!()
2948 }
2949
2950 fn try_multi_get_executed_effects_digests(
2951 &self,
2952 _: &[TransactionDigest],
2953 ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
2954 unimplemented!()
2955 }
2956
2957 fn try_multi_get_effects(
2958 &self,
2959 _: &[TransactionEffectsDigest],
2960 ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2961 unimplemented!()
2962 }
2963
2964 fn try_multi_get_events(
2965 &self,
2966 _: &[TransactionEventsDigest],
2967 ) -> IotaResult<Vec<Option<TransactionEvents>>> {
2968 unimplemented!()
2969 }
2970 }
2971
2972 #[async_trait::async_trait]
2973 impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
2974 async fn checkpoint_created(
2975 &self,
2976 summary: &CheckpointSummary,
2977 contents: &CheckpointContents,
2978 _epoch_store: &Arc<AuthorityPerEpochStore>,
2979 _checkpoint_store: &Arc<CheckpointStore>,
2980 ) -> IotaResult {
2981 self.try_send((contents.clone(), summary.clone())).unwrap();
2982 Ok(())
2983 }
2984 }
2985
2986 #[async_trait::async_trait]
2987 impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
2988 async fn certified_checkpoint_created(
2989 &self,
2990 summary: &CertifiedCheckpointSummary,
2991 ) -> IotaResult {
2992 self.try_send(summary.clone()).unwrap();
2993 Ok(())
2994 }
2995 }
2996
2997 fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
2998 PendingCheckpoint::V1(PendingCheckpointContentsV1 {
2999 roots: t
3000 .into_iter()
3001 .map(|t| TransactionKey::Digest(d(t)))
3002 .collect(),
3003 details: PendingCheckpointInfo {
3004 timestamp_ms,
3005 last_of_epoch: false,
3006 checkpoint_height: i,
3007 },
3008 })
3009 }
3010
3011 fn d(i: u8) -> TransactionDigest {
3012 let mut bytes: [u8; 32] = Default::default();
3013 bytes[0] = i;
3014 TransactionDigest::new(bytes)
3015 }
3016
3017 fn e(
3018 transaction_digest: TransactionDigest,
3019 dependencies: Vec<TransactionDigest>,
3020 gas_used: GasCostSummary,
3021 ) -> TransactionEffects {
3022 let mut effects = TransactionEffects::default();
3023 *effects.transaction_digest_mut_for_testing() = transaction_digest;
3024 *effects.dependencies_mut_for_testing() = dependencies;
3025 *effects.gas_cost_summary_mut_for_testing() = gas_used;
3026 effects
3027 }
3028
3029 fn commit_cert_for_test(
3030 store: &mut HashMap<TransactionDigest, TransactionEffects>,
3031 state: Arc<AuthorityState>,
3032 digest: TransactionDigest,
3033 dependencies: Vec<TransactionDigest>,
3034 gas_used: GasCostSummary,
3035 ) {
3036 let epoch_store = state.epoch_store_for_testing();
3037 let effects = e(digest, dependencies, gas_used);
3038 store.insert(digest, effects.clone());
3039 epoch_store
3040 .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
3041 .expect("Inserting cert fx and sigs should not fail");
3042 }
3043}