1use std::{sync::Arc, time::Instant};
25
26use futures::StreamExt;
27use iota_common::{debug_fatal, fatal};
28use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
29use iota_macros::fail_point;
30use iota_types::{
31 accumulator::Accumulator,
32 base_types::{TransactionDigest, TransactionEffectsDigest},
33 crypto::RandomnessRound,
34 effects::{TransactionEffects, TransactionEffectsAPI},
35 executable_transaction::VerifiedExecutableTransaction,
36 full_checkpoint_content::CheckpointData,
37 inner_temporary_store::PackageStoreWithFallback,
38 message_envelope::Message,
39 messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber, VerifiedCheckpoint},
40 transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
41};
42use parking_lot::Mutex;
43use tap::{TapFallible, TapOptional};
44use tracing::{debug, info, instrument};
45
46use crate::{
47 authority::{
48 AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
49 backpressure::BackpressureManager,
50 },
51 checkpoints::CheckpointStore,
52 execution_cache::{ObjectCacheRead, TransactionCacheRead},
53 state_accumulator::StateAccumulator,
54 transaction_manager::TransactionManager,
55};
56
57mod data_ingestion_handler;
58pub mod metrics;
59pub(crate) mod utils;
60
61#[cfg(test)]
62pub(crate) mod tests;
63
64use data_ingestion_handler::{load_checkpoint_data, store_checkpoint_locally};
65use metrics::CheckpointExecutorMetrics;
66use utils::*;
67
/// Callback invoked with the full contents of each executed checkpoint
/// (see `broadcast_checkpoint`), e.g. to forward data to external consumers.
type CheckpointDataSender = Box<dyn Fn(&CheckpointData) + Send + Sync>;

/// Emit an info-level progress log every this many executed checkpoints
/// (see `bump_highest_executed_checkpoint`).
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

