Skip to main content

iota_core/checkpoints/checkpoint_executor/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! CheckpointExecutor is a Node component that executes all checkpoints for the
6//! given epoch. It acts as a Consumer to StateSync
7//! for newly synced checkpoints, taking these checkpoints and
8//! scheduling and monitoring their execution. Its primary goal is to allow
9//! for catching up to the current checkpoint sequence number of the network
10//! as quickly as possible so that a newly joined, or recovering Node can
11//! participate in a timely manner. To that end, CheckpointExecutor attempts
//! to saturate the CPU with executor tasks (one per checkpoint), each of which
//! handles scheduling and awaiting checkpoint transaction execution.
14//!
15//! CheckpointExecutor is made recoverable in the event of Node shutdown by way
16//! of a watermark, highest_executed_checkpoint, which is guaranteed to be
17//! updated sequentially in order, despite checkpoints themselves potentially
18//! being executed nonsequentially and in parallel. CheckpointExecutor
19//! parallelizes checkpoints of the same epoch as much as possible.
//! CheckpointExecutor enforces the invariant that if `run_epoch` returns
//! `StopReason::EpochComplete`, we have reached the end of the epoch. This
//! allows us to use it as a signal for reconfig.
23
24use std::{sync::Arc, time::Instant};
25
26use futures::StreamExt;
27use iota_common::{debug_fatal, fatal};
28use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
29use iota_macros::fail_point;
30use iota_types::{
31    base_types::{TransactionDigest, TransactionEffectsDigest},
32    crypto::RandomnessRound,
33    effects::{TransactionEffects, TransactionEffectsAPI},
34    executable_transaction::VerifiedExecutableTransaction,
35    full_checkpoint_content::CheckpointData,
36    global_state_hash::GlobalStateHash,
37    messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber, VerifiedCheckpoint},
38    transaction::{TransactionDataAPI, TransactionKey, TransactionKind, VerifiedTransaction},
39};
40use parking_lot::Mutex;
41use tap::{TapFallible, TapOptional};
42use tracing::{debug, info, instrument};
43
44use crate::{
45    authority::{
46        AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
47        backpressure::BackpressureManager,
48    },
49    checkpoint_progress_tracker::CheckpointProgressTracker,
50    checkpoints::CheckpointStore,
51    execution_cache::{ObjectCacheRead, TransactionCacheRead},
52    global_state_hasher::GlobalStateHasher,
53    transaction_manager::TransactionManager,
54};
55
56mod data_ingestion_handler;
57pub mod metrics;
58pub(crate) mod utils;
59
60#[cfg(test)]
61pub(crate) mod tests;
62
63use data_ingestion_handler::{load_checkpoint_data, store_checkpoint_locally};
64use metrics::CheckpointExecutorMetrics;
65use utils::*;
66
/// Callback invoked with a reference to a checkpoint's full `CheckpointData`.
/// The `Send + Sync` bounds let the boxed callback be shared across threads.
/// Stored in `CheckpointExecutor::data_sender`; configuring one is enough to
/// make `checkpoint_data_enabled` return `true`.
type CheckpointDataSender = Box<dyn Fn(&CheckpointData) + Send + Sync>;
68
/// Why `CheckpointExecutor::run_epoch` stopped.
#[derive(Debug, PartialEq, Eq)]
pub enum StopReason {
    /// The final checkpoint of the epoch has been executed (either during
    /// this run or before it started).
    EpochComplete,
    /// Execution stopped because the configured `RunWithRange` bound was hit
    /// before the end of the epoch.
    RunWithRangeCondition,
}
74
/// A checkpoint being executed: its verified summary, its contents, and the
/// transaction / effects digests listed in those contents.
pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    /// Transaction digests, in the order they appear in `checkpoint_contents`.
    pub tx_digests: Vec<TransactionDigest>,
    /// Expected effects digests from `checkpoint_contents`; entry `i` belongs
    /// to `tx_digests[i]`.
    pub fx_digests: Vec<TransactionEffectsDigest>,
}
81
/// Transactions and effects loaded for a checkpoint. Every vector is indexed in
/// the same order as the digests in the matching `CheckpointExecutionData`.
pub(crate) struct CheckpointTransactionData {
    pub transactions: Vec<VerifiedExecutableTransaction>,
    /// Effects referenced by the checkpoint (the expected effects).
    pub effects: Vec<TransactionEffects>,
    /// Effects digest produced by local execution of each transaction, or
    /// `None` if the transaction has not been executed locally yet.
    pub executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
}
87
/// Per-checkpoint state carried through the execution pipeline.
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,
    // Global state hash of the checkpoint. On the locally-built (validator)
    // path it is read from the checkpoint builder's result; on the synced
    // path it is computed after the checkpoint's transactions are executed.
    state_hash: Option<GlobalStateHash>,
    // Full checkpoint data; only populated when checkpoint data production is
    // enabled (see `checkpoint_data_enabled`).
    full_data: Option<CheckpointData>,
}
93
94impl CheckpointExecutionState {
95    pub fn new(data: CheckpointExecutionData) -> Self {
96        Self {
97            data,
98            state_hash: None,
99            full_data: None,
100        }
101    }
102
103    pub fn new_with_global_state_hash(
104        data: CheckpointExecutionData,
105        hash: GlobalStateHash,
106    ) -> Self {
107        Self {
108            data,
109            state_hash: Some(hash),
110            full_data: None,
111        }
112    }
113}
114
/// Awaits `$pipeline_handle.finish_stage(PipelineStage::$stage_name)`, marking
/// the named pipeline stage as finished for the handle's checkpoint. Must be
/// used inside an `async` context.
macro_rules! finish_stage {
    ($pipeline_handle:expr, $stage_name:ident) => {{
        $pipeline_handle
            .finish_stage(PipelineStage::$stage_name)
            .await;
    }};
}
120
/// Executes the checkpoints of the epoch described by `epoch_store`; see the
/// module documentation for the overall design.
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    // The two cache readers and the transaction manager are obtained from
    // `state` in `CheckpointExecutor::new`.
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    global_state_hasher: Arc<GlobalStateHasher>,
    // Told about the highest certified and highest executed checkpoint
    // sequence numbers as checkpoints are processed.
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    // Produces the TPS value reported via `checkpoint_exec_sync_tps`.
    tps_estimator: Mutex<TPSEstimator>,
    // Optional; when present it is given the wall-clock time spent executing
    // each checkpoint.
    checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
    // Optional checkpoint data callback; its presence alone enables
    // production of full checkpoint data.
    data_sender: Option<CheckpointDataSender>,
}
136
137impl CheckpointExecutor {
138    pub fn new(
139        epoch_store: Arc<AuthorityPerEpochStore>,
140        checkpoint_store: Arc<CheckpointStore>,
141        state: Arc<AuthorityState>,
142        global_state_hasher: Arc<GlobalStateHasher>,
143        backpressure_manager: Arc<BackpressureManager>,
144        config: CheckpointExecutorConfig,
145        metrics: Arc<CheckpointExecutorMetrics>,
146        data_sender: Option<CheckpointDataSender>,
147        checkpoint_progress_tracker: Option<Arc<CheckpointProgressTracker>>,
148    ) -> Self {
149        Self {
150            epoch_store,
151            state: state.clone(),
152            checkpoint_store,
153            object_cache_reader: state.get_object_cache_reader().clone(),
154            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
155            tx_manager: state.transaction_manager().clone(),
156            global_state_hasher,
157            backpressure_manager,
158            config,
159            metrics,
160            tps_estimator: Mutex::new(TPSEstimator::default()),
161            checkpoint_progress_tracker,
162            data_sender,
163        }
164    }
165
166    pub fn new_for_tests(
167        epoch_store: Arc<AuthorityPerEpochStore>,
168        checkpoint_store: Arc<CheckpointStore>,
169        state: Arc<AuthorityState>,
170        global_state_hasher: Arc<GlobalStateHasher>,
171    ) -> Self {
172        Self::new(
173            epoch_store,
174            checkpoint_store.clone(),
175            state,
176            global_state_hasher,
177            BackpressureManager::new_from_checkpoint_store(&checkpoint_store),
178            Default::default(),
179            CheckpointExecutorMetrics::new_for_tests(),
180            None, // No callback for data
181            None, // No progress tracker for tests
182        )
183    }
184
185    // Gets the next checkpoint to schedule for execution. If the epoch is already
186    // completed, returns None.
187    fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
188        // Decide the first checkpoint to schedule for execution.
189        // If we haven't executed anything in the past, we schedule checkpoint 0.
190        // Otherwise we schedule the one after highest executed.
191        let highest_executed = self
192            .checkpoint_store
193            .get_highest_executed_checkpoint()
194            .unwrap();
195
196        if let Some(highest_executed) = &highest_executed {
197            if self.epoch_store.epoch() == highest_executed.epoch()
198                && highest_executed.is_last_checkpoint_of_epoch()
199            {
200                // We can arrive at this point if we bump the highest_executed_checkpoint
201                // watermark, and then crash before completing reconfiguration.
202                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
203                return None;
204            }
205        }
206
207        Some(
208            highest_executed
209                .as_ref()
210                .map(|c| c.sequence_number() + 1)
211                .unwrap_or_else(|| {
212                    // TODO this invariant may no longer hold once we introduce snapshots
213                    assert_eq!(self.epoch_store.epoch(), 0);
214                    // we need to execute the genesis checkpoint
215                    0
216                }),
217        )
218    }
219
220    /// Execute all checkpoints for the current epoch, ensuring that the node
221    /// has not forked, and return when finished.
222    /// If `run_with_range` is set, execution will stop early.
223    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
224    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
225        let _metrics_scope = iota_metrics::monitored_scope("CheckpointExecutor::run_epoch");
226        info!(?run_with_range, "CheckpointExecutor::run_epoch");
227        debug!(
228            "Checkpoint executor running for epoch {:?}",
229            self.epoch_store.epoch(),
230        );
231
232        // check if we want to run this epoch based on RunWithRange condition value
233        // we want to be inclusive of the defined RunWithRangeEpoch::Epoch
234        // i.e Epoch(N) means we will execute epoch N and stop when reaching N+1
235        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
236            info!("RunWithRange condition satisfied at {:?}", run_with_range,);
237            return StopReason::RunWithRangeCondition;
238        };
239
240        self.metrics
241            .checkpoint_exec_epoch
242            .set(self.epoch_store.epoch() as i64);
243
244        let Some(next_to_schedule) = self.get_next_to_schedule() else {
245            return StopReason::EpochComplete;
246        };
247
248        let this = Arc::new(self);
249
250        let concurrency = std::env::var("IOTA_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
251            .ok()
252            .and_then(|s| s.parse().ok())
253            .unwrap_or(this.config.checkpoint_execution_max_concurrency);
254
255        let pipeline_stages = PipelineStages::new(next_to_schedule, this.metrics.clone());
256
257        let final_checkpoint_executed = stream_synced_checkpoints(
258            this.checkpoint_store.clone(),
259            next_to_schedule,
260            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
261        )
262        // Checkpoint loading and execution is parallelized
263        .map(|checkpoint| {
264            let this = this.clone();
265            let pipeline_handle = pipeline_stages.handle(*checkpoint.sequence_number());
266            async move {
267                let pipeline_handle = pipeline_handle.await;
268                tokio::spawn(this.execute_checkpoint(checkpoint, pipeline_handle))
269                    .await
270                    .unwrap()
271            }
272        })
273        .buffered(concurrency)
274        // Take the last value from the stream to determine if we completed the epoch
275        .fold(false, |state, is_final_checkpoint| async move {
276            assert!(!state, "Cannot execute checkpoint after epoch end");
277            is_final_checkpoint
278        })
279        .await;
280
281        if final_checkpoint_executed {
282            StopReason::EpochComplete
283        } else {
284            StopReason::RunWithRangeCondition
285        }
286    }
287}
288
289impl CheckpointExecutor {
    /// Load all data for a checkpoint, ensure all transactions are executed,
    /// and check for forks.
    ///
    /// After the transactions are executed (or the locally built checkpoint
    /// is verified), this commits the transaction outputs, finalizes the
    /// checkpoint in the epoch store, commits index updates, accumulates the
    /// running root (plus the epoch hash for the epoch's last checkpoint),
    /// bumps the highest-executed watermark, and finishes each pipeline stage
    /// on `pipeline_handle` as it goes.
    ///
    /// Returns `true` if `checkpoint` is the last checkpoint of its epoch.
    ///
    /// # Panics
    ///
    /// Panics if the blocking commit task panics, or if any of the `expect`ed
    /// store / hasher / randomness-reporter calls below fail.
    #[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
        mut pipeline_handle: PipelineHandle,
    ) -> bool /* is final checkpoint */ {
        debug!("executing checkpoint");
        let sequence_number = checkpoint.sequence_number;

        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
        self.backpressure_manager
            .update_highest_certified_checkpoint(sequence_number);

        // The end-of-epoch checkpoint must not run ahead of earlier ones: wait
        // until checkpoint `sequence_number - 1` has been executed. The
        // `sequence_number > 0` check guards the subtraction.
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard =
                iota_metrics::monitored_scope("CheckpointExecutor::wait_for_previous_checkpoints");

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            iota_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        // Note: only `execute_transactions_from_synced_checkpoint` has end-of-epoch
        // logic.
        // Fullnodes, and every node on the epoch's last checkpoint, execute the
        // synced checkpoint's transactions. Otherwise (validators) the locally
        // built checkpoint is reused, falling back to the synced path when no
        // local checkpoint exists.
        let exec_start = Instant::now();
        let ckpt_state = if self.state.is_fullnode(&self.epoch_store)
            || checkpoint.is_last_checkpoint_of_epoch()
        {
            self.execute_transactions_from_synced_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        } else {
            self.verify_locally_built_checkpoint(checkpoint, &mut pipeline_handle)
                .await
        };

        // The estimator lock is held only for this statement.
        let tps = self.tps_estimator.lock().update(
            Instant::now(),
            ckpt_state.data.checkpoint.network_total_transactions,
        );
        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

        self.backpressure_manager
            .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());

        let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

        let seq = ckpt_state.data.checkpoint.sequence_number;

        // Build the DB write batch here; it is committed on a blocking thread
        // below.
        let batch = self
            .state
            .get_cache_commit()
            .build_db_batch(self.epoch_store.epoch(), &ckpt_state.data.tx_digests);

        finish_stage!(pipeline_handle, BuildDbBatch);

        // `ckpt_state` is moved into the blocking closure and handed back as
        // its result so it can be used again afterwards. `.unwrap()` re-raises
        // a panic from the blocking task.
        let mut ckpt_state = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                // Commit all transaction effects to disk
                let cache_commit = this.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    this.epoch_store.epoch(),
                    batch,
                    &ckpt_state.data.tx_digests,
                );
                ckpt_state
            }
        })
        .await
        .unwrap();

        finish_stage!(pipeline_handle, CommitTransactionOutputs);

        self.epoch_store
            .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
            .expect("cannot fail");

        let randomness_rounds = self.extract_randomness_rounds(
            &ckpt_state.data.checkpoint,
            &ckpt_state.data.checkpoint_contents,
        );

        if self.state.is_fullnode(&self.epoch_store) {
            let epoch = ckpt_state.data.checkpoint.epoch;
            // Remove version assignments on fullnodes after checkpoint execution.
            // On validators, version assignments are removed when consensus output is
            // committed. We cannot remove here on validators because checkpoint
            // execution can run ahead of consensus, which would then re-insert
            // version assignments.
            self.epoch_store.remove_shared_version_assignments(
                randomness_rounds
                    .iter()
                    .map(|round| TransactionKey::RandomnessRound(epoch, *round)),
            );

            self.epoch_store.remove_shared_version_assignments(
                ckpt_state
                    .data
                    .tx_digests
                    .iter()
                    .copied()
                    .map(TransactionKey::Digest),
            );
        }

        // Once the checkpoint is finalized, we know that any randomness contained in
        // this checkpoint has been successfully included in a checkpoint
        // certified by quorum of validators. (RandomnessManager/
        // RandomnessReporter is only present on validators.)
        if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
            for round in randomness_rounds {
                debug!(
                    ?round,
                    "notifying RandomnessReporter that randomness update was executed in checkpoint"
                );
                randomness_reporter
                    .notify_randomness_in_checkpoint(round)
                    .expect("epoch cannot have ended");
            }
        }

        finish_stage!(pipeline_handle, FinalizeCheckpoint);

        // Moves the full checkpoint data (if any) out of `ckpt_state`; after
        // this, `ckpt_state.full_data` is `None`.
        if let Some(checkpoint_data) = ckpt_state.full_data.take() {
            self.commit_index_updates(checkpoint_data);
        }

        finish_stage!(pipeline_handle, UpdateRpcIndex);

        self.global_state_hasher
            .accumulate_running_root(&self.epoch_store, seq, ckpt_state.state_hash)
            .expect("Failed to accumulate running root");

        // End-of-epoch bookkeeping; runs only for the epoch's last checkpoint.
        if is_final_checkpoint {
            self.checkpoint_store
                .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
                .expect("Failed to insert epoch last checkpoint");

            self.global_state_hasher
                .accumulate_epoch(self.epoch_store.clone(), seq)
                .expect("Accumulating epoch cannot fail");

            // Pruning failures are reported via `debug_fatal!` and otherwise
            // ignored.
            self.checkpoint_store
                .prune_local_summaries()
                .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                .ok();
        }

        // NOTE(review): presumably a fault-injection hook for tests, placed just
        // before the highest-executed watermark is bumped — confirm.
        fail_point!("crash");

        self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

        // NOTE(review): `full_data` was already `take()`n above, so
        // `ckpt_state.full_data.as_ref()` is always `None` at this point.
        // Confirm that `broadcast_checkpoint` does not expect to receive the
        // full checkpoint data here.
        self.broadcast_checkpoint(&ckpt_state.data, ckpt_state.full_data.as_ref());

        finish_stage!(pipeline_handle, BumpHighestExecutedCheckpoint);

        if let Some(tracker) = &self.checkpoint_progress_tracker {
            tracker.add_execution_time(exec_start.elapsed());
        }

        // Important: code after the last pipeline stage is finished can run out of
        // checkpoint order.

        // Same value as `is_final_checkpoint` computed above.
        ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
    }
