use std::{sync::Arc, time::Instant};

use futures::StreamExt;
use iota_common::{debug_fatal, fatal};
use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
use iota_macros::fail_point;
use iota_types::{
    accumulator::Accumulator,
    base_types::{TransactionDigest, TransactionEffectsDigest},
    crypto::RandomnessRound,
    effects::{TransactionEffects, TransactionEffectsAPI},
    executable_transaction::VerifiedExecutableTransaction,
    full_checkpoint_content::CheckpointData,
    inner_temporary_store::PackageStoreWithFallback,
    message_envelope::Message,
    messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber, VerifiedCheckpoint},
    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
};
use parking_lot::Mutex;
use tap::{TapFallible, TapOptional};
use tracing::{debug, info, instrument};

use crate::{
    authority::{
        AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
        backpressure::BackpressureManager,
    },
    checkpoints::CheckpointStore,
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    state_accumulator::StateAccumulator,
    transaction_manager::TransactionManager,
};

mod data_ingestion_handler;
pub mod metrics;
pub(crate) mod utils;

#[cfg(test)]
pub(crate) mod tests;

use data_ingestion_handler::{load_checkpoint_data, store_checkpoint_locally};
use metrics::CheckpointExecutorMetrics;
use utils::*;

const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

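/// Reason why [`CheckpointExecutor::run_epoch`] stopped processing checkpoints.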
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    EpochComplete,
    RunWithRangeCondition,
}

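/// The checkpoint summary together with the transactions and effects it
/// certifies, loaded up front so that the execution and commit stages can
/// share it.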
pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    pub tx_digests: Vec<TransactionDigest>,
    pub fx_digests: Vec<TransactionEffectsDigest>,
    pub transactions: Vec<VerifiedExecutableTransaction>,
    pub effects: Vec<TransactionEffects>,
}

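/// State carried from the parallel execution stage to the sequential commit
/// stage. `executed_fx_digests` records, per transaction, the effects digest
/// already present in the store (if any); `accumulator` and `full_data` are
/// filled in once the checkpoint has been executed.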
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,
    executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    accumulator: Option<Accumulator>,
    full_data: Option<CheckpointData>,
}

impl CheckpointExecutionState {
    pub fn new(
        data: CheckpointExecutionData,
        executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    ) -> Self {
        Self {
            data,
            executed_fx_digests,
            accumulator: None,
            full_data: None,
        }
    }
}

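/// Executes checkpoints certified for the current epoch: transaction
/// execution is scheduled concurrently, while commit side effects
/// (watermarks, accumulators, index updates) are applied strictly in
/// checkpoint order.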
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    tps_estimator: Mutex<TPSEstimator>,
}

impl CheckpointExecutor {
    pub fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
    ) -> Self {
        Self {
            epoch_store,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            backpressure_manager,
            config,
            metrics,
            tps_estimator: Mutex::new(TPSEstimator::default()),
        }
    }

    pub fn new_for_tests(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            epoch_store,
            checkpoint_store.clone(),
            state,
            accumulator,
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store),
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
        )
    }

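    /// Returns the sequence number of the next checkpoint to schedule for
    /// execution, or `None` if the final checkpoint of the current epoch has
    /// already been executed.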
    fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
        let highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if self.epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                info!(
                    seq = ?highest_executed.sequence_number,
                    "final checkpoint of epoch has already been executed"
                );
                return None;
            }
        }

        Some(
            highest_executed
                .as_ref()
                .map(|c| c.sequence_number() + 1)
                .unwrap_or_else(|| {
                    assert_eq!(self.epoch_store.epoch(), 0);
                    0
                }),
        )
    }

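    /// Executes all checkpoints of the current epoch (or up to the bound given
    /// by `run_with_range`) and returns the reason execution stopped.
    ///
    /// A minimal usage sketch, assuming the surrounding components have
    /// already been constructed (e.g. via [`CheckpointExecutor::new`]):
    ///
    /// ```ignore
    /// let executor = CheckpointExecutor::new(
    ///     epoch_store,
    ///     checkpoint_store,
    ///     state,
    ///     accumulator,
    ///     backpressure_manager,
    ///     config,
    ///     metrics,
    /// );
    /// match executor.run_epoch(None).await {
    ///     StopReason::EpochComplete => { /* reconfigure and start the next epoch */ }
    ///     StopReason::RunWithRangeCondition => { /* stop as requested by the run range */ }
    /// }
    /// ```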
    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
        let _metrics_guard = iota_metrics::monitored_scope("CheckpointExecutor::run_epoch");
        info!(?run_with_range, "CheckpointExecutor::run_epoch");
        debug!(
            "Checkpoint executor running for epoch {:?}",
            self.epoch_store.epoch(),
        );

        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
            info!("RunWithRange condition satisfied at {:?}", run_with_range);
            return StopReason::RunWithRangeCondition;
        }

        self.metrics
            .checkpoint_exec_epoch
            .set(self.epoch_store.epoch() as i64);

        let Some(next_to_schedule) = self.get_next_to_schedule() else {
            return StopReason::EpochComplete;
        };

        let this = Arc::new(self);

        let concurrency = std::env::var("IOTA_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(this.config.checkpoint_execution_max_concurrency);

        let final_checkpoint_executed = stream_synced_checkpoints(
            this.checkpoint_store.clone(),
            next_to_schedule,
            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
        )
        // Checkpoints are executed concurrently ...
        .map(|checkpoint| this.clone().execute_checkpoint(checkpoint))
        .buffered(concurrency)
        // ... but committed strictly in order: the fold below awaits each
        // commit future to completion before pulling the next one.
        .map(|ckpt_state| this.clone().commit_checkpoint(ckpt_state))
        .fold(false, |state, is_final_checkpoint| {
            assert!(!state, "Cannot execute checkpoint after epoch end");
            is_final_checkpoint
        })
        .await;

        if final_checkpoint_executed {
            StopReason::EpochComplete
        } else {
            StopReason::RunWithRangeCondition
        }
    }
}

impl CheckpointExecutor {
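    /// Sequential stage: commits an executed checkpoint's side effects in
    /// checkpoint order (transaction outputs, finalized-checkpoint
    /// bookkeeping, randomness notifications, index updates, running-root
    /// accumulation) and bumps the highest-executed watermark. Returns `true`
    /// if this was the final checkpoint of the epoch.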
    #[instrument(level = "debug", skip_all, fields(seq = ?ckpt_state.data.checkpoint.sequence_number()))]
    async fn commit_checkpoint(
        self: Arc<Self>,
        mut ckpt_state: CheckpointExecutionState,
    ) -> bool {
        tokio::task::spawn_blocking({
            move || {
                let _sequential_step_guard =
                    iota_metrics::monitored_scope("CheckpointExecutor::sequential_step");

                let tps = self.tps_estimator.lock().update(
                    Instant::now(),
                    ckpt_state.data.checkpoint.network_total_transactions,
                );
                self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

                self.backpressure_manager
                    .update_highest_executed_checkpoint(
                        *ckpt_state.data.checkpoint.sequence_number(),
                    );

                let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

                let seq = ckpt_state.data.checkpoint.sequence_number;

                let cache_commit = self.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    self.epoch_store.epoch(),
                    &ckpt_state.data.tx_digests,
                );

                self.epoch_store
                    .handle_finalized_checkpoint(
                        &ckpt_state.data.checkpoint,
                        &ckpt_state.data.tx_digests,
                    )
                    .expect("cannot fail");

                if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
                    let randomness_rounds = self.extract_randomness_rounds(
                        &ckpt_state.data.checkpoint,
                        &ckpt_state.data.checkpoint_contents,
                    );
                    for round in randomness_rounds {
                        debug!(
                            ?round,
                            "notifying RandomnessReporter that randomness update was executed in checkpoint"
                        );
                        randomness_reporter
                            .notify_randomness_in_checkpoint(round)
                            .expect("epoch cannot have ended");
                    }
                }

                if let Some(checkpoint_data) = ckpt_state.full_data.take() {
                    self.commit_index_updates(checkpoint_data);
                }

                self.accumulator
                    .accumulate_running_root(&self.epoch_store, seq, ckpt_state.accumulator)
                    .expect("Failed to accumulate running root");

                if is_final_checkpoint {
                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(
                            self.epoch_store.epoch(),
                            &ckpt_state.data.checkpoint,
                        )
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_epoch(self.epoch_store.clone(), seq)
                        .expect("Accumulating epoch cannot fail");

                    self.checkpoint_store
                        .prune_local_summaries()
                        .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                        .ok();
                }

                fail_point!("crash");

                self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

                ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
            }
        })
        .await
        .unwrap()
    }

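    /// Parallel stage: loads the checkpoint's transactions and effects,
    /// schedules any not-yet-executed transactions through the transaction
    /// manager, waits for their effects, and prepares the accumulator and
    /// (optionally) the full `CheckpointData` for the commit stage.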
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        info!("executing checkpoint");

        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
        self.backpressure_manager
            .update_highest_certified_checkpoint(*checkpoint.sequence_number());

        let sequence_number = checkpoint.sequence_number;
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard =
                iota_metrics::monitored_scope("CheckpointExecutor::wait_for_previous_checkpoints");

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            iota_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        let (mut ckpt_state, unexecuted_tx_digests) = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                let _scope =
                    iota_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
                let ckpt_state = this.load_checkpoint_transactions(checkpoint);
                let unexecuted_tx_digests = this.schedule_transaction_execution(&ckpt_state);
                (ckpt_state, unexecuted_tx_digests)
            }
        })
        .await
        .unwrap();

        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&unexecuted_tx_digests)
            .await;

        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&ckpt_state).await;
        }

        tokio::task::spawn_blocking(move || {
            let _scope = iota_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");
            self.epoch_store
                .insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number)
                .expect("failed to insert finalized transactions");

            if self.state.is_fullnode(&self.epoch_store) {
                self.state.congestion_tracker.process_checkpoint_effects(
                    &*self.transaction_cache_reader,
                    &ckpt_state.data.checkpoint,
                    &ckpt_state.data.effects,
                );
            }

            self.state
                .get_checkpoint_cache()
                .insert_finalized_transactions_perpetual_checkpoints(
                    &ckpt_state.data.tx_digests,
                    self.epoch_store.epoch(),
                    sequence_number,
                );

            ckpt_state.accumulator = Some(
                self.accumulator
                    .accumulate_checkpoint(
                        &ckpt_state.data.effects,
                        sequence_number,
                        &self.epoch_store,
                    )
                    .expect("epoch cannot have ended"),
            );

            ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state);
            ckpt_state
        })
        .await
        .unwrap()
    }

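    /// Whether full `CheckpointData` needs to be assembled, i.e. the REST
    /// index or local data ingestion is enabled.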
    fn checkpoint_data_enabled(&self) -> bool {
        self.state.rest_index.is_some() || self.config.data_ingestion_dir.is_some()
    }

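    /// Assembles the full `CheckpointData`, feeds it to the REST index and/or
    /// writes it to the data ingestion directory, and returns it for the
    /// commit stage. Returns `None` if neither consumer is enabled.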
    fn process_checkpoint_data(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Option<CheckpointData> {
        if !self.checkpoint_data_enabled() {
            return None;
        }

        let checkpoint_data = load_checkpoint_data(
            &ckpt_state.data,
            &*self.object_cache_reader,
            &*self.transaction_cache_reader,
        )
        .expect("failed to load checkpoint data");

        if let Some(rest_index) = &self.state.rest_index {
            let mut layout_resolver = self.epoch_store.executor().type_layout_resolver(
                Box::new(PackageStoreWithFallback::new(
                    self.state.get_backing_package_store(),
                    &checkpoint_data,
                )),
            );

            rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut());
        }

        if let Some(path) = &self.config.data_ingestion_dir {
            store_checkpoint_locally(path, &checkpoint_data)
                .expect("failed to store checkpoint locally");
        }

        Some(checkpoint_data)
    }

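    /// Loads the checkpoint's transactions and effects, preferring the bulk
    /// full-checkpoint contents when available and otherwise falling back to
    /// per-digest lookups, and records which transactions already have
    /// executed effects.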
    fn load_checkpoint_transactions(
        &self,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        let seq = checkpoint.sequence_number;
        let epoch = checkpoint.epoch;

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        if let Some(full_contents) = self
            .checkpoint_store
            .get_full_checkpoint_contents_by_sequence_number(seq)
            .expect("Failed to get checkpoint contents from store")
            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
        {
            let num_txns = full_contents.size();
            let mut tx_digests = Vec::with_capacity(num_txns);
            let mut transactions = Vec::with_capacity(num_txns);
            let mut effects = Vec::with_capacity(num_txns);
            let mut fx_digests = Vec::with_capacity(num_txns);

            full_contents
                .into_iter()
                .zip(checkpoint_contents.iter())
                .for_each(|(execution_data, digests)| {
                    let tx_digest = digests.transaction;
                    let fx_digest = digests.effects;
                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                    debug_assert_eq!(fx_digest, execution_data.effects.digest());

                    tx_digests.push(tx_digest);
                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(execution_data.transaction),
                        epoch,
                        seq,
                    ));
                    effects.push(execution_data.effects);
                    fx_digests.push(fx_digest);
                });

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        } else {
            let digests = checkpoint_contents.inner();

            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) =
                digests.iter().map(|d| (d.transaction, d.effects)).unzip();
            let transactions = self
                .transaction_cache_reader
                .multi_get_transaction_blocks(&tx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, tx)| {
                    let tx = tx
                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
                })
                .collect();
            let effects = self
                .transaction_cache_reader
                .multi_get_effects(&fx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, effect)| {
                    effect.unwrap_or_else(|| {
                        fatal!("checkpoint effect not found for {:?}", digests[i])
                    })
                })
                .collect();

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        }
    }

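    /// Enqueues all transactions that have not yet been executed with their
    /// expected effects digests, after acquiring shared-object version
    /// assignments from the certified effects. Already-executed transactions
    /// are checked against the certified effects digest to detect forks.
    /// Returns the digests of the transactions that still need to execute.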
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Vec<TransactionDigest> {
        let (unexecuted_tx_digests, unexecuted_txns, unexecuted_effects): (Vec<_>, Vec<_>, Vec<_>) =
            itertools::multiunzip(
                itertools::izip!(
                    ckpt_state.data.transactions.iter(),
                    ckpt_state.data.tx_digests.iter(),
                    ckpt_state.data.fx_digests.iter(),
                    ckpt_state.data.effects.iter(),
                    ckpt_state.executed_fx_digests.iter()
                )
                .filter_map(
                    |(txn, tx_digest, expected_fx_digest, effects, executed_fx_digest)| {
                        if let Some(executed_fx_digest) = executed_fx_digest {
                            assert_not_forked(
                                &ckpt_state.data.checkpoint,
                                tx_digest,
                                expected_fx_digest,
                                executed_fx_digest,
                                &*self.transaction_cache_reader,
                            );
                            None
                        } else if txn.transaction_data().is_end_of_epoch_tx() {
                            None
                        } else {
                            Some((tx_digest, (txn.clone(), *expected_fx_digest), effects))
                        }
                    },
                ),
            );

        for ((tx, _), effects) in itertools::izip!(unexecuted_txns.iter(), unexecuted_effects) {
            if tx.contains_shared_object() {
                self.epoch_store
                    .acquire_shared_version_assignments_from_effects(
                        tx,
                        effects,
                        &*self.object_cache_reader,
                    )
                    .expect("failed to acquire shared version assignments");
            }
        }

        self.tx_manager
            .enqueue_with_expected_effects_digest(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }

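    /// Executes the change-epoch (end-of-epoch) transaction, which must be
    /// the last transaction of the final checkpoint of the epoch, and waits
    /// for its effects to become available.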
    async fn execute_change_epoch_tx(&self, ckpt_state: &CheckpointExecutionState) {
        let change_epoch_tx = ckpt_state.data.transactions.last().unwrap();
        let change_epoch_fx = ckpt_state.data.effects.last().unwrap();
        assert_eq!(
            change_epoch_tx.digest(),
            change_epoch_fx.transaction_digest()
        );
        assert!(
            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
            "final txn must be an end of epoch txn"
        );

        self.epoch_store
            .acquire_shared_version_assignments_from_effects(
                change_epoch_tx,
                change_epoch_fx,
                self.object_cache_reader.as_ref(),
            )
            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");

        info!(
            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}",
            change_epoch_tx.digest(),
            change_epoch_fx.digest()
        );
        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
            &self.epoch_store,
        );

        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&[*change_epoch_tx.digest()])
            .await;
    }

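    /// Advances the highest-executed-checkpoint watermark to `checkpoint`,
    /// asserting that checkpoints are executed in strictly increasing order,
    /// and prunes full checkpoint contents that fall outside the retention
    /// window.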
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
    }

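    /// Commits the pending REST index updates for this checkpoint, if the
    /// REST index is enabled.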
    fn commit_index_updates(&self, checkpoint: CheckpointData) {
        if let Some(rest_index) = &self.state.rest_index {
            rest_index
                .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
                .expect("failed to update rest_indexes");
        }
    }

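    /// Extracts the randomness rounds committed in this checkpoint, either
    /// from the checkpoint's version-specific data or, for legacy checkpoints
    /// without it, by inspecting the first transaction for a
    /// `RandomnessStateUpdate`.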
    fn extract_randomness_rounds(
        &self,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_contents: &CheckpointContents,
    ) -> Vec<RandomnessRound> {
        if let Some(version_specific_data) = checkpoint
            .version_specific_data(self.epoch_store.protocol_config())
            .expect("unable to get version_specific_data")
        {
            version_specific_data.into_v1().randomness_rounds
        } else {
            assert_eq!(
                0,
                self.epoch_store
                    .protocol_config()
                    .min_checkpoint_interval_ms_as_option()
                    .unwrap_or_default(),
            );
            if let Some(first_digest) = checkpoint_contents.inner().first() {
                let maybe_randomness_tx = self
                    .transaction_cache_reader
                    .get_transaction_block(&first_digest.transaction)
                    .unwrap_or_else(|| {
                        fatal!(
                            "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
                            checkpoint.sequence_number()
                        )
                    });
                if let TransactionKind::RandomnessStateUpdate(rsu) =
                    maybe_randomness_tx.data().transaction_data().kind()
                {
                    vec![rsu.randomness_round]
                } else {
                    Vec::new()
                }
            } else {
                Vec::new()
            }
        }
    }
}