/// Why `CheckpointExecutor::run_epoch` returned.
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    /// The final checkpoint of the epoch has been executed.
    EpochComplete,
    /// The configured `RunWithRange` bound stopped execution first.
    RunWithRangeCondition,
}
77
/// Per-checkpoint inputs for execution: the certified checkpoint, its
/// contents, and per-transaction data.
pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    // The four vectors below are index-aligned: entry i of each refers to the
    // i-th transaction of the checkpoint (see `load_checkpoint_transactions`).
    pub tx_digests: Vec<TransactionDigest>,
    pub fx_digests: Vec<TransactionEffectsDigest>,
    pub transactions: Vec<VerifiedExecutableTransaction>,
    pub effects: Vec<TransactionEffects>,
}
86
/// Mutable state carried through the pipeline for one checkpoint: built by
/// the parallel execution step and consumed by the sequential commit step.
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,
    // For each transaction (index-aligned with `data`), the effects digest
    // already recorded in the cache, or `None` if not yet executed.
    executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    // Checkpoint accumulator; populated at the end of `execute_checkpoint`.
    accumulator: Option<Accumulator>,
    // Full `CheckpointData`; populated only when indexing or local data
    // ingestion is enabled (see `process_checkpoint_data`).
    full_data: Option<CheckpointData>,
}
93
94impl CheckpointExecutionState {
95 pub fn new(
96 data: CheckpointExecutionData,
97 executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
98 ) -> Self {
99 Self {
100 data,
101 executed_fx_digests,
102 accumulator: None,
103 full_data: None,
104 }
105 }
106}
107
/// Executes and commits synced checkpoints for a single epoch, driving a
/// two-phase pipeline (see `run_epoch`): a concurrent execution step
/// followed by a strictly sequential commit step.
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    // The three handles below are all derived from `state` in `new`.
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    // Estimates sync TPS from each checkpoint's network-wide tx total.
    tps_estimator: Mutex<TPSEstimator>,
    // Optional callback receiving the full `CheckpointData` of each
    // committed checkpoint.
    data_sender: Option<CheckpointDataSender>,
}
122
123impl CheckpointExecutor {
124 pub fn new(
125 epoch_store: Arc<AuthorityPerEpochStore>,
126 checkpoint_store: Arc<CheckpointStore>,
127 state: Arc<AuthorityState>,
128 accumulator: Arc<StateAccumulator>,
129 backpressure_manager: Arc<BackpressureManager>,
130 config: CheckpointExecutorConfig,
131 metrics: Arc<CheckpointExecutorMetrics>,
132 data_sender: Option<CheckpointDataSender>,
133 ) -> Self {
134 Self {
135 epoch_store,
136 state: state.clone(),
137 checkpoint_store,
138 object_cache_reader: state.get_object_cache_reader().clone(),
139 transaction_cache_reader: state.get_transaction_cache_reader().clone(),
140 tx_manager: state.transaction_manager().clone(),
141 accumulator,
142 backpressure_manager,
143 config,
144 metrics,
145 tps_estimator: Mutex::new(TPSEstimator::default()),
146 data_sender,
147 }
148 }
149
150 pub fn new_for_tests(
151 epoch_store: Arc<AuthorityPerEpochStore>,
152 checkpoint_store: Arc<CheckpointStore>,
153 state: Arc<AuthorityState>,
154 accumulator: Arc<StateAccumulator>,
155 ) -> Self {
156 Self::new(
157 epoch_store,
158 checkpoint_store.clone(),
159 state,
160 accumulator,
161 BackpressureManager::new_from_checkpoint_store(&checkpoint_store),
162 Default::default(),
163 CheckpointExecutorMetrics::new_for_tests(),
164 None, )
166 }
167
168 fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
171 let highest_executed = self
175 .checkpoint_store
176 .get_highest_executed_checkpoint()
177 .unwrap();
178
179 if let Some(highest_executed) = &highest_executed {
180 if self.epoch_store.epoch() == highest_executed.epoch()
181 && highest_executed.is_last_checkpoint_of_epoch()
182 {
183 info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
186 return None;
187 }
188 }
189
190 Some(
191 highest_executed
192 .as_ref()
193 .map(|c| c.sequence_number() + 1)
194 .unwrap_or_else(|| {
195 assert_eq!(self.epoch_store.epoch(), 0);
197 0
199 }),
200 )
201 }
202
    /// Runs checkpoint execution for the current epoch.
    ///
    /// Streams synced checkpoints starting from the next unexecuted sequence
    /// number, executes them with bounded concurrency (parallel step), and
    /// commits them strictly in order (sequential step). Returns
    /// `StopReason::EpochComplete` once the epoch's final checkpoint has been
    /// committed, or `StopReason::RunWithRangeCondition` when the configured
    /// `run_with_range` bound stops execution first.
    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
        let _metrics_guard = iota_metrics::monitored_scope("CheckpointExecutor::run_epoch");
        info!(?run_with_range, "CheckpointExecutor::run_epoch");
        debug!(
            "Checkpoint executor running for epoch {:?}",
            self.epoch_store.epoch(),
        );

        // If the run-with-range epoch bound lies before the current epoch,
        // there is nothing to execute at all.
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
            info!("RunWithRange condition satisfied at {:?}", run_with_range,);
            return StopReason::RunWithRangeCondition;
        };

        self.metrics
            .checkpoint_exec_epoch
            .set(self.epoch_store.epoch() as i64);

        // `None` means the epoch's final checkpoint was already executed.
        let Some(next_to_schedule) = self.get_next_to_schedule() else {
            return StopReason::EpochComplete;
        };

        let this = Arc::new(self);

        // Allow overriding the configured execution concurrency via env var.
        let concurrency = std::env::var("IOTA_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(this.config.checkpoint_execution_max_concurrency);

        let final_checkpoint_executed = stream_synced_checkpoints(
            this.checkpoint_store.clone(),
            next_to_schedule,
            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
        )
        // Parallel step: execute up to `concurrency` checkpoints at once.
        .map(|checkpoint| this.clone().execute_checkpoint(checkpoint))
        .buffered(concurrency)
        // Sequential step: `fold` awaits each `commit_checkpoint` future in
        // stream order (the closure returns the commit future, whose bool
        // output becomes the next accumulator), so commits happen one at a
        // time and strictly in sequence. The accumulator records whether the
        // previously committed checkpoint ended the epoch — it must not have,
        // or we would be committing past the end of the epoch.
        .map(|ckpt_state| this.clone().commit_checkpoint(ckpt_state))
        .fold(false, |state, is_final_checkpoint| {
            assert!(!state, "Cannot execute checkpoint after epoch end");
            is_final_checkpoint
        })
        .await;

        if final_checkpoint_executed {
            StopReason::EpochComplete
        } else {
            StopReason::RunWithRangeCondition
        }
    }