464
    // On validators, checkpoints have often already been constructed locally, in
    // which case we can skip many steps of the checkpoint execution process.
    //
    // If no locally built checkpoint exists for this sequence number (e.g. while
    // catching up), this falls back to
    // `execute_transactions_from_synced_checkpoint`. Otherwise the local and the
    // synced checkpoint are compared via `assert_checkpoint_not_forked`
    // (presumably panicking on a mismatch — confirm), and the state hash
    // computed by the checkpoint builder is reused instead of re-executing the
    // transactions.
    //
    // Panics if called with the last checkpoint of an epoch, on store errors,
    // or if the checkpoint contents are missing.
    #[instrument(level = "info", skip_all)]
    async fn verify_locally_built_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        assert!(
            !checkpoint.is_last_checkpoint_of_epoch(),
            "only fullnode path has end-of-epoch logic"
        );

        let sequence_number = checkpoint.sequence_number;
        let locally_built_checkpoint = self
            .checkpoint_store
            .get_locally_computed_checkpoint(sequence_number)
            .expect("db error");

        let Some(locally_built_checkpoint) = locally_built_checkpoint else {
            // fall back to tx-by-tx execution path if we are catching up.
            return self
                .execute_transactions_from_synced_checkpoint(checkpoint, pipeline_handle)
                .await;
        };

        self.metrics.checkpoint_executor_validator_path.inc();

        // Check for fork
        assert_checkpoint_not_forked(
            &locally_built_checkpoint,
            &checkpoint,
            &self.checkpoint_store,
        );

        // Checkpoint builder triggers accumulation of the checkpoint, so this is
        // guaranteed to finish.
        let state_hash = {
            let _metrics_scope =
                iota_metrics::monitored_scope("CheckpointExecutor::notify_read_state_hash");
            self.epoch_store
                .notify_read_checkpoint_state_hasher(&[sequence_number])
                .await
                .unwrap()
                .pop()
                .unwrap()
        };

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        // Split the contents into parallel transaction / effects digest lists.
        let (tx_digests, fx_digests): (Vec<_>, Vec<_>) = checkpoint_contents
            .iter()
            .map(|digests| (digests.transaction, digests.effects))
            .unzip();

        // No transactions are executed on this path, so the pipeline handle is
        // advanced straight to the finalize stage.
        pipeline_handle
            .skip_to(PipelineStage::FinalizeTransactions)
            .await;

        // Currently this path is only taken on validators. There, this call
        // records the finalized transactions in the per-epoch store but skips
        // the fullnode-only perpetual checkpoint table write. In the future,
        // fullnodes may follow the consensus dag and build their own
        // checkpoints.
        self.insert_finalized_transactions(&tx_digests, sequence_number, checkpoint.timestamp_ms);

        pipeline_handle.skip_to(PipelineStage::BuildDbBatch).await;

        CheckpointExecutionState::new_with_global_state_hash(
            CheckpointExecutionData {
                checkpoint,
                checkpoint_contents,
                tx_digests,
                fx_digests,
            },
            state_hash,
        )
    }
