1use std::{sync::Arc, time::Instant};
25
26use futures::StreamExt;
27use iota_common::{debug_fatal, fatal};
28use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
29use iota_macros::fail_point;
30use iota_types::{
31 base_types::{TransactionDigest, TransactionEffectsDigest},
32 crypto::RandomnessRound,
33 effects::{TransactionEffects, TransactionEffectsAPI},
34 executable_transaction::VerifiedExecutableTransaction,
35 full_checkpoint_content::CheckpointData,
36 global_state_hash::GlobalStateHash,
37 messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber, VerifiedCheckpoint},
38 transaction::{TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction},
39};
40use parking_lot::Mutex;
41use tap::{TapFallible, TapOptional};
42use tracing::{debug, info, instrument};
43
44use crate::{
45 authority::{
46 AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
47 backpressure::BackpressureManager,
48 },
49 checkpoint_progress_tracker::CheckpointProgressTracker,
50 checkpoints::CheckpointStore,
51 execution_cache::{ObjectCacheRead, TransactionCacheRead},
52 global_state_hasher::GlobalStateHasher,
53 transaction_manager::TransactionManager,
54};
55
56mod data_ingestion_handler;
57pub mod metrics;
58pub(crate) mod utils;
59
60#[cfg(test)]
61pub(crate) mod tests;
62
63use data_ingestion_handler::{load_checkpoint_data, store_checkpoint_locally};
64use metrics::CheckpointExecutorMetrics;
65use utils::*;
66
/// Callback that is handed a reference to a checkpoint's full [`CheckpointData`].
///
/// Boxed as a trait object so that any `Send + Sync` closure can be stored on
/// the executor.
type CheckpointDataSender = Box<dyn Fn(&CheckpointData) + Send + Sync>;
68
/// Reason why [`CheckpointExecutor::run_epoch`] returned.
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    /// The final checkpoint of the epoch has been executed (either by this run
    /// or already before it started).
    EpochComplete,
    /// Execution stopped because of the `RunWithRange` setting passed to
    /// `run_epoch`, before the final checkpoint of the epoch was executed.
    RunWithRangeCondition,
}
74
/// A checkpoint together with its contents and the digests listed in them.
pub(crate) struct CheckpointExecutionData {
    /// The verified checkpoint being executed.
    pub checkpoint: VerifiedCheckpoint,
    /// Contents looked up in the checkpoint store via the checkpoint's content digest.
    pub checkpoint_contents: CheckpointContents,
    /// Transaction digests, in the order they appear in `checkpoint_contents`.
    pub tx_digests: Vec<TransactionDigest>,
    /// Effects digests, index-aligned with `tx_digests`.
    pub fx_digests: Vec<TransactionEffectsDigest>,
}
81
/// Transactions and effects loaded for a checkpoint; the three vectors are
/// built in the same order as the checkpoint's transaction digests.
pub(crate) struct CheckpointTransactionData {
    /// The checkpoint's transactions, wrapped as executable-from-checkpoint.
    pub transactions: Vec<VerifiedExecutableTransaction>,
    /// The effects recorded for each transaction in the checkpoint.
    pub effects: Vec<TransactionEffects>,
    /// Result of looking up locally executed effects digests for each
    /// transaction; `Some` entries are treated as already executed and are
    /// not scheduled again.
    pub executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
}
87
/// Per-checkpoint state carried through the execution pipeline.
pub(crate) struct CheckpointExecutionState {
    /// The checkpoint, its contents and digests.
    pub data: CheckpointExecutionData,
    /// Global state hash for this checkpoint, once known. Later passed to
    /// `GlobalStateHasher::accumulate_running_root`.
    state_hash: Option<GlobalStateHash>,
    /// Full checkpoint data, populated only when checkpoint data processing is
    /// enabled on this node.
    full_data: Option<CheckpointData>,
}
93
94impl CheckpointExecutionState {
95 pub fn new(data: CheckpointExecutionData) -> Self {
96 Self {
97 data,
98 state_hash: None,
99 full_data: None,
100 }
101 }
102
103 pub fn new_with_global_state_hash(
104 data: CheckpointExecutionData,
105 hash: GlobalStateHash,
106 ) -> Self {
107 Self {
108 data,
109 state_hash: Some(hash),
110 full_data: None,
111 }
112 }
113}
114
/// Shorthand for `$handle.finish_stage(PipelineStage::$stage).await;`.
///
/// The expansion contains `.await`, so it can only be used inside an `async`
/// context. `$stage` is the bare variant name of `PipelineStage`.
macro_rules! finish_stage {
    ($handle:expr, $stage:ident) => {
        $handle.finish_stage(PipelineStage::$stage).await;
    };
}
120
/// Executes (or verifies) synced checkpoints for a single epoch.
///
/// Consumed by [`CheckpointExecutor::run_epoch`], which takes `self` by value.
pub struct CheckpointExecutor {
    /// Store of the epoch this executor runs for.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Authority state; the cache readers and transaction manager below are
    /// obtained from it in `new`.
    state: Arc<AuthorityState>,
    /// Source of checkpoints/contents and holder of the highest-executed watermark.
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    /// Receives the not-yet-executed transactions of each checkpoint.
    tx_manager: Arc<TransactionManager>,
    global_state_hasher: Arc<GlobalStateHasher>,
    /// Informed of the highest certified and highest executed checkpoint.
    backpressure_manager: Arc<BackpressureManager>,
    /// Supplies `checkpoint_execution_max_concurrency` and `data_ingestion_dir`.
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    /// Updated after each checkpoint to feed the `checkpoint_exec_sync_tps` metric.
    tps_estimator: Mutex<TPSEstimator>,
    /// If set, receives the elapsed execution time of every checkpoint.
    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
    /// If set, called with the full `CheckpointData` of every executed checkpoint.
    data_sender: Option<CheckpointDataSender>,
}
136
137impl CheckpointExecutor {
138 pub fn new(
139 epoch_store: Arc<AuthorityPerEpochStore>,
140 checkpoint_store: Arc<CheckpointStore>,
141 state: Arc<AuthorityState>,
142 global_state_hasher: Arc<GlobalStateHasher>,
143 backpressure_manager: Arc<BackpressureManager>,
144 config: CheckpointExecutorConfig,
145 metrics: Arc<CheckpointExecutorMetrics>,
146 data_sender: Option<CheckpointDataSender>,
147 checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
148 ) -> Self {
149 Self {
150 epoch_store,
151 state: state.clone(),
152 checkpoint_store,
153 object_cache_reader: state.get_object_cache_reader().clone(),
154 transaction_cache_reader: state.get_transaction_cache_reader().clone(),
155 tx_manager: state.transaction_manager().clone(),
156 global_state_hasher,
157 backpressure_manager,
158 config,
159 metrics,
160 tps_estimator: Mutex::new(TPSEstimator::default()),
161 checkpoint_progress_tracker,
162 data_sender,
163 }
164 }
165
166 pub fn new_for_tests(
167 epoch_store: Arc<AuthorityPerEpochStore>,
168 checkpoint_store: Arc<CheckpointStore>,
169 state: Arc<AuthorityState>,
170 global_state_hasher: Arc<GlobalStateHasher>,
171 ) -> Self {
172 Self::new(
173 epoch_store,
174 checkpoint_store.clone(),
175 state,
176 global_state_hasher,
177 BackpressureManager::new_from_checkpoint_store(&checkpoint_store),
178 Default::default(),
179 CheckpointExecutorMetrics::new_for_tests(),
180 None, None, )
183 }
184
185 fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
188 let highest_executed = self
192 .checkpoint_store
193 .get_highest_executed_checkpoint()
194 .unwrap();
195
196 if let Some(highest_executed) = &highest_executed {
197 if self.epoch_store.epoch() == highest_executed.epoch()
198 && highest_executed.is_last_checkpoint_of_epoch()
199 {
200 info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
203 return None;
204 }
205 }
206
207 Some(
208 highest_executed
209 .as_ref()
210 .map(|c| c.sequence_number() + 1)
211 .unwrap_or_else(|| {
212 assert_eq!(self.epoch_store.epoch(), 0);
214 0
216 }),
217 )
218 }
219
220 #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
224 pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
225 let _metrics_scope = iota_metrics::monitored_scope("CheckpointExecutor::run_epoch");
226 info!(?run_with_range, "CheckpointExecutor::run_epoch");
227 debug!(
228 "Checkpoint executor running for epoch {:?}",
229 self.epoch_store.epoch(),
230 );
231
232 if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
236 info!("RunWithRange condition satisfied at {:?}", run_with_range,);
237 return StopReason::RunWithRangeCondition;
238 };
239
240 self.metrics
241 .checkpoint_exec_epoch
242 .set(self.epoch_store.epoch() as i64);
243
244 let Some(next_to_schedule) = self.get_next_to_schedule() else {
245 return StopReason::EpochComplete;
246 };
247
248 let this = Arc::new(self);
249
250 let concurrency = std::env::var("IOTA_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
251 .ok()
252 .and_then(|s| s.parse().ok())
253 .unwrap_or(this.config.checkpoint_execution_max_concurrency);
254
255 let pipeline_stages = PipelineStages::new(next_to_schedule, this.metrics.clone());
256
257 let final_checkpoint_executed = stream_synced_checkpoints(
258 this.checkpoint_store.clone(),
259 next_to_schedule,
260 run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
261 )
262 .map(|checkpoint| {
264 let this = this.clone();
265 let pipeline_handle = pipeline_stages.handle(*checkpoint.sequence_number());
266 async move {
267 let pipeline_handle = pipeline_handle.await;
268 tokio::spawn(this.execute_checkpoint(checkpoint, pipeline_handle))
269 .await
270 .unwrap()
271 }
272 })
273 .buffered(concurrency)
274 .fold(false, |state, is_final_checkpoint| async move {
276 assert!(!state, "Cannot execute checkpoint after epoch end");
277 is_final_checkpoint
278 })
279 .await;
280
281 if final_checkpoint_executed {
282 StopReason::EpochComplete
283 } else {
284 StopReason::RunWithRangeCondition
285 }
286 }
287}
288
289impl CheckpointExecutor {
    /// Runs one checkpoint through all execution steps and returns whether it
    /// was the last checkpoint of the epoch.
    ///
    /// Steps, in order: execute or verify the checkpoint's transactions, build
    /// and commit the DB batch of transaction outputs, finalize the checkpoint
    /// in the epoch store, commit index updates, accumulate the running root
    /// state hash, and bump the highest-executed watermark. Each
    /// `finish_stage!` call awaits the corresponding stage on `pipeline_handle`.
    ///
    /// # Panics
    ///
    /// Panics if any of the underlying store operations marked with
    /// `expect`/`unwrap` fails, or if the blocking commit task panics.
    #[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
        mut pipeline_handle: PipelineHandle,
    ) -> bool {
        debug!("executing checkpoint");
        let sequence_number = checkpoint.sequence_number;

        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
        self.backpressure_manager
            .update_highest_certified_checkpoint(sequence_number);

        // The last checkpoint of an epoch waits until the preceding checkpoint
        // has been marked as executed before it starts.
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard =
                iota_metrics::monitored_scope("CheckpointExecutor::wait_for_previous_checkpoints");

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            iota_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        let exec_start = Instant::now();
        // Fullnodes always execute the synced transactions. Other nodes do so
        // only for the last checkpoint of the epoch; otherwise they take the
        // verification path, which asserts it is never given that checkpoint.
        let ckpt_state = if self.state.is_fullnode(&self.epoch_store)
            || checkpoint.is_last_checkpoint_of_epoch()
        {
            self.execute_transactions_from_synced_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        } else {
            self.verify_locally_built_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        };

        let tps = self.tps_estimator.lock().update(
            Instant::now(),
            ckpt_state.data.checkpoint.network_total_transactions,
        );
        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

        self.backpressure_manager
            .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());

        let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

        let seq = ckpt_state.data.checkpoint.sequence_number;

        let batch = self
            .state
            .get_cache_commit()
            .build_db_batch(self.epoch_store.epoch(), &ckpt_state.data.tx_digests);

        finish_stage!(pipeline_handle, BuildDbBatch);

        // The commit runs via `spawn_blocking`; `ckpt_state` is moved into the
        // closure and handed back as its result.
        let mut ckpt_state = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                let cache_commit = this.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    this.epoch_store.epoch(),
                    batch,
                    &ckpt_state.data.tx_digests,
                );
                ckpt_state
            }
        })
        .await
        .unwrap();

        finish_stage!(pipeline_handle, CommitTransactionOutputs);

        self.epoch_store
            .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
            .expect("cannot fail");

        let randomness_rounds = self.extract_randomness_rounds(
            &ckpt_state.data.checkpoint,
            &ckpt_state.data.checkpoint_contents,
        );

        // On fullnodes, drop the shared version assignments for this
        // checkpoint, keyed by randomness round and by transaction digest.
        if self.state.is_fullnode(&self.epoch_store) {
            let epoch = ckpt_state.data.checkpoint.epoch;
            self.epoch_store.remove_shared_version_assignments(
                randomness_rounds
                    .iter()
                    .map(|round| TransactionKey::RandomnessRound(epoch, *round)),
            );

            self.epoch_store.remove_shared_version_assignments(
                ckpt_state
                    .data
                    .tx_digests
                    .iter()
                    .copied()
                    .map(TransactionKey::Digest),
            );
        }

        // A randomness reporter is not always present; when it is, tell it
        // about every round found in this checkpoint.
        if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
            for round in randomness_rounds {
                debug!(
                    ?round,
                    "notifying RandomnessReporter that randomness update was executed in checkpoint"
                );
                randomness_reporter
                    .notify_randomness_in_checkpoint(round)
                    .expect("epoch cannot have ended");
            }
        }

        finish_stage!(pipeline_handle, FinalizeCheckpoint);

        // NOTE(review): `take()` leaves `full_data` as `None`, so the
        // `full_data.as_ref()` passed to `broadcast_checkpoint` further down is
        // always `None` and the broadcast path reloads the checkpoint data
        // whenever a data sender is configured. Confirm whether this is
        // intended; `commit_index_updates` only reads the sequence number.
        if let Some(checkpoint_data) = ckpt_state.full_data.take() {
            self.commit_index_updates(checkpoint_data);
        }

        finish_stage!(pipeline_handle, UpdateRpcIndex);

        self.global_state_hasher
            .accumulate_running_root(&self.epoch_store, seq, ckpt_state.state_hash)
            .expect("Failed to accumulate running root");

        if is_final_checkpoint {
            self.checkpoint_store
                .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
                .expect("Failed to insert epoch last checkpoint");

            self.global_state_hasher
                .accumulate_epoch(self.epoch_store.clone(), seq)
                .expect("Accumulating epoch cannot fail");

            // Pruning is best-effort: a failure is reported via `debug_fatal!`
            // and the error is then discarded.
            self.checkpoint_store
                .prune_local_summaries()
                .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                .ok();
        }

        fail_point!("crash");

        self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

        self.broadcast_checkpoint(&ckpt_state.data, ckpt_state.full_data.as_ref());

        finish_stage!(pipeline_handle, BumpHighestExecutedCheckpoint);

        if let Some(tracker) = &self.checkpoint_progress_tracker {
            tracker.add_execution_time(exec_start.elapsed());
        }

        ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
    }
