iota_core/checkpoints/checkpoint_executor/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! CheckpointExecutor is a Node component that executes all checkpoints for the
6//! given epoch. It acts as a Consumer to StateSync
7//! for newly synced checkpoints, taking these checkpoints and
8//! scheduling and monitoring their execution. Its primary goal is to allow
9//! for catching up to the current checkpoint sequence number of the network
10//! as quickly as possible so that a newly joined, or recovering Node can
11//! participate in a timely manner. To that end, CheckpointExecutor attempts
12//! to saturate the CPU with executor tasks (one per checkpoint), each of which
13//! handle scheduling and awaiting checkpoint transaction execution.
14//!
15//! CheckpointExecutor is made recoverable in the event of Node shutdown by way
16//! of a watermark, highest_executed_checkpoint, which is guaranteed to be
17//! updated sequentially in order, despite checkpoints themselves potentially
18//! being executed nonsequentially and in parallel. CheckpointExecutor
19//! parallelizes checkpoints of the same epoch as much as possible.
20//! CheckpointExecutor enforces the invariant that if `run` returns
21//! successfully, we have reached the end of epoch. This allows us to use it as
22//! a signal for reconfig.
23
24use std::{sync::Arc, time::Instant};
25
26use futures::StreamExt;
27use iota_common::{debug_fatal, fatal};
28use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
29use iota_macros::fail_point;
30use iota_types::{
31    accumulator::Accumulator,
32    base_types::{TransactionDigest, TransactionEffectsDigest},
33    crypto::RandomnessRound,
34    effects::{TransactionEffects, TransactionEffectsAPI},
35    executable_transaction::VerifiedExecutableTransaction,
36    full_checkpoint_content::CheckpointData,
37    inner_temporary_store::PackageStoreWithFallback,
38    message_envelope::Message,
39    messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber, VerifiedCheckpoint},
40    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
41};
42use parking_lot::Mutex;
43use tap::{TapFallible, TapOptional};
44use tracing::{debug, info, instrument};
45
46use crate::{
47    authority::{
48        AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
49        backpressure::BackpressureManager,
50    },
51    checkpoints::CheckpointStore,
52    execution_cache::{ObjectCacheRead, TransactionCacheRead},
53    state_accumulator::StateAccumulator,
54    transaction_manager::TransactionManager,
55};
56
57mod data_ingestion_handler;
58pub mod metrics;
59pub(crate) mod utils;
60
61#[cfg(test)]
62pub(crate) mod tests;
63
64use data_ingestion_handler::{load_checkpoint_data, store_checkpoint_locally};
65use metrics::CheckpointExecutorMetrics;
66use utils::*;
67
/// Callback invoked with the full data of each committed checkpoint when a
/// subscriber is configured (stored in `CheckpointExecutor::data_sender`).
type CheckpointDataSender = Box<dyn Fn(&CheckpointData) + Send + Sync>;