262}
263
264impl CheckpointExecutor {
    /// Sequential step of checkpoint processing: durably commits an executed
    /// checkpoint's transaction outputs and performs all in-order bookkeeping
    /// (watermarks, randomness notifications, index updates, accumulators,
    /// end-of-epoch work). Invoked one checkpoint at a time, in sequence
    /// order (enforced by the `fold` in `run_epoch`). Returns true if this
    /// was the final checkpoint of the epoch.
    #[instrument(level = "debug", skip_all, fields(seq = ?ckpt_state.data.checkpoint.sequence_number()))]
    async fn commit_checkpoint(self: Arc<Self>, mut ckpt_state: CheckpointExecutionState) -> bool {
        // All work here is synchronous and disk-bound, so it runs on the
        // blocking thread pool.
        tokio::task::spawn_blocking({
            move || {
                let _sequential_step_guard =
                    iota_metrics::monitored_scope("CheckpointExecutor::sequential_step");

                // Update the sync-TPS estimate from the network-wide
                // transaction count carried by the checkpoint summary.
                let tps = self.tps_estimator.lock().update(
                    Instant::now(),
                    ckpt_state.data.checkpoint.network_total_transactions,
                );
                self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

                self.backpressure_manager
                    .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());

                let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

                let seq = ckpt_state.data.checkpoint.sequence_number;

                // Durably commit all transaction outputs before any watermark
                // is advanced below.
                let cache_commit = self.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    self.epoch_store.epoch(),
                    &ckpt_state.data.tx_digests,
                );

                self.epoch_store
                    .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
                    .expect("cannot fail");

                // Notify the randomness reporter about any randomness updates
                // executed in this checkpoint.
                if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
                    let randomness_rounds = self.extract_randomness_rounds(
                        &ckpt_state.data.checkpoint,
                        &ckpt_state.data.checkpoint_contents,
                    );
                    for round in randomness_rounds {
                        debug!(
                            ?round,
                            "notifying RandomnessReporter that randomness update was executed in checkpoint"
                        );
                        randomness_reporter
                            .notify_randomness_in_checkpoint(round)
                            .expect("epoch cannot have ended");
                    }
                }

                // Moves `full_data` out of the state; it remains `None` below.
                if let Some(checkpoint_data) = ckpt_state.full_data.take() {
                    self.commit_index_updates(checkpoint_data);
                }

                self.accumulator
                    .accumulate_running_root(&self.epoch_store, seq, ckpt_state.accumulator)
                    .expect("Failed to accumulate running root");

                // End-of-epoch bookkeeping: record the epoch's last
                // checkpoint, finalize the epoch accumulator, and prune
                // local summaries.
                if is_final_checkpoint {
                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_epoch(self.epoch_store.clone(), seq)
                        .expect("Accumulating epoch cannot fail");

                    self.checkpoint_store
                        .prune_local_summaries()
                        .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                        .ok();
                }

                fail_point!("crash");

                self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

                // NOTE(review): `full_data` was taken above, so this argument
                // is always `None` here; when a data sender is configured,
                // `broadcast_checkpoint` therefore re-loads the full
                // checkpoint data from the store — confirm this is intended.
                self.broadcast_checkpoint(
                    &ckpt_state.data,
                    ckpt_state.full_data.as_ref(),
                );

                ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
            }
        }).await.unwrap()
    }