548
    /// Executes a synced checkpoint transaction by transaction.
    ///
    /// Loads the checkpoint's transactions and effects, schedules those that
    /// have not been executed locally yet, and waits until they are. For the
    /// last checkpoint of an epoch, the end-of-epoch transaction (which
    /// scheduling skips) is then executed via `execute_change_epoch_tx`.
    /// Finally it records the finalized transactions, computes the checkpoint
    /// state hash and, if enabled, assembles the full checkpoint data.
    /// Pipeline stages are finished on `pipeline_handle` in that order.
    ///
    /// # Panics
    ///
    /// Panics if state hash accumulation fails, and propagates panics from the
    /// loading / scheduling / processing helpers it calls.
    #[instrument(level = "info", skip_all)]
    async fn execute_transactions_from_synced_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pipeline_handle: &mut PipelineHandle,
    ) -> CheckpointExecutionState {
        let sequence_number = checkpoint.sequence_number;

        let (mut ckpt_state, tx_data, unexecuted_tx_digests) = {
            let _scope = iota_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
            let (ckpt_state, tx_data) = self.load_checkpoint_transactions(checkpoint);
            let unexecuted_tx_digests = self.schedule_transaction_execution(&ckpt_state, &tx_data);
            (ckpt_state, tx_data, unexecuted_tx_digests)
        };

        finish_stage!(pipeline_handle, ExecuteTransactions);

        // Wait until every transaction scheduled above has executed effects.
        {
            let _metrics_scope = iota_metrics::monitored_scope(
                "CheckpointExecutor::notify_read_executed_effects_digests",
            );
            self.transaction_cache_reader
                .notify_read_executed_effects_digests(&unexecuted_tx_digests)
                .await;
        }

        finish_stage!(pipeline_handle, WaitForTransactions);

        // The end-of-epoch transaction is not scheduled with the others; it is
        // executed here, after all other transactions of the checkpoint.
        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&tx_data).await;
        }

        // This scope guard lives until the function returns, so the
        // `finalize_checkpoint` metric also covers `process_checkpoint_data`.
        let _scope = iota_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");

        if self.state.is_fullnode(&self.epoch_store) {
            self.state.congestion_tracker.process_checkpoint_effects(
                &*self.transaction_cache_reader,
                &ckpt_state.data.checkpoint,
                &tx_data.effects,
            );
        }

        self.insert_finalized_transactions(
            &ckpt_state.data.tx_digests,
            sequence_number,
            ckpt_state.data.checkpoint.timestamp_ms,
        );

        // The early versions of the hasher (prior to effectsv2) rely on db
        // state, so we must wait until all transactions have been executed
        // before accumulating the checkpoint.
        ckpt_state.state_hash = Some(
            self.global_state_hasher
                .accumulate_checkpoint(&tx_data.effects, sequence_number, &self.epoch_store)
                .expect("epoch cannot have ended"),
        );

        finish_stage!(pipeline_handle, FinalizeTransactions);

        // `None` unless checkpoint data production is enabled.
        ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state.data, &tx_data);

        finish_stage!(pipeline_handle, ProcessCheckpointData);

        ckpt_state
    }