// Log overall sync/execution progress once every this many checkpoints.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;
71
/// Reason why `CheckpointExecutor::run_epoch` returned.
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    /// The final checkpoint of the epoch was executed; reconfiguration can
    /// proceed.
    EpochComplete,
    /// Execution stopped early because the configured `RunWithRange`
    /// (epoch/checkpoint bound) was reached.
    RunWithRangeCondition,
}
77
/// Data loaded for a single checkpoint prior to execution: the verified
/// summary, its contents, and per-transaction vectors.
pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    // The four vectors below are index-aligned: entry i of each refers to
    // the i-th transaction of the checkpoint.
    pub tx_digests: Vec<TransactionDigest>,
    pub fx_digests: Vec<TransactionEffectsDigest>,
    pub transactions: Vec<VerifiedExecutableTransaction>,
    pub effects: Vec<TransactionEffects>,
}
86
/// Mutable state carried from the parallel (execute) phase to the
/// sequential (commit) phase of checkpoint processing.
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,
    // Index-aligned with `data`: for each transaction, the effects digest
    // already recorded as executed, or `None` if not yet executed.
    executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    // Checkpoint state accumulator; populated at the end of the parallel
    // phase, consumed during commit.
    accumulator: Option<Accumulator>,
    // Full checkpoint data for indexing / data ingestion; only populated
    // when those features are enabled.
    full_data: Option<CheckpointData>,
}
93
94impl CheckpointExecutionState {
95    pub fn new(
96        data: CheckpointExecutionData,
97        executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
98    ) -> Self {
99        Self {
100            data,
101            executed_fx_digests,
102            accumulator: None,
103            full_data: None,
104        }
105    }
106}
107
/// Executes all checkpoints for a single epoch: consumes synced checkpoints
/// from the checkpoint store, schedules and awaits their transactions in
/// parallel, and commits results serially in sequence-number order. See the
/// module-level docs for the overall design.
pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    // Estimates sync TPS from executed transaction counts; behind a mutex
    // because it is updated from the sequential commit step.
    tps_estimator: Mutex<TPSEstimator>,
    // Optional callback invoked with full checkpoint data after commit.
    data_sender: Option<CheckpointDataSender>,
}
122
impl CheckpointExecutor {
    /// Constructs an executor for the epoch of `epoch_store`. Cache readers
    /// and the transaction manager are derived from `state`.
    pub fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
        data_sender: Option<CheckpointDataSender>,
    ) -> Self {
        Self {
            epoch_store,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            backpressure_manager,
            config,
            metrics,
            tps_estimator: Mutex::new(TPSEstimator::default()),
            data_sender,
        }
    }

    /// Constructs an executor with default config and test metrics.
    pub fn new_for_tests(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            epoch_store,
            checkpoint_store.clone(),
            state,
            accumulator,
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store),
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
            None, // No callback for data
        )
    }

    // Gets the next checkpoint to schedule for execution. If the epoch is already
    // completed, returns None.
    fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
        // Decide the first checkpoint to schedule for execution.
        // If we haven't executed anything in the past, we schedule checkpoint 0.
        // Otherwise we schedule the one after highest executed.
        let highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if self.epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                // We can arrive at this point if we bump the highest_executed_checkpoint
                // watermark, and then crash before completing reconfiguration.
                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
                return None;
            }
        }

        Some(
            highest_executed
                .as_ref()
                .map(|c| c.sequence_number() + 1)
                .unwrap_or_else(|| {
                    // TODO this invariant may no longer hold once we introduce snapshots
                    assert_eq!(self.epoch_store.epoch(), 0);
                    // we need to execute the genesis checkpoint
                    0
                }),
        )
    }

    /// Execute all checkpoints for the current epoch, ensuring that the node
    /// has not forked, and return when finished.
    /// If `run_with_range` is set, execution will stop early.
    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
        let _metrics_guard = iota_metrics::monitored_scope("CheckpointExecutor::run_epoch");
        info!(?run_with_range, "CheckpointExecutor::run_epoch");
        debug!(
            "Checkpoint executor running for epoch {:?}",
            self.epoch_store.epoch(),
        );

        // check if we want to run this epoch based on RunWithRange condition value
        // we want to be inclusive of the defined RunWithRangeEpoch::Epoch
        // i.e Epoch(N) means we will execute epoch N and stop when reaching N+1
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
            info!("RunWithRange condition satisfied at {:?}", run_with_range,);
            return StopReason::RunWithRangeCondition;
        };

        self.metrics
            .checkpoint_exec_epoch
            .set(self.epoch_store.epoch() as i64);

        let Some(next_to_schedule) = self.get_next_to_schedule() else {
            return StopReason::EpochComplete;
        };

        let this = Arc::new(self);

        // Concurrency for the parallel (load + execute) phase; the env var
        // overrides the configured maximum when set to a valid integer.
        let concurrency = std::env::var("IOTA_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(this.config.checkpoint_execution_max_concurrency);

        let final_checkpoint_executed = stream_synced_checkpoints(
            this.checkpoint_store.clone(),
            next_to_schedule,
            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
        )
        // Checkpoint loading and execution is parallelized
        .map(|checkpoint| this.clone().execute_checkpoint(checkpoint))
        .buffered(concurrency)
        // Committing checkpoint contents must be done serially
        // Returns whether the checkpoint just executed was the final checkpoint of the epoch
        .map(|ckpt_state| this.clone().commit_checkpoint(ckpt_state))
        // Take the last value from the stream to determine if we completed the epoch
        // (`fold` awaits each commit future one at a time, which is what keeps
        // commits serialized in checkpoint order).
        .fold(false, |state, is_final_checkpoint| {
            assert!(!state, "Cannot execute checkpoint after epoch end");
            is_final_checkpoint
        })
        .await;

        if final_checkpoint_executed {
            StopReason::EpochComplete
        } else {
            StopReason::RunWithRangeCondition
        }
    }
}
263
264impl CheckpointExecutor {
265    /// Serially process checkpoints after all transactions have been executed,
266    /// in consecutive order.
267    #[instrument(level = "debug", skip_all, fields(seq = ?ckpt_state.data.checkpoint.sequence_number()))]
268    async fn commit_checkpoint(self: Arc<Self>, mut ckpt_state: CheckpointExecutionState) -> bool /* is final checkpoint */
269    {
270        tokio::task::spawn_blocking({
271            move || {
272                let _sequential_step_guard =
273                    iota_metrics::monitored_scope("CheckpointExecutor::sequential_step");
274
275                let tps = self.tps_estimator.lock().update(
276                    Instant::now(),
277                    ckpt_state.data.checkpoint.network_total_transactions,
278                );
279                self.metrics.checkpoint_exec_sync_tps.set(tps as i64);
280
281                self.backpressure_manager
282                    .update_highest_executed_checkpoint(*ckpt_state.data.checkpoint.sequence_number());
283
284                let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();
285
286                let seq = ckpt_state.data.checkpoint.sequence_number;
287
288                // Commit all transaction effects to disk
289                let cache_commit = self.state.get_cache_commit();
290                debug!(?seq, "committing checkpoint transactions to disk");
291                cache_commit.commit_transaction_outputs(
292                    self.epoch_store.epoch(),
293                    &ckpt_state.data.tx_digests,
294                );
295
296                self.epoch_store
297                    .handle_finalized_checkpoint(&ckpt_state.data.checkpoint, &ckpt_state.data.tx_digests)
298                    .expect("cannot fail");
299
300                // Once the checkpoint is finalized, we know that any randomness contained in this checkpoint has
301                // been successfully included in a checkpoint certified by quorum of validators.
302                // (RandomnessManager/RandomnessReporter is only present on validators.)
303                if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
304                    let randomness_rounds = self.extract_randomness_rounds(
305                        &ckpt_state.data.checkpoint,
306                        &ckpt_state.data.checkpoint_contents,
307                    );
308                    for round in randomness_rounds {
309                        debug!(
310                            ?round,
311                            "notifying RandomnessReporter that randomness update was executed in checkpoint"
312                        );
313                        randomness_reporter
314                            .notify_randomness_in_checkpoint(round)
315                            .expect("epoch cannot have ended");
316                    }
317                }
318
319                if let Some(checkpoint_data) = ckpt_state.full_data.take() {
320                    self.commit_index_updates(checkpoint_data);
321                }
322
323                self.accumulator
324                    .accumulate_running_root(&self.epoch_store, seq, ckpt_state.accumulator)
325                    .expect("Failed to accumulate running root");
326
327                if is_final_checkpoint {
328                    self.checkpoint_store
329                        .insert_epoch_last_checkpoint(self.epoch_store.epoch(), &ckpt_state.data.checkpoint)
330                        .expect("Failed to insert epoch last checkpoint");
331
332                    self.accumulator
333                        .accumulate_epoch(self.epoch_store.clone(), seq)
334                        .expect("Accumulating epoch cannot fail");
335
336                    self.checkpoint_store
337                        .prune_local_summaries()
338                        .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
339                        .ok();
340                }
341
342                fail_point!("crash");
343
344                self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);
345
346                self.broadcast_checkpoint(
347                    &ckpt_state.data,
348                    ckpt_state.full_data.as_ref(),
349                );
350
351                ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch()
352            }
353        }).await.unwrap()
354    }
355
    /// Load all data for a checkpoint, ensure all transactions are executed,
    /// and check for forks.
    ///
    /// This is the parallel phase: several checkpoints may run through this
    /// concurrently (driven by `run_epoch`'s buffered stream); the resulting
    /// states are then committed serially by `commit_checkpoint`.
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        info!("executing checkpoint");

        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
        self.backpressure_manager
            .update_highest_certified_checkpoint(*checkpoint.sequence_number());

        let sequence_number = checkpoint.sequence_number;
        // The end-of-epoch checkpoint does not proceed until every earlier
        // checkpoint of the epoch has finished executing.
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard =
                iota_metrics::monitored_scope("CheckpointExecutor::wait_for_previous_checkpoints");

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            iota_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        // Load checkpoint data and enqueue all not-yet-executed transactions
        // on a blocking thread.
        let (mut ckpt_state, unexecuted_tx_digests) = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                let _scope =
                    iota_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
                let ckpt_state = this.load_checkpoint_transactions(checkpoint);
                let unexecuted_tx_digests = this.schedule_transaction_execution(&ckpt_state);
                (ckpt_state, unexecuted_tx_digests)
            }
        })
        .await
        .unwrap();

        // Wait until every scheduled transaction has produced effects.
        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&unexecuted_tx_digests)
            .await;

        // The change-epoch transaction is excluded from the scheduling above
        // and executed separately, last.
        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&ckpt_state).await;
        }

        tokio::task::spawn_blocking(move || {
            let _scope = iota_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");
            self.epoch_store
                .insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number)
                .expect("failed to insert finalized transactions");

            if self.state.is_fullnode(&self.epoch_store) {
                self.state.congestion_tracker.process_checkpoint_effects(
                    &*self.transaction_cache_reader,
                    &ckpt_state.data.checkpoint,
                    &ckpt_state.data.effects,
                );
            }

            // TODO remove once we no longer need to support this table for read RPC
            self.state
                .get_checkpoint_cache()
                .insert_finalized_transactions_perpetual_checkpoints(
                    &ckpt_state.data.tx_digests,
                    self.epoch_store.epoch(),
                    sequence_number,
                );

            // The early versions of the accumulator (prior to effectsv2) rely on db
            // state, so we must wait until all transactions have been executed
            // before accumulating the checkpoint.
            ckpt_state.accumulator = Some(
                self.accumulator
                    .accumulate_checkpoint(
                        &ckpt_state.data.effects,
                        sequence_number,
                        &self.epoch_store,
                    )
                    .expect("epoch cannot have ended"),
            );

            ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state);
            ckpt_state
        })
        .await
        .unwrap()
    }