355
    /// Parallel step of checkpoint processing: loads the checkpoint's
    /// transactions, schedules any not-yet-executed ones, waits for their
    /// effects, then finalizes per-checkpoint state (finalized-transaction
    /// records, accumulator, optional full data). Multiple checkpoints may
    /// run through this step concurrently; ordering is enforced afterwards
    /// by `commit_checkpoint`.
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        info!("executing checkpoint");

        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
        self.backpressure_manager
            .update_highest_certified_checkpoint(*checkpoint.sequence_number());

        let sequence_number = checkpoint.sequence_number;
        // The end-of-epoch checkpoint must not begin executing until every
        // prior checkpoint of the epoch has been executed.
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard =
                iota_metrics::monitored_scope("CheckpointExecutor::wait_for_previous_checkpoints");

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            iota_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        // Load transactions/effects and enqueue the unexecuted ones; both are
        // synchronous, store-heavy operations, hence the blocking pool.
        let (mut ckpt_state, unexecuted_tx_digests) = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                let _scope =
                    iota_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
                let ckpt_state = this.load_checkpoint_transactions(checkpoint);
                let unexecuted_tx_digests = this.schedule_transaction_execution(&ckpt_state);
                (ckpt_state, unexecuted_tx_digests)
            }
        })
        .await
        .unwrap();

        // Wait until every scheduled transaction has produced effects.
        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&unexecuted_tx_digests)
            .await;

        // The change-epoch transaction was skipped during normal scheduling
        // (see `schedule_transaction_execution`) and is executed separately,
        // after all other transactions.
        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&ckpt_state).await;
        }

        tokio::task::spawn_blocking(move || {
            let _scope = iota_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");
            self.epoch_store
                .insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number)
                .expect("failed to insert finalized transactions");

            // Fullnodes feed checkpoint effects into the congestion tracker.
            if self.state.is_fullnode(&self.epoch_store) {
                self.state.congestion_tracker.process_checkpoint_effects(
                    &*self.transaction_cache_reader,
                    &ckpt_state.data.checkpoint,
                    &ckpt_state.data.effects,
                );
            }

            self.state
                .get_checkpoint_cache()
                .insert_finalized_transactions_perpetual_checkpoints(
                    &ckpt_state.data.tx_digests,
                    self.epoch_store.epoch(),
                    sequence_number,
                );

            // Accumulate this checkpoint's effects; consumed later by
            // `commit_checkpoint` when updating the running root.
            ckpt_state.accumulator = Some(
                self.accumulator
                    .accumulate_checkpoint(
                        &ckpt_state.data.effects,
                        sequence_number,
                        &self.epoch_store,
                    )
                    .expect("epoch cannot have ended"),
            );

            ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state);
            ckpt_state
        })
        .await
        .unwrap()
    }
448
449 fn checkpoint_data_enabled(&self) -> bool {
450 self.state.rest_index.is_some() || self.config.data_ingestion_dir.is_some()
451 }
452
453 fn process_checkpoint_data(
454 &self,
455 ckpt_state: &CheckpointExecutionState,
456 ) -> Option<CheckpointData> {
457 if !self.checkpoint_data_enabled() {
458 return None;
459 }
460
461 let checkpoint_data = load_checkpoint_data(
462 &ckpt_state.data,
463 self.state.get_object_store(),
464 &*self.transaction_cache_reader,
465 )
466 .expect("failed to load checkpoint data");
467
468 if self.state.rest_index.is_some() || self.config.data_ingestion_dir.is_some() {
469 if let Some(rest_index) = &self.state.rest_index {
473 let mut layout_resolver = self.epoch_store.executor().type_layout_resolver(
474 Box::new(PackageStoreWithFallback::new(
475 self.state.get_backing_package_store(),
476 &checkpoint_data,
477 )),
478 );
479
480 rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut());
481 }
482
483 if let Some(path) = &self.config.data_ingestion_dir {
484 store_checkpoint_locally(path, &checkpoint_data)
485 .expect("failed to store checkpoint locally");
486 }
487 }
488
489 Some(checkpoint_data)
490 }
491
    /// Loads a checkpoint's transactions and effects into a
    /// `CheckpointExecutionState`, preferring the bulk full-checkpoint
    /// contents when present in the store and falling back to
    /// per-transaction cache lookups otherwise.
    fn load_checkpoint_transactions(
        &self,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        let seq = checkpoint.sequence_number;
        let epoch = checkpoint.epoch;

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        // Fast path: transactions + effects stored in bulk as full
        // checkpoint contents.
        if let Some(full_contents) = self
            .checkpoint_store
            .get_full_checkpoint_contents_by_sequence_number(seq)
            .expect("Failed to get checkpoint contents from store")
            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
        {
            let num_txns = full_contents.size();
            let mut tx_digests = Vec::with_capacity(num_txns);
            let mut transactions = Vec::with_capacity(num_txns);
            let mut effects = Vec::with_capacity(num_txns);
            let mut fx_digests = Vec::with_capacity(num_txns);

            full_contents
                .into_iter()
                .zip(checkpoint_contents.iter())
                .for_each(|(execution_data, digests)| {
                    let tx_digest = digests.transaction;
                    let fx_digest = digests.effects;
                    // The bulk contents must agree with the digests recorded
                    // in the checkpoint contents.
                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                    debug_assert_eq!(fx_digest, execution_data.effects.digest());

                    tx_digests.push(tx_digest);
                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(execution_data.transaction),
                        epoch,
                        seq,
                    ));
                    effects.push(execution_data.effects);
                    fx_digests.push(fx_digest);
                });

            // Effects digests of transactions that were already executed;
            // used later to skip re-scheduling and to detect forks.
            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        } else {
            // Slow path: fetch transactions and effects individually from
            // the cache; every entry must be present or we abort.
            let digests = checkpoint_contents.inner();

            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) =
                digests.iter().map(|d| (d.transaction, d.effects)).unzip();
            let transactions = self
                .transaction_cache_reader
                .multi_get_transaction_blocks(&tx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, tx)| {
                    let tx = tx
                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                    // Avoid cloning when this Arc is the sole reference.
                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
                })
                .collect();
            let effects = self
                .transaction_cache_reader
                .multi_get_effects(&fx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, effect)| {
                    effect.unwrap_or_else(|| {
                        fatal!("checkpoint effect not found for {:?}", digests[i])
                    })
                })
                .collect();

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        }
    }