614
615    fn checkpoint_data_enabled(&self) -> bool {
616        self.state.grpc_indexes_store.is_some()
617            || self.config.data_ingestion_dir.is_some()
618            || self.data_sender.is_some()
619    }
620
621    fn insert_finalized_transactions(
622        &self,
623        tx_digests: &[TransactionDigest],
624        sequence_number: CheckpointSequenceNumber,
625        timestamp_ms: u64,
626    ) {
627        self.epoch_store
628            .insert_finalized_transactions(tx_digests, sequence_number, timestamp_ms)
629            .expect("failed to insert finalized transactions");
630
631        if self.state.is_fullnode(&self.epoch_store) {
632            // TODO remove once we no longer need to support this table for read RPC
633            self.state
634                .get_checkpoint_cache()
635                .insert_finalized_transactions_perpetual_checkpoints(
636                    tx_digests,
637                    self.epoch_store.epoch(),
638                    sequence_number,
639                );
640        }
641    }
642
643    #[instrument(level = "info", skip_all)]
644    fn process_checkpoint_data(
645        &self,
646        ckpt_data: &CheckpointExecutionData,
647        tx_data: &CheckpointTransactionData,
648    ) -> Option<CheckpointData> {
649        if !self.checkpoint_data_enabled() {
650            return None;
651        }
652
653        let checkpoint_data = load_checkpoint_data(
654            ckpt_data,
655            tx_data,
656            self.state.get_object_store(),
657            &*self.transaction_cache_reader,
658        )
659        .expect("failed to load checkpoint data");
660
661        // Index the checkpoint. this is done out of order and is not written and
662        // committed to the DB until later (committing must be done
663        // in-order)
664        if let Some(grpc_indexes_store) = &self.state.grpc_indexes_store {
665            grpc_indexes_store.index_checkpoint(&checkpoint_data);
666        }
667
668        if let Some(path) = &self.config.data_ingestion_dir {
669            store_checkpoint_locally(path, &checkpoint_data)
670                .expect("failed to store checkpoint locally");
671        }
672
673        Some(checkpoint_data)
674    }
675
676    // Load all required transaction and effects data for the checkpoint.
677    #[instrument(level = "info", skip_all)]
678    fn load_checkpoint_transactions(
679        &self,
680        checkpoint: VerifiedCheckpoint,
681    ) -> (CheckpointExecutionState, CheckpointTransactionData) {
682        let seq = checkpoint.sequence_number;
683        let epoch = checkpoint.epoch;
684
685        let checkpoint_contents = self
686            .checkpoint_store
687            .get_checkpoint_contents(&checkpoint.content_digest)
688            .expect("db error")
689            .expect("checkpoint contents not found");
690
691        // attempt to load full checkpoint contents in bulk
692        if let Some(full_contents) = self
693            .checkpoint_store
694            .get_full_checkpoint_contents_by_sequence_number(seq)
695            .expect("Failed to get checkpoint contents from store")
696            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
697        {
698            let num_txns = full_contents.size();
699            let mut tx_digests = Vec::with_capacity(num_txns);
700            let mut transactions = Vec::with_capacity(num_txns);
701            let mut effects = Vec::with_capacity(num_txns);
702            let mut fx_digests = Vec::with_capacity(num_txns);
703
704            full_contents
705                .into_iter()
706                .zip(checkpoint_contents.iter())
707                .for_each(|(execution_data, digests)| {
708                    let tx_digest = digests.transaction;
709                    let fx_digest = digests.effects;
710                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
711                    debug_assert_eq!(fx_digest, execution_data.effects.digest());
712
713                    tx_digests.push(tx_digest);
714                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
715                        VerifiedTransaction::new_unchecked(execution_data.transaction),
716                        epoch,
717                        seq,
718                    ));
719                    effects.push(execution_data.effects);
720                    fx_digests.push(fx_digest);
721                });
722
723            let executed_fx_digests = self
724                .transaction_cache_reader
725                .multi_get_executed_effects_digests(&tx_digests);
726
727            (
728                CheckpointExecutionState::new(CheckpointExecutionData {
729                    checkpoint,
730                    checkpoint_contents,
731                    tx_digests,
732                    fx_digests,
733                }),
734                CheckpointTransactionData {
735                    transactions,
736                    effects,
737                    executed_fx_digests,
738                },
739            )
740        } else {
741            // load items one-by-one
742
743            let digests = checkpoint_contents.inner();
744
745            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) =
746                digests.iter().map(|d| (d.transaction, d.effects)).unzip();
747            let transactions = self
748                .transaction_cache_reader
749                .multi_get_transaction_blocks(&tx_digests)
750                .into_iter()
751                .enumerate()
752                .map(|(i, tx)| {
753                    let tx = tx
754                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
755                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
756                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
757                })
758                .collect();
759            let effects = self
760                .transaction_cache_reader
761                .multi_get_effects(&fx_digests)
762                .into_iter()
763                .enumerate()
764                .map(|(i, effect)| {
765                    effect.unwrap_or_else(|| {
766                        fatal!("checkpoint effect not found for {:?}", digests[i])
767                    })
768                })
769                .collect();
770
771            let executed_fx_digests = self
772                .transaction_cache_reader
773                .multi_get_executed_effects_digests(&tx_digests);
774
775            (
776                CheckpointExecutionState::new(CheckpointExecutionData {
777                    checkpoint,
778                    checkpoint_contents,
779                    tx_digests,
780                    fx_digests,
781                }),
782                CheckpointTransactionData {
783                    transactions,
784                    effects,
785                    executed_fx_digests,
786                },
787            )
788        }
789    }
790
791    // Schedule all unexecuted transactions in the checkpoint for execution
792    #[instrument(level = "info", skip_all)]
793    fn schedule_transaction_execution(
794        &self,
795        ckpt_state: &CheckpointExecutionState,
796        tx_data: &CheckpointTransactionData,
797    ) -> Vec<TransactionDigest> {
798        // Find unexecuted transactions and their expected effects digests
799        let (unexecuted_tx_digests, unexecuted_txns, unexecuted_effects): (Vec<_>, Vec<_>, Vec<_>) =
800            itertools::multiunzip(
801                itertools::izip!(
802                    tx_data.transactions.iter(),
803                    ckpt_state.data.tx_digests.iter(),
804                    ckpt_state.data.fx_digests.iter(),
805                    tx_data.effects.iter(),
806                    tx_data.executed_fx_digests.iter()
807                )
808                .filter_map(
809                    |(txn, tx_digest, expected_fx_digest, effects, executed_fx_digest)| {
810                        if let Some(executed_fx_digest) = executed_fx_digest {
811                            assert_not_forked(
812                                &ckpt_state.data.checkpoint,
813                                tx_digest,
814                                expected_fx_digest,
815                                executed_fx_digest,
816                                &*self.transaction_cache_reader,
817                            );
818                            None
819                        } else if txn.transaction_data().is_end_of_epoch_tx() {
820                            None
821                        } else {
822                            Some((tx_digest, (txn.clone(), *expected_fx_digest), effects))
823                        }
824                    },
825                ),
826            );
827
828        for ((tx, _), effects) in itertools::izip!(unexecuted_txns.iter(), unexecuted_effects) {
829            if tx.contains_shared_object() {
830                self.epoch_store
831                    .acquire_shared_version_assignments_from_effects(
832                        tx,
833                        effects,
834                        &*self.object_cache_reader,
835                    )
836                    .expect("failed to acquire shared version assignments");
837            }
838        }
839
840        // Enqueue unexecuted transactions with their expected effects digests
841        self.tx_manager
842            .enqueue_with_expected_effects_digest(unexecuted_txns, &self.epoch_store);
843
844        unexecuted_tx_digests
845    }
846
847    // Execute the change epoch txn
848    #[instrument(level = "error", skip_all)]
849    async fn execute_change_epoch_tx(&self, tx_data: &CheckpointTransactionData) {
850        let change_epoch_tx = tx_data.transactions.last().unwrap();
851        let change_epoch_fx = tx_data.effects.last().unwrap();
852        assert_eq!(
853            change_epoch_tx.digest(),
854            change_epoch_fx.transaction_digest()
855        );
856        assert!(
857            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
858            "final txn must be an end of epoch txn"
859        );
860
861        // Ordinarily we would assert that the change epoch txn has not been executed
862        // yet. However, during crash recovery, it is possible that we already
863        // passed this point and the txn has been executed. You can uncomment
864        // this assert if you are debugging a problem related to reconfig. If
865        // you hit this assert and it is not because of crash-recovery,
866        // it may indicate a bug in the checkpoint executor.
867        //
868        //     if self
869        //         .transaction_cache_reader
870        //         .get_executed_effects(change_epoch_tx.digest())
871        //         .is_some()
872        //     {
873        //         fatal!(
874        //             "end of epoch txn must not have been executed: {:?}",
875        //             change_epoch_tx.digest()
876        //         );
877        //     }
878
879        self.epoch_store
880            .acquire_shared_version_assignments_from_effects(
881                change_epoch_tx,
882                change_epoch_fx,
883                self.object_cache_reader.as_ref(),
884            )
885            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
886
887        info!(
888            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}",
889            change_epoch_tx.digest(),
890            change_epoch_fx.digest()
891        );
892        self.tx_manager.enqueue_with_expected_effects_digest(
893            vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
894            &self.epoch_store,
895        );
896
897        self.transaction_cache_reader
898            .notify_read_executed_effects_digests(&[*change_epoch_tx.digest()])
899            .await;
900    }
901
902    // Increment the highest executed checkpoint watermark and prune old
903    // full-checkpoint contents
904    #[instrument(level = "debug", skip_all)]
905    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
906        // Ensure that we are not skipping checkpoints at any point
907        let seq = *checkpoint.sequence_number();
908        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
909        if let Some(prev_highest) = self
910            .checkpoint_store
911            .get_highest_executed_checkpoint_seq_number()
912            .unwrap()
913        {
914            assert_eq!(prev_highest + 1, seq);
915        } else {
916            assert_eq!(seq, 0);
917        }
918        fail_point!("highest-executed-checkpoint");
919
920        // We store a fixed number of additional FullCheckpointContents after execution
921        // is complete for use in state sync.
922        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
923        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
924            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
925            if let Some(prune_checkpoint) = self
926                .checkpoint_store
927                .get_checkpoint_by_sequence_number(prune_seq)
928                .expect("Failed to fetch checkpoint")
929            {
930                self.checkpoint_store
931                    .delete_full_checkpoint_contents(prune_seq)
932                    .expect("Failed to delete full checkpoint contents");
933                self.checkpoint_store
934                    .delete_contents_digest_sequence_number_mapping(
935                        &prune_checkpoint.content_digest,
936                    )
937                    .expect("Failed to delete contents digest -> sequence number mapping");
938            } else {
939                // If this is directly after a snapshot restore with skiplisting,
940                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
941                // checkpoints.
942                debug!(
943                    "Failed to fetch checkpoint with sequence number {:?}",
944                    prune_seq
945                );
946            }
947        }
948
949        self.checkpoint_store
950            .update_highest_executed_checkpoint(checkpoint)
951            .unwrap();
952        self.metrics.last_executed_checkpoint.set(seq as i64);
953
954        self.metrics
955            .last_executed_checkpoint_timestamp_ms
956            .set(checkpoint.timestamp_ms as i64);
957        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
958    }
959
960    /// Helper to broadcast checkpoint summary and data if the
961    /// channels are set.
962    fn broadcast_checkpoint(
963        &self,
964        checkpoint_exec_data: &CheckpointExecutionData,
965        checkpoint_data: Option<&CheckpointData>,
966    ) {
967        if let Some(data_sender) = &self.data_sender {
968            let checkpoint_data = if let Some(data) = checkpoint_data {
969                data.clone()
970            } else {
971                // Reconstruct checkpoint data if needed (rare case: data_sender configured but
972                // checkpoint_data_enabled is false)
973                let (_, tx_data) =
974                    self.load_checkpoint_transactions(checkpoint_exec_data.checkpoint.clone());
975                load_checkpoint_data(
976                    checkpoint_exec_data,
977                    &tx_data,
978                    self.state.get_object_store(),
979                    self.transaction_cache_reader.as_ref(),
980                )
981                .expect("Failed to load full CheckpointData")
982            };
983            data_sender(&checkpoint_data);
984        }
985
986        debug!(
987            "[Fullnode] Full CheckpointData is available: seq={}",
988            checkpoint_exec_data.checkpoint.sequence_number()
989        );
990    }
991
992    /// If configured, commit the pending index updates for the provided
993    /// checkpoint
994    #[instrument(level = "info", skip_all)]
995    fn commit_index_updates(&self, checkpoint: CheckpointData) {
996        if let Some(grpc_indexes_store) = &self.state.grpc_indexes_store {
997            grpc_indexes_store
998                .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
999                .expect("failed to update gRPC indexes");
1000        }
1001    }
1002
1003    // Extract randomness rounds from the checkpoint version-specific data (if
1004    // available). Otherwise, extract randomness rounds from the first
1005    // transaction in the checkpoint
1006    #[instrument(level = "debug", skip_all)]
1007    fn extract_randomness_rounds(
1008        &self,
1009        checkpoint: &VerifiedCheckpoint,
1010        checkpoint_contents: &CheckpointContents,
1011    ) -> Vec<RandomnessRound> {
1012        if let Some(version_specific_data) = checkpoint
1013            .version_specific_data(self.epoch_store.protocol_config())
1014            .expect("unable to get version_specific_data")
1015        {
1016            // With version-specific data, randomness rounds are stored in checkpoint
1017            // summary.
1018            version_specific_data.into_v1().randomness_rounds
1019        } else {
1020            // Before version-specific data, checkpoint batching must be disabled. In this
1021            // case, randomness state update tx must be first if it exists,
1022            // because all other transactions in a checkpoint that includes a
1023            // randomness state update are causally dependent on it.
1024            assert_eq!(
1025                0,
1026                self.epoch_store
1027                    .protocol_config()
1028                    .min_checkpoint_interval_ms_as_option()
1029                    .unwrap_or_default(),
1030            );
1031            if let Some(first_digest) = checkpoint_contents.inner().first() {
1032                let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
1033                .unwrap_or_else(||
1034                    fatal!(
1035                        "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
1036                        checkpoint.sequence_number()
1037                    )
1038                );
1039                if let TransactionKind::RandomnessStateUpdate(rsu) =
1040                    maybe_randomness_tx.data().transaction_data().kind()
1041                {
1042                    vec![rsu.randomness_round]
1043                } else {
1044                    Vec::new()
1045                }
1046            } else {
1047                Vec::new()
1048            }
1049        }
1050    }
1051}