448
449    fn checkpoint_data_enabled(&self) -> bool {
450        self.state.rest_index.is_some() || self.config.data_ingestion_dir.is_some()
451    }
452
453    fn process_checkpoint_data(
454        &self,
455        ckpt_state: &CheckpointExecutionState,
456    ) -> Option<CheckpointData> {
457        if !self.checkpoint_data_enabled() {
458            return None;
459        }
460
461        let checkpoint_data = load_checkpoint_data(
462            &ckpt_state.data,
463            self.state.get_object_store(),
464            &*self.transaction_cache_reader,
465        )
466        .expect("failed to load checkpoint data");
467
468        if self.state.rest_index.is_some() || self.config.data_ingestion_dir.is_some() {
469            // Index the checkpoint. this is done out of order and is not written and
470            // committed to the DB until later (committing must be done
471            // in-order)
472            if let Some(rest_index) = &self.state.rest_index {
473                let mut layout_resolver = self.epoch_store.executor().type_layout_resolver(
474                    Box::new(PackageStoreWithFallback::new(
475                        self.state.get_backing_package_store(),
476                        &checkpoint_data,
477                    )),
478                );
479
480                rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut());
481            }
482
483            if let Some(path) = &self.config.data_ingestion_dir {
484                store_checkpoint_locally(path, &checkpoint_data)
485                    .expect("failed to store checkpoint locally");
486            }
487        }
488
489        Some(checkpoint_data)
490    }
491
    // Load all required transaction and effects data for the checkpoint.
    //
    // Tries to load the full checkpoint contents in one bulk read first; if
    // they are not available, falls back to fetching transactions and effects
    // individually from the cache. Either way the returned state carries
    // index-aligned vectors of digests, transactions, and effects, plus the
    // already-executed effects digests used later for fork detection.
    fn load_checkpoint_transactions(
        &self,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        let seq = checkpoint.sequence_number;
        let epoch = checkpoint.epoch;

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        // attempt to load full checkpoint contents in bulk
        if let Some(full_contents) = self
            .checkpoint_store
            .get_full_checkpoint_contents_by_sequence_number(seq)
            .expect("Failed to get checkpoint contents from store")
            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
        {
            let num_txns = full_contents.size();
            let mut tx_digests = Vec::with_capacity(num_txns);
            let mut transactions = Vec::with_capacity(num_txns);
            let mut effects = Vec::with_capacity(num_txns);
            let mut fx_digests = Vec::with_capacity(num_txns);

            full_contents
                .into_iter()
                .zip(checkpoint_contents.iter())
                .for_each(|(execution_data, digests)| {
                    let tx_digest = digests.transaction;
                    let fx_digest = digests.effects;
                    // Sanity-check (debug builds only) that the bulk contents
                    // agree with the digests recorded in checkpoint contents.
                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                    debug_assert_eq!(fx_digest, execution_data.effects.digest());

                    tx_digests.push(tx_digest);
                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(execution_data.transaction),
                        epoch,
                        seq,
                    ));
                    effects.push(execution_data.effects);
                    fx_digests.push(fx_digest);
                });

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        } else {
            // load items one-by-one

            let digests = checkpoint_contents.inner();

            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) =
                digests.iter().map(|d| (d.transaction, d.effects)).unzip();
            let transactions = self
                .transaction_cache_reader
                .multi_get_transaction_blocks(&tx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, tx)| {
                    let tx = tx
                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                    // Avoid cloning when we hold the only Arc reference.
                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
                })
                .collect();
            let effects = self
                .transaction_cache_reader
                .multi_get_effects(&fx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, effect)| {
                    effect.unwrap_or_else(|| {
                        fatal!("checkpoint effect not found for {:?}", digests[i])
                    })
                })
                .collect();

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        }
    }