464
465 #[instrument(level = "info", skip_all)]
468 async fn verify_locally_built_checkpoint(
469 &self,
470 checkpoint: VerifiedCheckpoint,
471 pipeline_handle: &mut PipelineHandle,
472 ) -> CheckpointExecutionState {
473 assert!(
474 !checkpoint.is_last_checkpoint_of_epoch(),
475 "only fullnode path has end-of-epoch logic"
476 );
477
478 let sequence_number = checkpoint.sequence_number;
479 let locally_built_checkpoint = self
480 .checkpoint_store
481 .get_locally_computed_checkpoint(sequence_number)
482 .expect("db error");
483
484 let Some(locally_built_checkpoint) = locally_built_checkpoint else {
485 return self
487 .execute_transactions_from_synced_checkpoint(checkpoint, pipeline_handle)
488 .await;
489 };
490
491 self.metrics.checkpoint_executor_validator_path.inc();
492
493 assert_checkpoint_not_forked(
495 &locally_built_checkpoint,
496 &checkpoint,
497 &self.checkpoint_store,
498 );
499
500 let state_hash = {
503 let _metrics_scope =
504 iota_metrics::monitored_scope("CheckpointExecutor::notify_read_state_hash");
505 self.epoch_store
506 .notify_read_checkpoint_state_hasher(&[sequence_number])
507 .await
508 .unwrap()
509 .pop()
510 .unwrap()
511 };
512
513 let checkpoint_contents = self
517 .checkpoint_store
518 .get_checkpoint_contents(&checkpoint.content_digest)
519 .expect("db error")
520 .expect("checkpoint contents not found");
521
522 let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = checkpoint_contents
523 .iter()
524 .map(|digests| (digests.transaction, digests.effects))
525 .unzip();
526
527 pipeline_handle
528 .skip_to(PipelineStage::FinalizeTransactions)
529 .await;
530
531 self.insert_finalized_transactions(&tx_digests, sequence_number, checkpoint.timestamp_ms);
535
536 pipeline_handle.skip_to(PipelineStage::BuildDbBatch).await;
537
538 CheckpointExecutionState::new_with_global_state_hash(
539 CheckpointExecutionData {
540 checkpoint,
541 checkpoint_contents,
542 tx_digests,
543 fx_digests,
544 },
545 state_hash,
546 )
547 }
548
    /// Loads the checkpoint's transactions, schedules the ones that have not
    /// been executed yet, waits for them, and finalizes the result.
    ///
    /// Returns the checkpoint state with its state hash set and, when
    /// checkpoint data processing is enabled, its full checkpoint data.
    ///
    /// # Panics
    ///
    /// Panics if inserting finalized transactions, accumulating the checkpoint
    /// hash, or loading checkpoint data fails.
    #[instrument(level = "info", skip_all)]
    async fn execute_transactions_from_synced_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        let sequence_number = checkpoint.sequence_number;

        // The metrics scope covers loading and scheduling only.
        let (mut ckpt_state, tx_data, unexecuted_tx_digests) = {
            let _scope = iota_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
            let (ckpt_state, tx_data) = self.load_checkpoint_transactions(checkpoint);
            let unexecuted_tx_digests = self.schedule_transaction_execution(&ckpt_state, &tx_data);
            (ckpt_state, tx_data, unexecuted_tx_digests)
        };

        finish_stage!(pipeline_handle, ExecuteTransactions);

        // Wait until every transaction scheduled above has executed effects.
        {
            let _metrics_scope = iota_metrics::monitored_scope(
                "CheckpointExecutor::notify_read_executed_effects_digests",
            );
            self.transaction_cache_reader
                .notify_read_executed_effects_digests(&unexecuted_tx_digests)
                .await;
        }

        finish_stage!(pipeline_handle, WaitForTransactions);

        // End-of-epoch transactions are skipped by the scheduler; the one in
        // the last checkpoint of the epoch is executed here, after the wait.
        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&tx_data).await;
        }

        let _scope = iota_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");

        if self.state.is_fullnode(&self.epoch_store) {
            self.state.congestion_tracker.process_checkpoint_effects(
                &*self.transaction_cache_reader,
                &ckpt_state.data.checkpoint,
                &tx_data.effects,
            );
        }

        self.insert_finalized_transactions(
            &ckpt_state.data.tx_digests,
            sequence_number,
            ckpt_state.data.checkpoint.timestamp_ms,
        );

        ckpt_state.state_hash = Some(
            self.global_state_hasher
                .accumulate_checkpoint(&tx_data.effects, sequence_number, &self.epoch_store)
                .expect("epoch cannot have ended"),
        );

        finish_stage!(pipeline_handle, FinalizeTransactions);

        // `None` when checkpoint data processing is disabled on this node.
        ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state.data, &tx_data);

        finish_stage!(pipeline_handle, ProcessCheckpointData);

        ckpt_state
    }