601
    /// Enqueues all not-yet-executed transactions of the checkpoint to the
    /// transaction manager (with their expected effects digests) and returns
    /// their digests. Already-executed transactions are fork-checked and
    /// skipped; the end-of-epoch transaction is also skipped here and run
    /// separately by `execute_change_epoch_tx`.
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Vec<TransactionDigest> {
        // Walk the index-aligned per-transaction vectors and keep only the
        // entries that still need to be executed.
        let (unexecuted_tx_digests, unexecuted_txns, unexecuted_effects): (Vec<_>, Vec<_>, Vec<_>) =
            itertools::multiunzip(
                itertools::izip!(
                    ckpt_state.data.transactions.iter(),
                    ckpt_state.data.tx_digests.iter(),
                    ckpt_state.data.fx_digests.iter(),
                    ckpt_state.data.effects.iter(),
                    ckpt_state.executed_fx_digests.iter()
                )
                .filter_map(
                    |(txn, tx_digest, expected_fx_digest, effects, executed_fx_digest)| {
                        if let Some(executed_fx_digest) = executed_fx_digest {
                            // Already executed: verify the locally recorded
                            // effects match what the checkpoint expects.
                            assert_not_forked(
                                &ckpt_state.data.checkpoint,
                                tx_digest,
                                expected_fx_digest,
                                executed_fx_digest,
                                &*self.transaction_cache_reader,
                            );
                            None
                        } else if txn.transaction_data().is_end_of_epoch_tx() {
                            // Deferred to `execute_change_epoch_tx`.
                            None
                        } else {
                            Some((tx_digest, (txn.clone(), *expected_fx_digest), effects))
                        }
                    },
                ),
            );

        // Shared-object transactions need version assignments recorded from
        // the checkpoint effects before they can execute.
        for ((tx, _), effects) in itertools::izip!(unexecuted_txns.iter(), unexecuted_effects) {
            if tx.contains_shared_object() {
                self.epoch_store
                    .acquire_shared_version_assignments_from_effects(
                        tx,
                        effects,
                        &*self.object_cache_reader,
                    )
                    .expect("failed to acquire shared version assignments");
            }
        }

        self.tx_manager
            .enqueue_with_expected_effects_digest(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }
655
    /// Executes the change-epoch (end-of-epoch) transaction, which must be
    /// the final transaction of the checkpoint: records shared-object
    /// version assignments from the certified effects, enqueues the
    /// transaction with its expected effects digest, and waits for it to be
    /// executed.
    async fn execute_change_epoch_tx(&self, ckpt_state: &CheckpointExecutionState) {
        // The end-of-epoch transaction is always the last in the checkpoint.
        let change_epoch_tx = ckpt_state.data.transactions.last().unwrap();
        let change_epoch_fx = ckpt_state.data.effects.last().unwrap();
        assert_eq!(
            change_epoch_tx.digest(),
            change_epoch_fx.transaction_digest()
        );
        assert!(
            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
            "final txn must be an end of epoch txn"
        );

        // Assign shared-object versions from the recorded effects before
        // scheduling the transaction.
        self.epoch_store
            .acquire_shared_version_assignments_from_effects(
                change_epoch_tx,
                change_epoch_fx,
                self.object_cache_reader.as_ref(),
            )
            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");

        info!(
            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}",
            change_epoch_tx.digest(),
            change_epoch_fx.digest()
        );
        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
            &self.epoch_store,
        );

        // Block until the transaction has produced effects.
        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&[*change_epoch_tx.digest()])
            .await;
    }