601
    // Schedule all unexecuted transactions in the checkpoint for execution
    //
    // For transactions that have already been executed, asserts that the
    // recorded effects digest matches the digest expected by the checkpoint
    // (fork detection). End-of-epoch transactions are skipped here — they
    // are handled separately by `execute_change_epoch_tx`. Returns the
    // digests of the transactions that were enqueued so the caller can await
    // their effects.
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Vec<TransactionDigest> {
        // Find unexecuted transactions and their expected effects digests
        let (unexecuted_tx_digests, unexecuted_txns, unexecuted_effects): (Vec<_>, Vec<_>, Vec<_>) =
            itertools::multiunzip(
                itertools::izip!(
                    ckpt_state.data.transactions.iter(),
                    ckpt_state.data.tx_digests.iter(),
                    ckpt_state.data.fx_digests.iter(),
                    ckpt_state.data.effects.iter(),
                    ckpt_state.executed_fx_digests.iter()
                )
                .filter_map(
                    |(txn, tx_digest, expected_fx_digest, effects, executed_fx_digest)| {
                        if let Some(executed_fx_digest) = executed_fx_digest {
                            // Already executed: only verify we did not fork.
                            assert_not_forked(
                                &ckpt_state.data.checkpoint,
                                tx_digest,
                                expected_fx_digest,
                                executed_fx_digest,
                                &*self.transaction_cache_reader,
                            );
                            None
                        } else if txn.transaction_data().is_end_of_epoch_tx() {
                            None
                        } else {
                            Some((tx_digest, (txn.clone(), *expected_fx_digest), effects))
                        }
                    },
                ),
            );

        // Transactions touching shared objects need their shared-object
        // versions assigned from the known effects before being enqueued.
        for ((tx, _), effects) in itertools::izip!(unexecuted_txns.iter(), unexecuted_effects) {
            if tx.contains_shared_object() {
                self.epoch_store
                    .acquire_shared_version_assignments_from_effects(
                        tx,
                        effects,
                        &*self.object_cache_reader,
                    )
                    .expect("failed to acquire shared version assignments");
            }
        }

        // Enqueue unexecuted transactions with their expected effects digests
        self.tx_manager
            .enqueue_with_expected_effects_digest(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }
655
    // Execute the change epoch txn
    //
    // The change-epoch transaction must be the last transaction of the final
    // checkpoint (asserted below). It is scheduled here after all other
    // checkpoint transactions have executed, and this function awaits its
    // effects before returning.
    async fn execute_change_epoch_tx(&self, ckpt_state: &CheckpointExecutionState) {
        let change_epoch_tx = ckpt_state.data.transactions.last().unwrap();
        let change_epoch_fx = ckpt_state.data.effects.last().unwrap();
        assert_eq!(
            change_epoch_tx.digest(),
            change_epoch_fx.transaction_digest()
        );
        assert!(
            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
            "final txn must be an end of epoch txn"
        );

        // Ordinarily we would assert that the change epoch txn has not been executed
        // yet. However, during crash recovery, it is possible that we already
        // passed this point and the txn has been executed. You can uncomment
        // this assert if you are debugging a problem related to reconfig. If
        // you hit this assert and it is not because of crash-recovery,
        // it may indicate a bug in the checkpoint executor.
        //
        //     if self
        //         .transaction_cache_reader
        //         .get_executed_effects(change_epoch_tx.digest())
        //         .is_some()
        //     {
        //         fatal!(
        //             "end of epoch txn must not have been executed: {:?}",
        //             change_epoch_tx.digest()
        //         );
        //     }

        self.epoch_store
            .acquire_shared_version_assignments_from_effects(
                change_epoch_tx,
                change_epoch_fx,
                self.object_cache_reader.as_ref(),
            )
            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");

        info!(
            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}",
            change_epoch_tx.digest(),
            change_epoch_fx.digest()
        );
        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
            &self.epoch_store,
        );

        // Block until the change-epoch transaction's effects are available.
        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&[*change_epoch_tx.digest()])
            .await;
    }