614
615 fn checkpoint_data_enabled(&self) -> bool {
616 self.state.grpc_indexes_store.is_some()
617 || self.config.data_ingestion_dir.is_some()
618 || self.data_sender.is_some()
619 }
620
621 fn insert_finalized_transactions(
622 &self,
623 tx_digests: &[TransactionDigest],
624 sequence_number: CheckpointSequenceNumber,
625 timestamp_ms: u64,
626 ) {
627 self.epoch_store
628 .insert_finalized_transactions(tx_digests, sequence_number, timestamp_ms)
629 .expect("failed to insert finalized transactions");
630
631 if self.state.is_fullnode(&self.epoch_store) {
632 self.state
634 .get_checkpoint_cache()
635 .insert_finalized_transactions_perpetual_checkpoints(
636 tx_digests,
637 self.epoch_store.epoch(),
638 sequence_number,
639 );
640 }
641 }
642
643 #[instrument(level = "info", skip_all)]
644 fn process_checkpoint_data(
645 &self,
646 ckpt_data: &CheckpointExecutionData,
647 tx_data: &CheckpointTransactionData,
648 ) -> Option<CheckpointData> {
649 if !self.checkpoint_data_enabled() {
650 return None;
651 }
652
653 let checkpoint_data = load_checkpoint_data(
654 ckpt_data,
655 tx_data,
656 self.state.get_object_store(),
657 &*self.transaction_cache_reader,
658 )
659 .expect("failed to load checkpoint data");
660
661 if let Some(grpc_indexes_store) = &self.state.grpc_indexes_store {
665 grpc_indexes_store.index_checkpoint(&checkpoint_data);
666 }
667
668 if let Some(path) = &self.config.data_ingestion_dir {
669 store_checkpoint_locally(path, &checkpoint_data)
670 .expect("failed to store checkpoint locally");
671 }
672
673 Some(checkpoint_data)
674 }
675
676 #[instrument(level = "info", skip_all)]
678 fn load_checkpoint_transactions(
679 &self,
680 checkpoint: VerifiedCheckpoint,
681 ) -> (CheckpointExecutionState, CheckpointTransactionData) {
682 let seq = checkpoint.sequence_number;
683 let epoch = checkpoint.epoch;
684
685 let checkpoint_contents = self
686 .checkpoint_store
687 .get_checkpoint_contents(&checkpoint.content_digest)
688 .expect("db error")
689 .expect("checkpoint contents not found");
690
691 if let Some(full_contents) = self
693 .checkpoint_store
694 .get_full_checkpoint_contents_by_sequence_number(seq)
695 .expect("Failed to get checkpoint contents from store")
696 .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
697 {
698 let num_txns = full_contents.size();
699 let mut tx_digests = Vec::with_capacity(num_txns);
700 let mut transactions = Vec::with_capacity(num_txns);
701 let mut effects = Vec::with_capacity(num_txns);
702 let mut fx_digests = Vec::with_capacity(num_txns);
703
704 full_contents
705 .into_iter()
706 .zip(checkpoint_contents.iter())
707 .for_each(|(execution_data, digests)| {
708 let tx_digest = digests.transaction;
709 let fx_digest = digests.effects;
710 debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
711 debug_assert_eq!(fx_digest, execution_data.effects.digest());
712
713 tx_digests.push(tx_digest);
714 transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
715 VerifiedTransaction::new_unchecked(execution_data.transaction),
716 epoch,
717 seq,
718 ));
719 effects.push(execution_data.effects);
720 fx_digests.push(fx_digest);
721 });
722
723 let executed_fx_digests = self
724 .transaction_cache_reader
725 .multi_get_executed_effects_digests(&tx_digests);
726
727 (
728 CheckpointExecutionState::new(CheckpointExecutionData {
729 checkpoint,
730 checkpoint_contents,
731 tx_digests,
732 fx_digests,
733 }),
734 CheckpointTransactionData {
735 transactions,
736 effects,
737 executed_fx_digests,
738 },
739 )
740 } else {
741 let digests = checkpoint_contents.inner();
744
745 let (tx_digests, fx_digests): (Vec<_>, Vec<_>) =
746 digests.iter().map(|d| (d.transaction, d.effects)).unzip();
747 let transactions = self
748 .transaction_cache_reader
749 .multi_get_transaction_blocks(&tx_digests)
750 .into_iter()
751 .enumerate()
752 .map(|(i, tx)| {
753 let tx = tx
754 .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
755 let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
756 VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
757 })
758 .collect();
759 let effects = self
760 .transaction_cache_reader
761 .multi_get_effects(&fx_digests)
762 .into_iter()
763 .enumerate()
764 .map(|(i, effect)| {
765 effect.unwrap_or_else(|| {
766 fatal!("checkpoint effect not found for {:?}", digests[i])
767 })
768 })
769 .collect();
770
771 let executed_fx_digests = self
772 .transaction_cache_reader
773 .multi_get_executed_effects_digests(&tx_digests);
774
775 (
776 CheckpointExecutionState::new(CheckpointExecutionData {
777 checkpoint,
778 checkpoint_contents,
779 tx_digests,
780 fx_digests,
781 }),
782 CheckpointTransactionData {
783 transactions,
784 effects,
785 executed_fx_digests,
786 },
787 )
788 }
789 }
790
    /// Enqueues every transaction of the checkpoint that still needs to be
    /// executed and returns the digests of the enqueued transactions.
    ///
    /// Transactions that already have executed effects are only fork-checked
    /// against the checkpoint's expected effects digest. End-of-epoch
    /// transactions are skipped here; they are handled by
    /// `execute_change_epoch_tx`.
    ///
    /// # Panics
    ///
    /// Panics if shared version assignments cannot be acquired.
    #[instrument(level = "info", skip_all)]
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
        tx_data: &CheckpointTransactionData,
    ) -> Vec<TransactionDigest> {
        // Walk the five index-aligned sequences together and keep only the
        // entries that must be executed, split into three parallel vectors.
        let (unexecuted_tx_digests, unexecuted_txns, unexecuted_effects): (Vec<_>, Vec<_>, Vec<_>) =
            itertools::multiunzip(
                itertools::izip!(
                    tx_data.transactions.iter(),
                    ckpt_state.data.tx_digests.iter(),
                    ckpt_state.data.fx_digests.iter(),
                    tx_data.effects.iter(),
                    tx_data.executed_fx_digests.iter()
                )
                .filter_map(
                    |(txn, tx_digest, expected_fx_digest, effects, executed_fx_digest)| {
                        if let Some(executed_fx_digest) = executed_fx_digest {
                            // Already executed locally: compare against the
                            // checkpoint's expected digest, do not reschedule.
                            assert_not_forked(
                                &ckpt_state.data.checkpoint,
                                tx_digest,
                                expected_fx_digest,
                                executed_fx_digest,
                                &*self.transaction_cache_reader,
                            );
                            None
                        } else if txn.transaction_data().is_end_of_epoch_tx() {
                            None
                        } else {
                            Some((tx_digest, (txn.clone(), *expected_fx_digest), effects))
                        }
                    },
                ),
            );

        // Transactions touching shared objects get their version assignments
        // from the checkpoint's effects before being enqueued.
        for ((tx, _), effects) in itertools::izip!(unexecuted_txns.iter(), unexecuted_effects) {
            if tx.contains_shared_object() {
                self.epoch_store
                    .acquire_shared_version_assignments_from_effects(
                        tx,
                        effects,
                        &*self.object_cache_reader,
                    )
                    .expect("failed to acquire shared version assignments");
            }
        }

        self.tx_manager
            .enqueue_with_expected_effects_digest(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }
846
847 #[instrument(level = "error", skip_all)]
849 async fn execute_change_epoch_tx(&self, tx_data: &CheckpointTransactionData) {
850 let change_epoch_tx = tx_data.transactions.last().unwrap();
851 let change_epoch_fx = tx_data.effects.last().unwrap();
852 assert_eq!(
853 change_epoch_tx.digest(),
854 change_epoch_fx.transaction_digest()
855 );
856 assert!(
857 change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
858 "final txn must be an end of epoch txn"
859 );
860
861 self.epoch_store
880 .acquire_shared_version_assignments_from_effects(
881 change_epoch_tx,
882 change_epoch_fx,
883 self.object_cache_reader.as_ref(),
884 )
885 .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
886
887 info!(
888 "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}",
889 change_epoch_tx.digest(),
890 change_epoch_fx.digest()
891 );
892 self.tx_manager.enqueue_with_expected_effects_digest(
893 vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
894 &self.epoch_store,
895 );
896
897 self.transaction_cache_reader
898 .notify_read_executed_effects_digests(&[*change_epoch_tx.digest()])
899 .await;
900 }
901
902 #[instrument(level = "debug", skip_all)]
905 fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
906 let seq = *checkpoint.sequence_number();
908 debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
909 if let Some(prev_highest) = self
910 .checkpoint_store
911 .get_highest_executed_checkpoint_seq_number()
912 .unwrap()
913 {
914 assert_eq!(prev_highest + 1, seq);
915 } else {
916 assert_eq!(seq, 0);
917 }
918 fail_point!("highest-executed-checkpoint");
919
920 const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
923 if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
924 let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
925 if let Some(prune_checkpoint) = self
926 .checkpoint_store
927 .get_checkpoint_by_sequence_number(prune_seq)
928 .expect("Failed to fetch checkpoint")
929 {
930 self.checkpoint_store
931 .delete_full_checkpoint_contents(prune_seq)
932 .expect("Failed to delete full checkpoint contents");
933 self.checkpoint_store
934 .delete_contents_digest_sequence_number_mapping(
935 &prune_checkpoint.content_digest,
936 )
937 .expect("Failed to delete contents digest -> sequence number mapping");
938 } else {
939 debug!(
943 "Failed to fetch checkpoint with sequence number {:?}",
944 prune_seq
945 );
946 }
947 }
948
949 self.checkpoint_store
950 .update_highest_executed_checkpoint(checkpoint)
951 .unwrap();
952 self.metrics.last_executed_checkpoint.set(seq as i64);
953
954 self.metrics
955 .last_executed_checkpoint_timestamp_ms
956 .set(checkpoint.timestamp_ms as i64);
957 checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
958 }
959
960 fn broadcast_checkpoint(
963 &self,
964 checkpoint_exec_data: &CheckpointExecutionData,
965 checkpoint_data: Option<&CheckpointData>,
966 ) {
967 if let Some(data_sender) = &self.data_sender {
968 let checkpoint_data = if let Some(data) = checkpoint_data {
969 data.clone()
970 } else {
971 let (_, tx_data) =
974 self.load_checkpoint_transactions(checkpoint_exec_data.checkpoint.clone());
975 load_checkpoint_data(
976 checkpoint_exec_data,
977 &tx_data,
978 self.state.get_object_store(),
979 self.transaction_cache_reader.as_ref(),
980 )
981 .expect("Failed to load full CheckpointData")
982 };
983 data_sender(&checkpoint_data);
984 }
985
986 debug!(
987 "[Fullnode] Full CheckpointData is available: seq={}",
988 checkpoint_exec_data.checkpoint.sequence_number()
989 );
990 }
991
992 #[instrument(level = "info", skip_all)]
995 fn commit_index_updates(&self, checkpoint: CheckpointData) {
996 if let Some(grpc_indexes_store) = &self.state.grpc_indexes_store {
997 grpc_indexes_store
998 .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
999 .expect("failed to update gRPC indexes");
1000 }
1001 }
1002
1003 #[instrument(level = "debug", skip_all)]
1007 fn extract_randomness_rounds(
1008 &self,
1009 checkpoint: &VerifiedCheckpoint,
1010 checkpoint_contents: &CheckpointContents,
1011 ) -> Vec<RandomnessRound> {
1012 if let Some(version_specific_data) = checkpoint
1013 .version_specific_data(self.epoch_store.protocol_config())
1014 .expect("unable to get version_specific_data")
1015 {
1016 version_specific_data.into_v1().randomness_rounds
1019 } else {
1020 assert_eq!(
1025 0,
1026 self.epoch_store
1027 .protocol_config()
1028 .min_checkpoint_interval_ms_as_option()
1029 .unwrap_or_default(),
1030 );
1031 if let Some(first_digest) = checkpoint_contents.inner().first() {
1032 let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
1033 .unwrap_or_else(||
1034 fatal!(
1035 "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
1036 checkpoint.sequence_number()
1037 )
1038 );
1039 if let TransactionKind::RandomnessStateUpdate(rsu) =
1040 maybe_randomness_tx.data().transaction_data().kind()
1041 {
1042 vec![rsu.randomness_round]
1043 } else {
1044 Vec::new()
1045 }
1046 } else {
1047 Vec::new()
1048 }
1049 }
1050 }
1051}