iota_core/checkpoints/checkpoint_executor/
mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! CheckpointExecutor is a Node component that executes all checkpoints for
//! the given epoch. It acts as a consumer of StateSync, taking newly synced
//! checkpoints and scheduling and monitoring their execution. Its primary
//! goal is to catch up to the network's current checkpoint sequence number
//! as quickly as possible so that a newly joined or recovering Node can
//! participate in a timely manner. To that end, CheckpointExecutor attempts
//! to saturate the CPU with executor tasks (one per checkpoint), each of
//! which handles scheduling and awaiting checkpoint transaction execution.
//!
//! CheckpointExecutor is made recoverable in the event of Node shutdown by
//! way of a watermark, highest_executed_checkpoint, which is guaranteed to
//! be updated strictly in sequence, even though checkpoints themselves may
//! be executed out of order and in parallel. CheckpointExecutor parallelizes
//! checkpoints of the same epoch as much as possible. It enforces the
//! invariant that if `run_epoch` returns successfully, the end of the epoch
//! has been reached, which allows the caller to use this as a signal for
//! reconfiguration.
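//!
//! # Examples
//!
//! A simplified sketch of the per-epoch driver loop (constructor arguments,
//! the `run_with_range` value, and reconfiguration details are elided; this
//! illustrates the intended usage rather than the node's actual wiring):
//!
//! ```ignore
//! loop {
//!     // One executor instance is scoped to a single epoch.
//!     let executor = CheckpointExecutor::new(/* epoch-scoped dependencies */);
//!     match executor.run_epoch(run_with_range).await {
//!         // All checkpoints of the epoch were executed: reconfigure, then
//!         // start the next epoch with a fresh executor.
//!         StopReason::EpochComplete => { /* reconfigure and continue */ }
//!         // A RunWithRange bound was reached: stop executing checkpoints.
//!         StopReason::RunWithRangeCondition => break,
//!     }
//! }
//! ```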

use std::{sync::Arc, time::Instant};

use futures::StreamExt;
use iota_common::{debug_fatal, fatal};
use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
use iota_macros::fail_point;
use iota_types::{
    accumulator::Accumulator,
    base_types::{TransactionDigest, TransactionEffectsDigest},
    crypto::RandomnessRound,
    effects::{TransactionEffects, TransactionEffectsAPI},
    executable_transaction::VerifiedExecutableTransaction,
    full_checkpoint_content::CheckpointData,
    inner_temporary_store::PackageStoreWithFallback,
    message_envelope::Message,
    messages_checkpoint::{
        CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber,
        VerifiedCheckpoint,
    },
    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
};
use parking_lot::Mutex;
use tap::{TapFallible, TapOptional};
use tracing::{debug, info, instrument};

use crate::{
    authority::{
        AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
        backpressure::BackpressureManager,
    },
    checkpoints::CheckpointStore,
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    state_accumulator::StateAccumulator,
    transaction_manager::TransactionManager,
};

mod data_ingestion_handler;
pub mod metrics;
pub(crate) mod utils;

#[cfg(test)]
pub(crate) mod tests;

use data_ingestion_handler::{load_checkpoint_data, store_checkpoint_locally};
use metrics::CheckpointExecutorMetrics;
use utils::*;

type CheckpointSummarySender = Box<dyn Fn(&CertifiedCheckpointSummary) + Send + Sync>;
type CheckpointDataSender = Box<dyn Fn(&CheckpointData) + Send + Sync>;

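/// Emit a progress log line once every this many executed checkpoints.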
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    EpochComplete,
    RunWithRangeCondition,
}

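/// All data loaded for a single checkpoint: the certified checkpoint itself,
/// its contents, and parallel vectors of transaction digests, effects
/// digests, executable transactions, and expected effects.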
pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    pub tx_digests: Vec<TransactionDigest>,
    pub fx_digests: Vec<TransactionEffectsDigest>,
    pub transactions: Vec<VerifiedExecutableTransaction>,
    pub effects: Vec<TransactionEffects>,
}

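/// State threaded through a checkpoint's execution: constructed from the
/// loaded data plus any already-executed effects digests (used for fork
/// detection), then filled in with the state accumulator and the optional
/// full checkpoint data before being handed to the sequential commit step.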
pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,
    executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    accumulator: Option<Accumulator>,
    full_data: Option<CheckpointData>,
}

impl CheckpointExecutionState {
    pub fn new(
        data: CheckpointExecutionData,
        executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    ) -> Self {
        Self {
            data,
            executed_fx_digests,
            accumulator: None,
            full_data: None,
        }
    }
}

pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    tps_estimator: Mutex<TPSEstimator>,
    summary_sender: Option<CheckpointSummarySender>,
    data_sender: Option<CheckpointDataSender>,
}

impl CheckpointExecutor {
    pub fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
        summary_sender: Option<CheckpointSummarySender>,
        data_sender: Option<CheckpointDataSender>,
    ) -> Self {
        Self {
            epoch_store,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            backpressure_manager,
            config,
            metrics,
            tps_estimator: Mutex::new(TPSEstimator::default()),
            summary_sender,
            data_sender,
        }
    }

    pub fn new_for_tests(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            epoch_store,
            checkpoint_store.clone(),
            state,
            accumulator,
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store),
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
            None, // No callback for summary
            None, // No callback for data
        )
    }

    // Gets the next checkpoint to schedule for execution. If the epoch is
    // already completed, returns None.
    fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
        // Decide the first checkpoint to schedule for execution.
        // If we haven't executed anything in the past, we schedule checkpoint 0.
        // Otherwise we schedule the one after highest executed.
        let highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if self.epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                // We can arrive at this point if we bump the highest_executed_checkpoint
                // watermark, and then crash before completing reconfiguration.
                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
                return None;
            }
        }

        Some(
            highest_executed
                .as_ref()
                .map(|c| c.sequence_number() + 1)
                .unwrap_or_else(|| {
                    // TODO this invariant may no longer hold once we introduce snapshots
                    assert_eq!(self.epoch_store.epoch(), 0);
                    // we need to execute the genesis checkpoint
                    0
                }),
        )
    }

    /// Execute all checkpoints for the current epoch, ensuring that the node
    /// has not forked, and return when finished. If `run_with_range` is set,
    /// execution may stop early once the configured bound is reached.
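    ///
    /// The bound is inclusive: illustratively, `RunWithRange::Epoch(N)` runs
    /// epoch `N` to completion, so a subsequent call for epoch `N + 1`
    /// returns `StopReason::RunWithRangeCondition` immediately, while a
    /// checkpoint bound ends the stream after the bounding checkpoint has
    /// been executed and committed.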
    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
        let _metrics_guard = iota_metrics::monitored_scope("CheckpointExecutor::run_epoch");
        info!(?run_with_range, "CheckpointExecutor::run_epoch");
        debug!(
            "Checkpoint executor running for epoch {:?}",
            self.epoch_store.epoch(),
        );

        // Check whether we should run this epoch at all, based on the RunWithRange
        // condition. The bound is inclusive of the defined RunWithRange::Epoch value,
        // i.e. Epoch(N) means we execute epoch N and stop when reaching N+1.
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
            info!("RunWithRange condition satisfied at {:?}", run_with_range);
            return StopReason::RunWithRangeCondition;
        }

        self.metrics
            .checkpoint_exec_epoch
            .set(self.epoch_store.epoch() as i64);

        let Some(next_to_schedule) = self.get_next_to_schedule() else {
            return StopReason::EpochComplete;
        };

        let this = Arc::new(self);

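        // The env var lets operators override the configured concurrency limit,
        // e.g. `IOTA_CHECKPOINT_EXECUTION_MAX_CONCURRENCY=16`; a value that
        // fails to parse falls back to the config default.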
        let concurrency = std::env::var("IOTA_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(this.config.checkpoint_execution_max_concurrency);

        let final_checkpoint_executed = stream_synced_checkpoints(
            this.checkpoint_store.clone(),
            next_to_schedule,
            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
        )
        // Checkpoint loading and execution is parallelized
        .map(|checkpoint| this.clone().execute_checkpoint(checkpoint))
        .buffered(concurrency)
        // Committing checkpoint contents must be done serially
        // Returns whether the checkpoint just executed was the final checkpoint of the epoch
        .map(|ckpt_state| this.clone().commit_checkpoint(ckpt_state))
        // Take the last value from the stream to determine if we completed the epoch
        .fold(false, |state, is_final_checkpoint| {
            assert!(!state, "Cannot execute checkpoint after epoch end");
            is_final_checkpoint
        })
        .await;

        if final_checkpoint_executed {
            StopReason::EpochComplete
        } else {
            StopReason::RunWithRangeCondition
        }
    }
}

impl CheckpointExecutor {
    /// Serially process checkpoints after all transactions have been executed,
    /// in consecutive order.
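    ///
    /// In order, this commits transaction outputs to disk, finalizes the
    /// checkpoint in the epoch store, reports executed randomness rounds,
    /// commits any pending index updates, accumulates the running root (and,
    /// for the final checkpoint of the epoch, the epoch accumulator), then
    /// bumps the highest_executed_checkpoint watermark and broadcasts the
    /// checkpoint.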
    #[instrument(level = "debug", skip_all, fields(seq = ?ckpt_state.data.checkpoint.sequence_number()))]
    async fn commit_checkpoint(
        self: Arc<Self>,
        mut ckpt_state: CheckpointExecutionState,
    ) -> bool /* is final checkpoint */ {
        tokio::task::spawn_blocking({
            move || {
                let _sequential_step_guard =
                    iota_metrics::monitored_scope("CheckpointExecutor::sequential_step");

                let tps = self.tps_estimator.lock().update(
                    Instant::now(),
                    ckpt_state.data.checkpoint.network_total_transactions,
                );
                self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

                self.backpressure_manager.update_highest_executed_checkpoint(
                    *ckpt_state.data.checkpoint.sequence_number(),
                );

                let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

                let seq = ckpt_state.data.checkpoint.sequence_number;

                // Commit all transaction effects to disk
                let cache_commit = self.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    self.epoch_store.epoch(),
                    &ckpt_state.data.tx_digests,
                );

                self.epoch_store
                    .handle_finalized_checkpoint(
                        &ckpt_state.data.checkpoint,
                        &ckpt_state.data.tx_digests,
                    )
                    .expect("cannot fail");

                // Once the checkpoint is finalized, we know that any randomness contained in
                // this checkpoint has been successfully included in a checkpoint certified by
                // a quorum of validators.
                // (RandomnessManager/RandomnessReporter is only present on validators.)
                if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
                    let randomness_rounds = self.extract_randomness_rounds(
                        &ckpt_state.data.checkpoint,
                        &ckpt_state.data.checkpoint_contents,
                    );
                    for round in randomness_rounds {
                        debug!(
                            ?round,
                            "notifying RandomnessReporter that randomness update was executed in checkpoint"
                        );
                        randomness_reporter
                            .notify_randomness_in_checkpoint(round)
                            .expect("epoch cannot have ended");
                    }
                }

                if let Some(checkpoint_data) = ckpt_state.full_data.take() {
                    self.commit_index_updates(checkpoint_data);
                }

                self.accumulator
                    .accumulate_running_root(&self.epoch_store, seq, ckpt_state.accumulator)
                    .expect("Failed to accumulate running root");

                if is_final_checkpoint {
                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(
                            self.epoch_store.epoch(),
                            &ckpt_state.data.checkpoint,
                        )
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_epoch(self.epoch_store.clone(), seq)
                        .expect("Accumulating epoch cannot fail");

                    self.checkpoint_store
                        .prune_local_summaries()
                        .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                        .ok();
                }

                fail_point!("crash");

                self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

                self.broadcast_checkpoint(&ckpt_state.data, ckpt_state.full_data.as_ref());

                is_final_checkpoint
            }
        })
        .await
        .unwrap()
    }

    /// Load all data for a checkpoint, ensure all transactions are executed,
    /// and check for forks.
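    ///
    /// This is the parallelizable step of the pipeline: multiple checkpoints
    /// may run through it concurrently and out of order, while
    /// `commit_checkpoint` consumes the results strictly in sequence.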
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        info!("executing checkpoint");

        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
        self.backpressure_manager
            .update_highest_certified_checkpoint(*checkpoint.sequence_number());

        let sequence_number = checkpoint.sequence_number;
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard =
                iota_metrics::monitored_scope("CheckpointExecutor::wait_for_previous_checkpoints");

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            iota_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        let (mut ckpt_state, unexecuted_tx_digests) = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                let _scope =
                    iota_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
                let ckpt_state = this.load_checkpoint_transactions(checkpoint);
                let unexecuted_tx_digests = this.schedule_transaction_execution(&ckpt_state);
                (ckpt_state, unexecuted_tx_digests)
            }
        })
        .await
        .unwrap();

        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&unexecuted_tx_digests)
            .await;

        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&ckpt_state).await;
        }

        tokio::task::spawn_blocking(move || {
            let _scope = iota_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");
            self.epoch_store
                .insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number)
                .expect("failed to insert finalized transactions");

            if self.state.is_fullnode(&self.epoch_store) {
                self.state.congestion_tracker.process_checkpoint_effects(
                    &*self.transaction_cache_reader,
                    &ckpt_state.data.checkpoint,
                    &ckpt_state.data.effects,
                );
            }

            // TODO remove once we no longer need to support this table for read RPC
            self.state
                .get_checkpoint_cache()
                .insert_finalized_transactions_perpetual_checkpoints(
                    &ckpt_state.data.tx_digests,
                    self.epoch_store.epoch(),
                    sequence_number,
                );

            // The early versions of the accumulator (prior to effectsv2) rely on db
            // state, so we must wait until all transactions have been executed
            // before accumulating the checkpoint.
            ckpt_state.accumulator = Some(
                self.accumulator
                    .accumulate_checkpoint(
                        &ckpt_state.data.effects,
                        sequence_number,
                        &self.epoch_store,
                    )
                    .expect("epoch cannot have ended"),
            );

            ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state);
            ckpt_state
        })
        .await
        .unwrap()
    }

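    /// Full `CheckpointData` is assembled only if something will consume it:
    /// the REST index or a configured data-ingestion directory.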
    fn checkpoint_data_enabled(&self) -> bool {
        self.state.rest_index.is_some() || self.config.data_ingestion_dir.is_some()
    }

    fn process_checkpoint_data(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Option<CheckpointData> {
        if !self.checkpoint_data_enabled() {
            return None;
        }

        let checkpoint_data = load_checkpoint_data(
            &ckpt_state.data,
            &*self.object_cache_reader,
            &*self.transaction_cache_reader,
        )
        .expect("failed to load checkpoint data");

        // Index the checkpoint. This is done out of order and is not written and
        // committed to the DB until later (committing must be done in-order).
        if let Some(rest_index) = &self.state.rest_index {
            let mut layout_resolver = self.epoch_store.executor().type_layout_resolver(
                Box::new(PackageStoreWithFallback::new(
                    self.state.get_backing_package_store(),
                    &checkpoint_data,
                )),
            );

            rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut());
        }

        if let Some(path) = &self.config.data_ingestion_dir {
            store_checkpoint_locally(path, &checkpoint_data)
                .expect("failed to store checkpoint locally");
        }

        Some(checkpoint_data)
    }

    // Load all required transaction and effects data for the checkpoint.
    fn load_checkpoint_transactions(
        &self,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        let seq = checkpoint.sequence_number;
        let epoch = checkpoint.epoch;

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        // attempt to load full checkpoint contents in bulk
        if let Some(full_contents) = self
            .checkpoint_store
            .get_full_checkpoint_contents_by_sequence_number(seq)
            .expect("Failed to get checkpoint contents from store")
            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
        {
            let num_txns = full_contents.size();
            let mut tx_digests = Vec::with_capacity(num_txns);
            let mut transactions = Vec::with_capacity(num_txns);
            let mut effects = Vec::with_capacity(num_txns);
            let mut fx_digests = Vec::with_capacity(num_txns);

            full_contents
                .into_iter()
                .zip(checkpoint_contents.iter())
                .for_each(|(execution_data, digests)| {
                    let tx_digest = digests.transaction;
                    let fx_digest = digests.effects;
                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                    debug_assert_eq!(fx_digest, execution_data.effects.digest());

                    tx_digests.push(tx_digest);
                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(execution_data.transaction),
                        epoch,
                        seq,
                    ));
                    effects.push(execution_data.effects);
                    fx_digests.push(fx_digest);
                });

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        } else {
            // load items one-by-one

            let digests = checkpoint_contents.inner();

            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) =
                digests.iter().map(|d| (d.transaction, d.effects)).unzip();
            let transactions = self
                .transaction_cache_reader
                .multi_get_transaction_blocks(&tx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, tx)| {
                    let tx = tx
                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
                })
                .collect();
            let effects = self
                .transaction_cache_reader
                .multi_get_effects(&fx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, effect)| {
                    effect.unwrap_or_else(|| {
                        fatal!("checkpoint effect not found for {:?}", digests[i])
                    })
                })
                .collect();

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        }
    }

    // Schedule all unexecuted transactions in the checkpoint for execution.
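    // Transactions that have already been executed are instead checked against
    // their expected effects digests (fork detection), and the end-of-epoch
    // transaction is deferred to `execute_change_epoch_tx`. Returns the digests
    // of the transactions that were scheduled.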
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Vec<TransactionDigest> {
        // Find unexecuted transactions and their expected effects digests
        let (unexecuted_tx_digests, unexecuted_txns, unexecuted_effects): (Vec<_>, Vec<_>, Vec<_>) =
            itertools::multiunzip(
                itertools::izip!(
                    ckpt_state.data.transactions.iter(),
                    ckpt_state.data.tx_digests.iter(),
                    ckpt_state.data.fx_digests.iter(),
                    ckpt_state.data.effects.iter(),
                    ckpt_state.executed_fx_digests.iter()
                )
                .filter_map(
                    |(txn, tx_digest, expected_fx_digest, effects, executed_fx_digest)| {
                        if let Some(executed_fx_digest) = executed_fx_digest {
                            assert_not_forked(
                                &ckpt_state.data.checkpoint,
                                tx_digest,
                                expected_fx_digest,
                                executed_fx_digest,
                                &*self.transaction_cache_reader,
                            );
                            None
                        } else if txn.transaction_data().is_end_of_epoch_tx() {
                            None
                        } else {
                            Some((tx_digest, (txn.clone(), *expected_fx_digest), effects))
                        }
                    },
                ),
            );

        for ((tx, _), effects) in itertools::izip!(unexecuted_txns.iter(), unexecuted_effects) {
            if tx.contains_shared_object() {
                self.epoch_store
                    .acquire_shared_version_assignments_from_effects(
                        tx,
                        effects,
                        &*self.object_cache_reader,
                    )
                    .expect("failed to acquire shared version assignments");
            }
        }

        // Enqueue unexecuted transactions with their expected effects digests
        self.tx_manager
            .enqueue_with_expected_effects_digest(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }

    // Execute the change epoch txn.
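    // The change epoch txn is always the last transaction of the final
    // checkpoint, and it is only enqueued here, after every other transaction
    // in the checkpoint has already been executed and its effects are known.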
    async fn execute_change_epoch_tx(&self, ckpt_state: &CheckpointExecutionState) {
        let change_epoch_tx = ckpt_state.data.transactions.last().unwrap();
        let change_epoch_fx = ckpt_state.data.effects.last().unwrap();
        assert_eq!(
            change_epoch_tx.digest(),
            change_epoch_fx.transaction_digest()
        );
        assert!(
            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
            "final txn must be an end of epoch txn"
        );

        // Ordinarily we would assert that the change epoch txn has not been executed
        // yet. However, during crash recovery, it is possible that we already
        // passed this point and the txn has been executed. You can uncomment
        // this assert if you are debugging a problem related to reconfig. If
        // you hit this assert and it is not because of crash-recovery,
        // it may indicate a bug in the checkpoint executor.
        //
        //     if self
        //         .transaction_cache_reader
        //         .get_executed_effects(change_epoch_tx.digest())
        //         .is_some()
        //     {
        //         fatal!(
        //             "end of epoch txn must not have been executed: {:?}",
        //             change_epoch_tx.digest()
        //         );
        //     }

        self.epoch_store
            .acquire_shared_version_assignments_from_effects(
                change_epoch_tx,
                change_epoch_fx,
                self.object_cache_reader.as_ref(),
            )
            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");

        info!(
            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}",
            change_epoch_tx.digest(),
            change_epoch_fx.digest()
        );
        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
            &self.epoch_store,
        );

        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&[*change_epoch_tx.digest()])
            .await;
    }

    // Increment the highest executed checkpoint watermark and prune old
    // full-checkpoint contents.
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        // Ensure that we are not skipping checkpoints at any point
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        // We store a fixed number of additional FullCheckpointContents after execution
        // is complete for use in state sync.
        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                // If this is directly after a snapshot restore with skiplisting,
                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
                // checkpoints.
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
    }

    /// Helper to broadcast the checkpoint summary and data if the
    /// corresponding senders are set.
    fn broadcast_checkpoint(
        &self,
        checkpoint_exec_data: &CheckpointExecutionData,
        checkpoint_data: Option<&CheckpointData>,
    ) {
        if let Some(summary_sender) = &self.summary_sender {
            let summary = CertifiedCheckpointSummary::from(checkpoint_exec_data.checkpoint.clone());
            summary_sender(&summary);
        }
        if let Some(data_sender) = &self.data_sender {
            let checkpoint_data = if let Some(data) = checkpoint_data {
                data.clone()
            } else {
                load_checkpoint_data(
                    checkpoint_exec_data,
                    self.object_cache_reader.as_ref(),
                    self.transaction_cache_reader.as_ref(),
                )
                .expect("Failed to load full CheckpointData")
            };
            data_sender(&checkpoint_data);
        }

        debug!(
            "[Fullnode] Full CheckpointData is available: seq={}",
            checkpoint_exec_data.checkpoint.sequence_number()
        );
    }

    /// If configured, commit the pending index updates for the provided
    /// checkpoint.
    fn commit_index_updates(&self, checkpoint: CheckpointData) {
        if let Some(rest_index) = &self.state.rest_index {
            rest_index
                .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
                .expect("failed to update rest_indexes");
        }
    }

    // Extract randomness rounds from the checkpoint version-specific data (if
    // available). Otherwise, extract randomness rounds from the first
    // transaction in the checkpoint.
    fn extract_randomness_rounds(
        &self,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_contents: &CheckpointContents,
    ) -> Vec<RandomnessRound> {
        if let Some(version_specific_data) = checkpoint
            .version_specific_data(self.epoch_store.protocol_config())
            .expect("unable to get version_specific_data")
        {
            // With version-specific data, randomness rounds are stored in the checkpoint
            // summary.
            version_specific_data.into_v1().randomness_rounds
        } else {
            // Before version-specific data, checkpoint batching must be disabled. In this
            // case, the randomness state update tx must be first if it exists, because
            // all other transactions in a checkpoint that includes a randomness state
            // update are causally dependent on it.
            assert_eq!(
                0,
                self.epoch_store
                    .protocol_config()
                    .min_checkpoint_interval_ms_as_option()
                    .unwrap_or_default(),
            );
            if let Some(first_digest) = checkpoint_contents.inner().first() {
                let maybe_randomness_tx = self
                    .transaction_cache_reader
                    .get_transaction_block(&first_digest.transaction)
                    .unwrap_or_else(|| {
                        fatal!(
                            "state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
                            checkpoint.sequence_number()
                        )
                    });
                if let TransactionKind::RandomnessStateUpdate(rsu) =
                    maybe_randomness_tx.data().transaction_data().kind()
                {
                    vec![rsu.randomness_round]
                } else {
                    Vec::new()
                }
            } else {
                Vec::new()
            }
        }
    }
}