709
    /// Advances the highest-executed-checkpoint watermark to `checkpoint`,
    /// asserting that it moves exactly one sequence number at a time. Also
    /// prunes full checkpoint contents that fall outside the retention
    /// window and updates execution metrics.
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        // Commits are strictly sequential, so the watermark must move from
        // `seq - 1` to `seq` (or start at 0 if nothing was executed yet).
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        // Periodic progress log.
        if seq.is_multiple_of(CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL) {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        // Keep only the most recent window of full checkpoint contents;
        // delete the contents and the contents-digest -> sequence-number
        // mapping for the checkpoint that falls out of the window.
        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                // NOTE(review): absence of the checkpoint to prune is
                // tolerated here (only logged) — presumably it may already
                // have been pruned; confirm this assumption.
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
    }
770
771 fn broadcast_checkpoint(
774 &self,
775 checkpoint_exec_data: &CheckpointExecutionData,
776 checkpoint_data: Option<&CheckpointData>,
777 ) {
778 if let Some(data_sender) = &self.data_sender {
779 let checkpoint_data = if let Some(data) = checkpoint_data {
780 data.clone()
781 } else {
782 load_checkpoint_data(
783 checkpoint_exec_data,
784 self.state.get_object_store(),
785 self.transaction_cache_reader.as_ref(),
786 )
787 .expect("Failed to load full CheckpointData")
788 };
789 data_sender(&checkpoint_data);
790 }
791
792 debug!(
793 "[Fullnode] Full CheckpointData is available: seq={}",
794 checkpoint_exec_data.checkpoint.sequence_number()
795 );
796 }
797
798 fn commit_index_updates(&self, checkpoint: CheckpointData) {
801 if let Some(rest_index) = &self.state.rest_index {
802 rest_index
803 .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
804 .expect("failed to update rest_indexes");
805 }
806 }
807
    /// Extracts the randomness rounds executed in this checkpoint, reading
    /// them from the checkpoint's version-specific data when present, and
    /// otherwise falling back to inspecting the checkpoint's first
    /// transaction.
    fn extract_randomness_rounds(
        &self,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_contents: &CheckpointContents,
    ) -> Vec<RandomnessRound> {
        if let Some(version_specific_data) = checkpoint
            .version_specific_data(self.epoch_store.protocol_config())
            .expect("unable to get version_specific_data")
        {
            // Normal case: rounds are recorded in the checkpoint's
            // version-specific data.
            version_specific_data.into_v1().randomness_rounds
        } else {
            // Fallback path, only valid when `min_checkpoint_interval_ms` is
            // unset/zero. NOTE(review): presumably this guarantees at most
            // one randomness update per checkpoint, appearing as the first
            // transaction — confirm against the protocol documentation.
            assert_eq!(
                0,
                self.epoch_store
                    .protocol_config()
                    .min_checkpoint_interval_ms_as_option()
                    .unwrap_or_default(),
            );
            if let Some(first_digest) = checkpoint_contents.inner().first() {
                let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
                    .unwrap_or_else(||
                        fatal!(
                            "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
                            checkpoint.sequence_number()
                        )
                    );
                // Only a leading `RandomnessStateUpdate` yields a round.
                if let TransactionKind::RandomnessStateUpdate(rsu) =
                    maybe_randomness_tx.data().transaction_data().kind()
                {
                    vec![rsu.randomness_round]
                } else {
                    Vec::new()
                }
            } else {
                Vec::new()
            }
        }
    }
855}