709
    // Increment the highest executed checkpoint watermark and prune old
    // full-checkpoint contents.
    //
    // The watermark is advanced strictly sequentially (asserted below), even
    // though checkpoint execution itself may run out of order; this is what
    // makes the watermark a safe recovery point after a node restart.
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        // Ensure that we are not skipping checkpoints at any point
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            // Invariant: the watermark only ever advances by exactly one.
            assert_eq!(prev_highest + 1, seq);
        } else {
            // No watermark yet: the very first executed checkpoint must be 0.
            assert_eq!(seq, 0);
        }
        // Periodic progress log to avoid flooding at every checkpoint.
        if seq.is_multiple_of(CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL) {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        // We store a fixed number of additional FullCheckpointContents after execution
        // is complete for use in state sync.
        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            // Prune the checkpoint that just fell out of the retention window.
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                // Remove both the contents themselves and the digest -> seq
                // index entry that points at them.
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                // If this is directly after a snapshot restore with skiplisting,
                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
                // checkpoints.
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        // NOTE(review): pruning happens before the watermark is persisted;
        // presumably intentional for crash-recovery ordering — confirm before
        // reordering.
        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        // Export timestamp and age of the newly executed checkpoint for
        // monitoring sync lag.
        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
    }
770
771    /// Helper to broadcast checkpoint summary and data if the
772    /// channels are set.
773    fn broadcast_checkpoint(
774        &self,
775        checkpoint_exec_data: &CheckpointExecutionData,
776        checkpoint_data: Option<&CheckpointData>,
777    ) {
778        if let Some(data_sender) = &self.data_sender {
779            let checkpoint_data = if let Some(data) = checkpoint_data {
780                data.clone()
781            } else {
782                load_checkpoint_data(
783                    checkpoint_exec_data,
784                    self.state.get_object_store(),
785                    self.transaction_cache_reader.as_ref(),
786                )
787                .expect("Failed to load full CheckpointData")
788            };
789            data_sender(&checkpoint_data);
790        }
791
792        debug!(
793            "[Fullnode] Full CheckpointData is available: seq={}",
794            checkpoint_exec_data.checkpoint.sequence_number()
795        );
796    }
797
798    /// If configured, commit the pending index updates for the provided
799    /// checkpoint
800    fn commit_index_updates(&self, checkpoint: CheckpointData) {
801        if let Some(rest_index) = &self.state.rest_index {
802            rest_index
803                .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
804                .expect("failed to update rest_indexes");
805        }
806    }
807
808    // Extract randomness rounds from the checkpoint version-specific data (if
809    // available). Otherwise, extract randomness rounds from the first
810    // transaction in the checkpoint
811    fn extract_randomness_rounds(
812        &self,
813        checkpoint: &VerifiedCheckpoint,
814        checkpoint_contents: &CheckpointContents,
815    ) -> Vec<RandomnessRound> {
816        if let Some(version_specific_data) = checkpoint
817            .version_specific_data(self.epoch_store.protocol_config())
818            .expect("unable to get version_specific_data")
819        {
820            // With version-specific data, randomness rounds are stored in checkpoint
821            // summary.
822            version_specific_data.into_v1().randomness_rounds
823        } else {
824            // Before version-specific data, checkpoint batching must be disabled. In this
825            // case, randomness state update tx must be first if it exists,
826            // because all other transactions in a checkpoint that includes a
827            // randomness state update are causally dependent on it.
828            assert_eq!(
829                0,
830                self.epoch_store
831                    .protocol_config()
832                    .min_checkpoint_interval_ms_as_option()
833                    .unwrap_or_default(),
834            );
835            if let Some(first_digest) = checkpoint_contents.inner().first() {
836                let maybe_randomness_tx = self.transaction_cache_reader.get_transaction_block(&first_digest.transaction)
837                .unwrap_or_else(||
838                    fatal!(
839                        "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
840                        checkpoint.sequence_number()
841                    )
842                );
843                if let TransactionKind::RandomnessStateUpdate(rsu) =
844                    maybe_randomness_tx.data().transaction_data().kind()
845                {
846                    vec![rsu.randomness_round]
847                } else {
848                    Vec::new()
849                }
850            } else {
851                Vec::new()
852            }
853        }
854    }
855}