use std::{sync::Arc, time::Instant};

use futures::StreamExt;
use iota_common::{debug_fatal, fatal};
use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
use iota_macros::fail_point;
use iota_types::{
    accumulator::Accumulator,
    base_types::{TransactionDigest, TransactionEffectsDigest},
    crypto::RandomnessRound,
    effects::{TransactionEffects, TransactionEffectsAPI},
    executable_transaction::VerifiedExecutableTransaction,
    full_checkpoint_content::CheckpointData,
    inner_temporary_store::PackageStoreWithFallback,
    message_envelope::Message,
    messages_checkpoint::{
        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
        VerifiedCheckpoint,
    },
    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
};
use parking_lot::Mutex;
use tap::{TapFallible, TapOptional};
use tracing::{debug, info, instrument};

use crate::{
    authority::{
        AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
        backpressure::BackpressureManager,
    },
    checkpoints::CheckpointStore,
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    state_accumulator::StateAccumulator,
    transaction_manager::TransactionManager,
};

mod data_ingestion_handler;
pub mod metrics;
pub(crate) mod utils;

#[cfg(test)]
pub(crate) mod tests;

use data_ingestion_handler::{load_checkpoint_data, store_checkpoint_locally};
use metrics::CheckpointExecutorMetrics;
use utils::*;

type CheckpointSummarySender = Box<dyn Fn(&CertifiedCheckpointSummary) + Send + Sync>;
type CheckpointDataSender = Box<dyn Fn(&CheckpointData) + Send + Sync>;

const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

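/// Reason why checkpoint execution stopped for the epoch: either the final
/// checkpoint of the epoch was executed, or the configured `RunWithRange`
/// condition was reached.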
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    EpochComplete,
    RunWithRangeCondition,
}

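/// Data loaded for a checkpoint before execution: the verified checkpoint and
/// its contents, plus the transaction and effects digests, the executable
/// transactions, and their expected effects.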
pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    pub tx_digests: Vec<TransactionDigest>,
    pub fx_digests: Vec<TransactionEffectsDigest>,
    pub transactions: Vec<VerifiedExecutableTransaction>,
    pub effects: Vec<TransactionEffects>,
}

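/// State carried from `execute_checkpoint` to `commit_checkpoint`: the loaded
/// checkpoint data, the effects digests of transactions that have already been
/// executed, and the checkpoint accumulator and optional full `CheckpointData`
/// produced during execution.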
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,
    executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    accumulator: Option<Accumulator>,
    full_data: Option<CheckpointData>,
}

impl CheckpointExecutionState {
    pub fn new(
        data: CheckpointExecutionData,
        executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    ) -> Self {
        Self {
            data,
            executed_fx_digests,
            accumulator: None,
            full_data: None,
        }
    }
}

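/// Executes synced checkpoints for a single epoch: checkpoint transactions are
/// executed with bounded concurrency in a parallel step, and checkpoints are
/// then committed sequentially in order.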
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    tps_estimator: Mutex<TPSEstimator>,
    summary_sender: Option<CheckpointSummarySender>,
    data_sender: Option<CheckpointDataSender>,
}

impl CheckpointExecutor {
    pub fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
        summary_sender: Option<CheckpointSummarySender>,
        data_sender: Option<CheckpointDataSender>,
    ) -> Self {
        Self {
            epoch_store,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            backpressure_manager,
            config,
            metrics,
            tps_estimator: Mutex::new(TPSEstimator::default()),
            summary_sender,
            data_sender,
        }
    }

    pub fn new_for_tests(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            epoch_store,
            checkpoint_store.clone(),
            state,
            accumulator,
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store),
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
            None,
            None,
        )
    }

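    /// Returns the next checkpoint sequence number to schedule for execution,
    /// or `None` if the final checkpoint of the current epoch has already been
    /// executed.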
    fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
        let highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if self.epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
                return None;
            }
        }

        Some(
            highest_executed
                .as_ref()
                .map(|c| c.sequence_number() + 1)
                .unwrap_or_else(|| {
                    assert_eq!(self.epoch_store.epoch(), 0);
                    0
                }),
        )
    }

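    /// Executes all synced checkpoints of the current epoch, optionally
    /// bounded by `run_with_range`, and returns the reason execution stopped.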
    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
        let _metrics_guard = iota_metrics::monitored_scope("CheckpointExecutor::run_epoch");
        info!(?run_with_range, "CheckpointExecutor::run_epoch");
        debug!(
            "Checkpoint executor running for epoch {:?}",
            self.epoch_store.epoch(),
        );

        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
            info!("RunWithRange condition satisfied at {:?}", run_with_range);
            return StopReason::RunWithRangeCondition;
        };

        self.metrics
            .checkpoint_exec_epoch
            .set(self.epoch_store.epoch() as i64);

        let Some(next_to_schedule) = self.get_next_to_schedule() else {
            return StopReason::EpochComplete;
        };

        let this = Arc::new(self);

        let concurrency = std::env::var("IOTA_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(this.config.checkpoint_execution_max_concurrency);

        let final_checkpoint_executed = stream_synced_checkpoints(
            this.checkpoint_store.clone(),
            next_to_schedule,
            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
        )
        .map(|checkpoint| this.clone().execute_checkpoint(checkpoint))
        .buffered(concurrency)
        .map(|ckpt_state| this.clone().commit_checkpoint(ckpt_state))
        .fold(false, |state, is_final_checkpoint| {
            assert!(!state, "Cannot execute checkpoint after epoch end");
            is_final_checkpoint
        })
        .await;

        if final_checkpoint_executed {
            StopReason::EpochComplete
        } else {
            StopReason::RunWithRangeCondition
        }
    }
}

impl CheckpointExecutor {
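    /// Sequential commit step: persists the checkpoint's transaction outputs,
    /// updates watermarks and the running root accumulator, performs
    /// end-of-epoch bookkeeping for the final checkpoint, and broadcasts the
    /// checkpoint. Runs on a blocking task and returns whether this was the
    /// final checkpoint of the epoch.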
    #[instrument(level = "debug", skip_all, fields(seq = ?ckpt_state.data.checkpoint.sequence_number()))]
    async fn commit_checkpoint(self: Arc<Self>, mut ckpt_state: CheckpointExecutionState) -> bool {
        tokio::task::spawn_blocking({
            move || {
                let _sequential_step_guard =
                    iota_metrics::monitored_scope("CheckpointExecutor::sequential_step");

                let tps = self.tps_estimator.lock().update(
                    Instant::now(),
                    ckpt_state.data.checkpoint.network_total_transactions,
                );
                self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

                self.backpressure_manager.update_highest_executed_checkpoint(
                    *ckpt_state.data.checkpoint.sequence_number(),
                );

                let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

                let seq = ckpt_state.data.checkpoint.sequence_number;

                let cache_commit = self.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    self.epoch_store.epoch(),
                    &ckpt_state.data.tx_digests,
                );

                self.epoch_store
                    .handle_finalized_checkpoint(
                        &ckpt_state.data.checkpoint,
                        &ckpt_state.data.tx_digests,
                    )
                    .expect("cannot fail");

                if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
                    let randomness_rounds = self.extract_randomness_rounds(
                        &ckpt_state.data.checkpoint,
                        &ckpt_state.data.checkpoint_contents,
                    );
                    for round in randomness_rounds {
                        debug!(
                            ?round,
                            "notifying RandomnessReporter that randomness update was executed in checkpoint"
                        );
                        randomness_reporter
                            .notify_randomness_in_checkpoint(round)
                            .expect("epoch cannot have ended");
                    }
                }

                if let Some(checkpoint_data) = ckpt_state.full_data.take() {
                    self.commit_index_updates(checkpoint_data);
                }

                self.accumulator
                    .accumulate_running_root(&self.epoch_store, seq, ckpt_state.accumulator)
                    .expect("Failed to accumulate running root");

                if is_final_checkpoint {
                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(
                            self.epoch_store.epoch(),
                            &ckpt_state.data.checkpoint,
                        )
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_epoch(self.epoch_store.clone(), seq)
                        .expect("Accumulating epoch cannot fail");

                    self.checkpoint_store
                        .prune_local_summaries()
                        .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                        .ok();
                }

                fail_point!("crash");

                self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

                self.broadcast_checkpoint(&ckpt_state.data, ckpt_state.full_data.as_ref());

                ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
            }
        })
        .await
        .unwrap()
    }

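    /// Parallel step of checkpoint execution: loads the checkpoint's
    /// transactions and expected effects, schedules the not-yet-executed ones,
    /// awaits their effects, executes the change-epoch transaction for the
    /// final checkpoint, and computes the checkpoint accumulator and optional
    /// full `CheckpointData`.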
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        info!("executing checkpoint");

        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
        self.backpressure_manager
            .update_highest_certified_checkpoint(*checkpoint.sequence_number());

        let sequence_number = checkpoint.sequence_number;
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard =
                iota_metrics::monitored_scope("CheckpointExecutor::wait_for_previous_checkpoints");

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            iota_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        let (mut ckpt_state, unexecuted_tx_digests) = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                let _scope =
                    iota_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
                let ckpt_state = this.load_checkpoint_transactions(checkpoint);
                let unexecuted_tx_digests = this.schedule_transaction_execution(&ckpt_state);
                (ckpt_state, unexecuted_tx_digests)
            }
        })
        .await
        .unwrap();

        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&unexecuted_tx_digests)
            .await;

        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&ckpt_state).await;
        }

        tokio::task::spawn_blocking(move || {
            let _scope = iota_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");
            self.epoch_store
                .insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number)
                .expect("failed to insert finalized transactions");

            if self.state.is_fullnode(&self.epoch_store) {
                self.state.congestion_tracker.process_checkpoint_effects(
                    &*self.transaction_cache_reader,
                    &ckpt_state.data.checkpoint,
                    &ckpt_state.data.effects,
                );
            }

            self.state
                .get_checkpoint_cache()
                .insert_finalized_transactions_perpetual_checkpoints(
                    &ckpt_state.data.tx_digests,
                    self.epoch_store.epoch(),
                    sequence_number,
                );

            ckpt_state.accumulator = Some(
                self.accumulator
                    .accumulate_checkpoint(
                        &ckpt_state.data.effects,
                        sequence_number,
                        &self.epoch_store,
                    )
                    .expect("epoch cannot have ended"),
            );

            ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state);
            ckpt_state
        })
        .await
        .unwrap()
    }

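    /// Whether full `CheckpointData` needs to be produced, i.e. the REST index
    /// or local data ingestion is enabled.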
    fn checkpoint_data_enabled(&self) -> bool {
        self.state.rest_index.is_some() || self.config.data_ingestion_dir.is_some()
    }

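    /// Builds the full `CheckpointData` for the checkpoint (when enabled),
    /// feeds it to the REST index, and optionally stores it locally for data
    /// ingestion.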
    fn process_checkpoint_data(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Option<CheckpointData> {
        if !self.checkpoint_data_enabled() {
            return None;
        }

        let checkpoint_data = load_checkpoint_data(
            &ckpt_state.data,
            &*self.object_cache_reader,
            &*self.transaction_cache_reader,
        )
        .expect("failed to load checkpoint data");

        if self.state.rest_index.is_some() || self.config.data_ingestion_dir.is_some() {
            if let Some(rest_index) = &self.state.rest_index {
                let mut layout_resolver = self.epoch_store.executor().type_layout_resolver(
                    Box::new(PackageStoreWithFallback::new(
                        self.state.get_backing_package_store(),
                        &checkpoint_data,
                    )),
                );

                rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut());
            }

            if let Some(path) = &self.config.data_ingestion_dir {
                store_checkpoint_locally(path, &checkpoint_data)
                    .expect("failed to store checkpoint locally");
            }
        }

        Some(checkpoint_data)
    }

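    /// Loads the checkpoint's transactions and expected effects into a
    /// `CheckpointExecutionState`, preferring the bulk full-checkpoint
    /// contents when available and otherwise falling back to per-digest
    /// lookups in the transaction cache.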
    fn load_checkpoint_transactions(
        &self,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        let seq = checkpoint.sequence_number;
        let epoch = checkpoint.epoch;

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        if let Some(full_contents) = self
            .checkpoint_store
            .get_full_checkpoint_contents_by_sequence_number(seq)
            .expect("Failed to get checkpoint contents from store")
            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
        {
            let num_txns = full_contents.size();
            let mut tx_digests = Vec::with_capacity(num_txns);
            let mut transactions = Vec::with_capacity(num_txns);
            let mut effects = Vec::with_capacity(num_txns);
            let mut fx_digests = Vec::with_capacity(num_txns);

            full_contents
                .into_iter()
                .zip(checkpoint_contents.iter())
                .for_each(|(execution_data, digests)| {
                    let tx_digest = digests.transaction;
                    let fx_digest = digests.effects;
                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                    debug_assert_eq!(fx_digest, execution_data.effects.digest());

                    tx_digests.push(tx_digest);
                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(execution_data.transaction),
                        epoch,
                        seq,
                    ));
                    effects.push(execution_data.effects);
                    fx_digests.push(fx_digest);
                });

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        } else {
            let digests = checkpoint_contents.inner();

            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) =
                digests.iter().map(|d| (d.transaction, d.effects)).unzip();
            let transactions = self
                .transaction_cache_reader
                .multi_get_transaction_blocks(&tx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, tx)| {
                    let tx = tx
                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
                })
                .collect();
            let effects = self
                .transaction_cache_reader
                .multi_get_effects(&fx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, effect)| {
                    effect.unwrap_or_else(|| {
                        fatal!("checkpoint effect not found for {:?}", digests[i])
                    })
                })
                .collect();

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        }
    }

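    /// Enqueues the checkpoint's not-yet-executed transactions (excluding the
    /// end-of-epoch transaction) with their expected effects digests, and
    /// returns their digests so the caller can await execution. Transactions
    /// that were already executed are checked against the expected effects
    /// digests to detect forks.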
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Vec<TransactionDigest> {
        let (unexecuted_tx_digests, unexecuted_txns, unexecuted_effects): (Vec<_>, Vec<_>, Vec<_>) =
            itertools::multiunzip(
                itertools::izip!(
                    ckpt_state.data.transactions.iter(),
                    ckpt_state.data.tx_digests.iter(),
                    ckpt_state.data.fx_digests.iter(),
                    ckpt_state.data.effects.iter(),
                    ckpt_state.executed_fx_digests.iter()
                )
                .filter_map(
                    |(txn, tx_digest, expected_fx_digest, effects, executed_fx_digest)| {
                        if let Some(executed_fx_digest) = executed_fx_digest {
                            assert_not_forked(
                                &ckpt_state.data.checkpoint,
                                tx_digest,
                                expected_fx_digest,
                                executed_fx_digest,
                                &*self.transaction_cache_reader,
                            );
                            None
                        } else if txn.transaction_data().is_end_of_epoch_tx() {
                            None
                        } else {
                            Some((*tx_digest, (txn.clone(), *expected_fx_digest), effects))
                        }
                    },
                ),
            );

        for ((tx, _), effects) in itertools::izip!(unexecuted_txns.iter(), unexecuted_effects) {
            if tx.contains_shared_object() {
                self.epoch_store
                    .acquire_shared_version_assignments_from_effects(
                        tx,
                        effects,
                        &*self.object_cache_reader,
                    )
                    .expect("failed to acquire shared version assignments");
            }
        }

        self.tx_manager
            .enqueue_with_expected_effects_digest(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }

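    /// Executes the end-of-epoch transaction of the final checkpoint: assigns
    /// shared object versions from its expected effects, enqueues it with its
    /// expected effects digest, and waits for it to be executed.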
    async fn execute_change_epoch_tx(&self, ckpt_state: &CheckpointExecutionState) {
        let change_epoch_tx = ckpt_state.data.transactions.last().unwrap();
        let change_epoch_fx = ckpt_state.data.effects.last().unwrap();
        assert_eq!(
            change_epoch_tx.digest(),
            change_epoch_fx.transaction_digest()
        );
        assert!(
            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
            "final txn must be an end of epoch txn"
        );

        self.epoch_store
            .acquire_shared_version_assignments_from_effects(
                change_epoch_tx,
                change_epoch_fx,
                self.object_cache_reader.as_ref(),
            )
            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");

        info!(
            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}",
            change_epoch_tx.digest(),
            change_epoch_fx.digest()
        );
        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
            &self.epoch_store,
        );

        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&[*change_epoch_tx.digest()])
            .await;
    }

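    /// Advances the highest-executed-checkpoint watermark (asserting that
    /// checkpoints are executed strictly in order), prunes full contents of
    /// old checkpoints, and updates execution metrics.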
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
    }

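    /// Sends the certified checkpoint summary and the full `CheckpointData`
    /// to the configured subscribers, if any. The full data is loaded on
    /// demand when it was not already produced.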
    fn broadcast_checkpoint(
        &self,
        checkpoint_exec_data: &CheckpointExecutionData,
        checkpoint_data: Option<&CheckpointData>,
    ) {
        if let Some(summary_sender) = &self.summary_sender {
            let summary =
                CertifiedCheckpointSummary::from(checkpoint_exec_data.checkpoint.clone());
            summary_sender(&summary);
        }
        if let Some(data_sender) = &self.data_sender {
            let checkpoint_data = if let Some(data) = checkpoint_data {
                data.clone()
            } else {
                load_checkpoint_data(
                    checkpoint_exec_data,
                    self.object_cache_reader.as_ref(),
                    self.transaction_cache_reader.as_ref(),
                )
                .expect("Failed to load full CheckpointData")
            };
            data_sender(&checkpoint_data);
        }

        debug!(
            "[Fullnode] Full CheckpointData is available: seq={}",
            checkpoint_exec_data.checkpoint.sequence_number()
        );
    }

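    /// Commits pending REST index updates for the given checkpoint.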
    fn commit_index_updates(&self, checkpoint: CheckpointData) {
        if let Some(rest_index) = &self.state.rest_index {
            rest_index
                .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
                .expect("failed to update rest_indexes");
        }
    }

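    /// Extracts the rounds of any randomness state updates included in the
    /// checkpoint, taken from the checkpoint's version-specific data or, for
    /// protocol versions without it, from a leading `RandomnessStateUpdate`
    /// transaction.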
    fn extract_randomness_rounds(
        &self,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_contents: &CheckpointContents,
    ) -> Vec<RandomnessRound> {
        if let Some(version_specific_data) = checkpoint
            .version_specific_data(self.epoch_store.protocol_config())
            .expect("unable to get version_specific_data")
        {
            version_specific_data.into_v1().randomness_rounds
        } else {
            assert_eq!(
                0,
                self.epoch_store
                    .protocol_config()
                    .min_checkpoint_interval_ms_as_option()
                    .unwrap_or_default(),
            );
            if let Some(first_digest) = checkpoint_contents.inner().first() {
                let maybe_randomness_tx = self
                    .transaction_cache_reader
                    .get_transaction_block(&first_digest.transaction)
                    .unwrap_or_else(|| {
                        fatal!(
                            "state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
                            checkpoint.sequence_number()
                        )
                    });
                if let TransactionKind::RandomnessStateUpdate(rsu) =
                    maybe_randomness_tx.data().transaction_data().kind()
                {
                    vec![rsu.randomness_round]
                } else {
                    Vec::new()
                }
            } else {
                Vec::new()
            }
        }
    }
}