iota_core/checkpoints/checkpoint_executor/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! CheckpointExecutor is a Node component that executes all checkpoints for
//! the given epoch. It acts as a consumer of StateSync for newly synced
//! checkpoints, scheduling and monitoring their execution. Its primary goal
//! is to catch up to the network's current checkpoint sequence number as
//! quickly as possible, so that a newly joined or recovering Node can
//! participate in a timely manner. To that end, CheckpointExecutor attempts
//! to saturate the CPU with executor tasks (one per checkpoint), each of
//! which handles scheduling and awaiting checkpoint transaction execution.
//!
//! CheckpointExecutor is made recoverable in the event of Node shutdown by
//! way of a watermark, highest_executed_checkpoint, which is guaranteed to
//! be updated sequentially and in order, even though checkpoints themselves
//! may be executed non-sequentially and in parallel. CheckpointExecutor
//! parallelizes checkpoints of the same epoch as much as possible. It also
//! enforces the invariant that if `run_epoch` returns successfully, the end
//! of the epoch has been reached. This allows the caller to use it as a
//! signal for reconfiguration.
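//!
//! A minimal sketch of how a caller might drive the executor across epochs
//! (hypothetical driver loop; construction arguments elided):
//!
//! ```ignore
//! loop {
//!     // Rebuilt each epoch, since the executor consumes itself in `run_epoch`.
//!     let executor = CheckpointExecutor::new(/* epoch_store, checkpoint_store, ... */);
//!     match executor.run_epoch(None).await {
//!         // End of epoch reached: reconfigure, then start the next epoch.
//!         StopReason::EpochComplete => { /* reconfigure */ }
//!         // A RunWithRange bound was reached: stop executing.
//!         StopReason::RunWithRangeCondition => break,
//!     }
//! }
//! ```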

use std::{sync::Arc, time::Instant};

use futures::StreamExt;
use iota_common::{debug_fatal, fatal};
use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
use iota_macros::fail_point;
use iota_types::{
    accumulator::Accumulator,
    base_types::{TransactionDigest, TransactionEffectsDigest},
    crypto::RandomnessRound,
    effects::{TransactionEffects, TransactionEffectsAPI},
    executable_transaction::VerifiedExecutableTransaction,
    full_checkpoint_content::CheckpointData,
    inner_temporary_store::PackageStoreWithFallback,
    message_envelope::Message,
    messages_checkpoint::{CheckpointContents, CheckpointSequenceNumber, VerifiedCheckpoint},
    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
};
use parking_lot::Mutex;
use tap::{TapFallible, TapOptional};
use tracing::{debug, info, instrument};

use crate::{
    authority::{
        AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore,
        backpressure::BackpressureManager,
    },
    checkpoints::CheckpointStore,
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    state_accumulator::StateAccumulator,
    transaction_manager::TransactionManager,
};

mod data_ingestion_handler;
pub mod metrics;
pub(crate) mod utils;

#[cfg(test)]
pub(crate) mod tests;

use data_ingestion_handler::{load_checkpoint_data, store_checkpoint_locally};
use metrics::CheckpointExecutorMetrics;
use utils::*;

const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    EpochComplete,
    RunWithRangeCondition,
}

pub(crate) struct CheckpointExecutionData {
    pub checkpoint: VerifiedCheckpoint,
    pub checkpoint_contents: CheckpointContents,
    pub tx_digests: Vec<TransactionDigest>,
    pub fx_digests: Vec<TransactionEffectsDigest>,
    pub transactions: Vec<VerifiedExecutableTransaction>,
    pub effects: Vec<TransactionEffects>,
}

pub(crate) struct CheckpointExecutionState {
    pub data: CheckpointExecutionData,
    executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    accumulator: Option<Accumulator>,
    full_data: Option<CheckpointData>,
}

impl CheckpointExecutionState {
    pub fn new(
        data: CheckpointExecutionData,
        executed_fx_digests: Vec<Option<TransactionEffectsDigest>>,
    ) -> Self {
        Self {
            data,
            executed_fx_digests,
            accumulator: None,
            full_data: None,
        }
    }
}

pub struct CheckpointExecutor {
    epoch_store: Arc<AuthorityPerEpochStore>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    backpressure_manager: Arc<BackpressureManager>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
    tps_estimator: Mutex<TPSEstimator>,
}

impl CheckpointExecutor {
    pub fn new(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        backpressure_manager: Arc<BackpressureManager>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
    ) -> Self {
        Self {
            epoch_store,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            backpressure_manager,
            config,
            metrics,
            tps_estimator: Mutex::new(TPSEstimator::default()),
        }
    }

    pub fn new_for_tests(
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            epoch_store,
            checkpoint_store.clone(),
            state,
            accumulator,
            BackpressureManager::new_from_checkpoint_store(&checkpoint_store),
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
        )
    }

    // Gets the next checkpoint to schedule for execution. If the epoch is
    // already completed, returns None.
    fn get_next_to_schedule(&self) -> Option<CheckpointSequenceNumber> {
        // Decide the first checkpoint to schedule for execution.
        // If we haven't executed anything in the past, we schedule checkpoint 0.
        // Otherwise we schedule the one after the highest executed checkpoint.
        let highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if self.epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                // We can arrive at this point if we bump the highest_executed_checkpoint
                // watermark, and then crash before completing reconfiguration.
                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
                return None;
            }
        }

        Some(
            highest_executed
                .as_ref()
                .map(|c| c.sequence_number() + 1)
                .unwrap_or_else(|| {
                    // TODO this invariant may no longer hold once we introduce snapshots
                    assert_eq!(self.epoch_store.epoch(), 0);
                    // we need to execute the genesis checkpoint
                    0
                }),
        )
    }

    /// Execute all checkpoints for the current epoch, ensuring that the node
    /// has not forked, and return when finished. If `run_with_range` is set,
    /// execution may stop early once the configured bound is reached.
    #[instrument(level = "error", skip_all, fields(epoch = ?self.epoch_store.epoch()))]
    pub async fn run_epoch(self, run_with_range: Option<RunWithRange>) -> StopReason {
        let _metrics_guard = iota_metrics::monitored_scope("CheckpointExecutor::run_epoch");
        info!(?run_with_range, "CheckpointExecutor::run_epoch");
        debug!(
            "Checkpoint executor running for epoch {:?}",
            self.epoch_store.epoch(),
        );

        // Check whether we should run this epoch at all, based on the RunWithRange
        // condition. The bound is inclusive of the configured RunWithRange::Epoch,
        // i.e. Epoch(N) means we execute epoch N and stop when reaching epoch N+1.
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(self.epoch_store.epoch())) {
            info!("RunWithRange condition satisfied at {:?}", run_with_range);
            return StopReason::RunWithRangeCondition;
        }

        self.metrics
            .checkpoint_exec_epoch
            .set(self.epoch_store.epoch() as i64);

        let Some(next_to_schedule) = self.get_next_to_schedule() else {
            return StopReason::EpochComplete;
        };

        let this = Arc::new(self);

        let concurrency = std::env::var("IOTA_CHECKPOINT_EXECUTION_MAX_CONCURRENCY")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(this.config.checkpoint_execution_max_concurrency);

        let final_checkpoint_executed = stream_synced_checkpoints(
            this.checkpoint_store.clone(),
            next_to_schedule,
            run_with_range.and_then(|rwr| rwr.into_checkpoint_bound()),
        )
        // Checkpoint loading and execution is parallelized
        .map(|checkpoint| this.clone().execute_checkpoint(checkpoint))
        .buffered(concurrency)
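        // `buffered` (unlike `buffer_unordered`) yields results in input order,
        // so the sequential commit step below always observes checkpoints in
        // consecutive sequence-number order.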
        // Committing checkpoint contents must be done serially
        // Returns whether the checkpoint just executed was the final checkpoint of the epoch
        .map(|ckpt_state| this.clone().commit_checkpoint(ckpt_state))
        // Take the last value from the stream to determine if we completed the epoch
        .fold(false, |state, is_final_checkpoint| {
            assert!(!state, "Cannot execute checkpoint after epoch end");
            is_final_checkpoint
        })
        .await;

        if final_checkpoint_executed {
            StopReason::EpochComplete
        } else {
            StopReason::RunWithRangeCondition
        }
    }
}

impl CheckpointExecutor {
    /// Serially process checkpoints after all transactions have been executed,
    /// in consecutive order.
    #[instrument(level = "debug", skip_all, fields(seq = ?ckpt_state.data.checkpoint.sequence_number()))]
    async fn commit_checkpoint(self: Arc<Self>, mut ckpt_state: CheckpointExecutionState) -> bool /* is final checkpoint */
    {
        tokio::task::spawn_blocking({
            move || {
                let _sequential_step_guard =
                    iota_metrics::monitored_scope("CheckpointExecutor::sequential_step");

                let tps = self.tps_estimator.lock().update(
                    Instant::now(),
                    ckpt_state.data.checkpoint.network_total_transactions,
                );
                self.metrics.checkpoint_exec_sync_tps.set(tps as i64);

                self.backpressure_manager.update_highest_executed_checkpoint(
                    *ckpt_state.data.checkpoint.sequence_number(),
                );

                let is_final_checkpoint = ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch();

                let seq = ckpt_state.data.checkpoint.sequence_number;

                // Commit all transaction effects to disk
                let cache_commit = self.state.get_cache_commit();
                debug!(?seq, "committing checkpoint transactions to disk");
                cache_commit.commit_transaction_outputs(
                    self.epoch_store.epoch(),
                    &ckpt_state.data.tx_digests,
                );

                self.epoch_store
                    .handle_finalized_checkpoint(
                        &ckpt_state.data.checkpoint,
                        &ckpt_state.data.tx_digests,
                    )
                    .expect("cannot fail");

                // Once the checkpoint is finalized, we know that any randomness contained in
                // this checkpoint has been successfully included in a checkpoint certified by
                // a quorum of validators.
                // (RandomnessManager/RandomnessReporter is only present on validators.)
                if let Some(randomness_reporter) = self.epoch_store.randomness_reporter() {
                    let randomness_rounds = self.extract_randomness_rounds(
                        &ckpt_state.data.checkpoint,
                        &ckpt_state.data.checkpoint_contents,
                    );
                    for round in randomness_rounds {
                        debug!(
                            ?round,
                            "notifying RandomnessReporter that randomness update was executed in checkpoint"
                        );
                        randomness_reporter
                            .notify_randomness_in_checkpoint(round)
                            .expect("epoch cannot have ended");
                    }
                }

                if let Some(checkpoint_data) = ckpt_state.full_data.take() {
                    self.commit_index_updates(checkpoint_data);
                }

                self.accumulator
                    .accumulate_running_root(&self.epoch_store, seq, ckpt_state.accumulator)
                    .expect("Failed to accumulate running root");

                if is_final_checkpoint {
                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(
                            self.epoch_store.epoch(),
                            &ckpt_state.data.checkpoint,
                        )
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_epoch(self.epoch_store.clone(), seq)
                        .expect("Accumulating epoch cannot fail");

                    self.checkpoint_store
                        .prune_local_summaries()
                        .tap_err(|e| debug_fatal!("Failed to prune local summaries: {}", e))
                        .ok();
                }

                fail_point!("crash");

                self.bump_highest_executed_checkpoint(&ckpt_state.data.checkpoint);

                is_final_checkpoint
            }
        })
        .await
        .unwrap()
    }

    /// Load all data for a checkpoint, ensure all transactions are executed,
    /// and check for forks.
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn execute_checkpoint(
        self: Arc<Self>,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        info!("executing checkpoint");

        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
        self.backpressure_manager
            .update_highest_certified_checkpoint(*checkpoint.sequence_number());

        let sequence_number = checkpoint.sequence_number;
        if checkpoint.is_last_checkpoint_of_epoch() && sequence_number > 0 {
            let _wait_for_previous_checkpoints_guard =
                iota_metrics::monitored_scope("CheckpointExecutor::wait_for_previous_checkpoints");

            info!(
                "Reached end of epoch checkpoint, waiting for all previous checkpoints to be executed"
            );
            self.checkpoint_store
                .notify_read_executed_checkpoint(sequence_number - 1)
                .await;
        }

        let _parallel_step_guard =
            iota_metrics::monitored_scope("CheckpointExecutor::parallel_step");

        let (mut ckpt_state, unexecuted_tx_digests) = tokio::task::spawn_blocking({
            let this = self.clone();
            move || {
                let _scope =
                    iota_metrics::monitored_scope("CheckpointExecutor::execute_transactions");
                let ckpt_state = this.load_checkpoint_transactions(checkpoint);
                let unexecuted_tx_digests = this.schedule_transaction_execution(&ckpt_state);
                (ckpt_state, unexecuted_tx_digests)
            }
        })
        .await
        .unwrap();

        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&unexecuted_tx_digests)
            .await;

        if ckpt_state.data.checkpoint.is_last_checkpoint_of_epoch() {
            self.execute_change_epoch_tx(&ckpt_state).await;
        }

        tokio::task::spawn_blocking(move || {
            let _scope = iota_metrics::monitored_scope("CheckpointExecutor::finalize_checkpoint");
            self.epoch_store
                .insert_finalized_transactions(&ckpt_state.data.tx_digests, sequence_number)
                .expect("failed to insert finalized transactions");

            if self.state.is_fullnode(&self.epoch_store) {
                self.state.congestion_tracker.process_checkpoint_effects(
                    &*self.transaction_cache_reader,
                    &ckpt_state.data.checkpoint,
                    &ckpt_state.data.effects,
                );
            }

            // TODO remove once we no longer need to support this table for read RPC
            self.state
                .get_checkpoint_cache()
                .insert_finalized_transactions_perpetual_checkpoints(
                    &ckpt_state.data.tx_digests,
                    self.epoch_store.epoch(),
                    sequence_number,
                );

            // The early versions of the accumulator (prior to effectsv2) rely on db
            // state, so we must wait until all transactions have been executed
            // before accumulating the checkpoint.
            ckpt_state.accumulator = Some(
                self.accumulator
                    .accumulate_checkpoint(
                        &ckpt_state.data.effects,
                        sequence_number,
                        &self.epoch_store,
                    )
                    .expect("epoch cannot have ended"),
            );

            ckpt_state.full_data = self.process_checkpoint_data(&ckpt_state);
            ckpt_state
        })
        .await
        .unwrap()
    }

    fn checkpoint_data_enabled(&self) -> bool {
        self.state.rest_index.is_some() || self.config.data_ingestion_dir.is_some()
    }

    fn process_checkpoint_data(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Option<CheckpointData> {
        if !self.checkpoint_data_enabled() {
            return None;
        }

        let checkpoint_data = load_checkpoint_data(
            &ckpt_state.data,
            &*self.object_cache_reader,
            &*self.transaction_cache_reader,
        )
        .expect("failed to load checkpoint data");

        // Index the checkpoint. This is done out of order; the update is not
        // written and committed to the DB until later (committing must be done
        // in order).
        if let Some(rest_index) = &self.state.rest_index {
            let mut layout_resolver = self.epoch_store.executor().type_layout_resolver(
                Box::new(PackageStoreWithFallback::new(
                    self.state.get_backing_package_store(),
                    &checkpoint_data,
                )),
            );

            rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut());
        }

        if let Some(path) = &self.config.data_ingestion_dir {
            store_checkpoint_locally(path, &checkpoint_data)
                .expect("failed to store checkpoint locally");
        }

        Some(checkpoint_data)
    }

    // Load all required transaction and effects data for the checkpoint.
    fn load_checkpoint_transactions(
        &self,
        checkpoint: VerifiedCheckpoint,
    ) -> CheckpointExecutionState {
        let seq = checkpoint.sequence_number;
        let epoch = checkpoint.epoch;

        let checkpoint_contents = self
            .checkpoint_store
            .get_checkpoint_contents(&checkpoint.content_digest)
            .expect("db error")
            .expect("checkpoint contents not found");

        // Attempt to load full checkpoint contents in bulk
        if let Some(full_contents) = self
            .checkpoint_store
            .get_full_checkpoint_contents_by_sequence_number(seq)
            .expect("Failed to get checkpoint contents from store")
            .tap_some(|_| debug!("loaded full checkpoint contents in bulk for sequence {seq}"))
        {
            let num_txns = full_contents.size();
            let mut tx_digests = Vec::with_capacity(num_txns);
            let mut transactions = Vec::with_capacity(num_txns);
            let mut effects = Vec::with_capacity(num_txns);
            let mut fx_digests = Vec::with_capacity(num_txns);

            full_contents
                .into_iter()
                .zip(checkpoint_contents.iter())
                .for_each(|(execution_data, digests)| {
                    let tx_digest = digests.transaction;
                    let fx_digest = digests.effects;
                    debug_assert_eq!(tx_digest, *execution_data.transaction.digest());
                    debug_assert_eq!(fx_digest, execution_data.effects.digest());

                    tx_digests.push(tx_digest);
                    transactions.push(VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(execution_data.transaction),
                        epoch,
                        seq,
                    ));
                    effects.push(execution_data.effects);
                    fx_digests.push(fx_digest);
                });

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        } else {
            // Fall back to loading items one by one
            let digests = checkpoint_contents.inner();

            let (tx_digests, fx_digests): (Vec<_>, Vec<_>) =
                digests.iter().map(|d| (d.transaction, d.effects)).unzip();
            let transactions = self
                .transaction_cache_reader
                .multi_get_transaction_blocks(&tx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, tx)| {
                    let tx = tx
                        .unwrap_or_else(|| fatal!("transaction not found for {:?}", tx_digests[i]));
                    let tx = Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone());
                    VerifiedExecutableTransaction::new_from_checkpoint(tx, epoch, seq)
                })
                .collect();
            let effects = self
                .transaction_cache_reader
                .multi_get_effects(&fx_digests)
                .into_iter()
                .enumerate()
                .map(|(i, effect)| {
                    effect.unwrap_or_else(|| {
                        fatal!("checkpoint effect not found for {:?}", digests[i])
                    })
                })
                .collect();

            let executed_fx_digests = self
                .transaction_cache_reader
                .multi_get_executed_effects_digests(&tx_digests);

            CheckpointExecutionState::new(
                CheckpointExecutionData {
                    checkpoint,
                    checkpoint_contents,
                    tx_digests,
                    fx_digests,
                    transactions,
                    effects,
                },
                executed_fx_digests,
            )
        }
    }

    // Schedule all unexecuted transactions in the checkpoint for execution
    fn schedule_transaction_execution(
        &self,
        ckpt_state: &CheckpointExecutionState,
    ) -> Vec<TransactionDigest> {
        // Find unexecuted transactions and their expected effects digests
        let (unexecuted_tx_digests, unexecuted_txns, unexecuted_effects): (Vec<_>, Vec<_>, Vec<_>) =
            itertools::multiunzip(
                itertools::izip!(
                    ckpt_state.data.transactions.iter(),
                    ckpt_state.data.tx_digests.iter(),
                    ckpt_state.data.fx_digests.iter(),
                    ckpt_state.data.effects.iter(),
                    ckpt_state.executed_fx_digests.iter()
                )
                .filter_map(
                    |(txn, tx_digest, expected_fx_digest, effects, executed_fx_digest)| {
                        if let Some(executed_fx_digest) = executed_fx_digest {
                            assert_not_forked(
                                &ckpt_state.data.checkpoint,
                                tx_digest,
                                expected_fx_digest,
                                executed_fx_digest,
                                &*self.transaction_cache_reader,
                            );
                            None
                        } else if txn.transaction_data().is_end_of_epoch_tx() {
                            None
                        } else {
                            Some((*tx_digest, (txn.clone(), *expected_fx_digest), effects))
                        }
                    },
                ),
            );

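        // Record the shared object version assignments implied by the checkpoint
        // effects, so that transactions touching shared objects re-execute
        // against the same input versions as the original execution.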
        for ((tx, _), effects) in itertools::izip!(unexecuted_txns.iter(), unexecuted_effects) {
            if tx.contains_shared_object() {
                self.epoch_store
                    .acquire_shared_version_assignments_from_effects(
                        tx,
                        effects,
                        &*self.object_cache_reader,
                    )
                    .expect("failed to acquire shared version assignments");
            }
        }

        // Enqueue unexecuted transactions with their expected effects digests
        self.tx_manager
            .enqueue_with_expected_effects_digest(unexecuted_txns, &self.epoch_store);

        unexecuted_tx_digests
    }

    // Execute the change epoch txn
    async fn execute_change_epoch_tx(&self, ckpt_state: &CheckpointExecutionState) {
        let change_epoch_tx = ckpt_state.data.transactions.last().unwrap();
        let change_epoch_fx = ckpt_state.data.effects.last().unwrap();
        assert_eq!(
            change_epoch_tx.digest(),
            change_epoch_fx.transaction_digest()
        );
        assert!(
            change_epoch_tx.transaction_data().is_end_of_epoch_tx(),
            "final txn must be an end of epoch txn"
        );

        // Ordinarily we would assert that the change epoch txn has not been executed
        // yet. However, during crash recovery, it is possible that we already passed
        // this point and the txn has been executed. You can uncomment this check if
        // you are debugging a problem related to reconfig. If you hit it and it is
        // not because of crash recovery, it may indicate a bug in the checkpoint
        // executor.
        //
        //     if self
        //         .transaction_cache_reader
        //         .get_executed_effects(change_epoch_tx.digest())
        //         .is_some()
        //     {
        //         fatal!(
        //             "end of epoch txn must not have been executed: {:?}",
        //             change_epoch_tx.digest()
        //         );
        //     }

        self.epoch_store
            .acquire_shared_version_assignments_from_effects(
                change_epoch_tx,
                change_epoch_fx,
                self.object_cache_reader.as_ref(),
            )
            .expect("Acquiring shared version assignments for change_epoch tx cannot fail");

        info!(
            "scheduling change epoch txn with digest: {:?}, expected effects digest: {:?}",
            change_epoch_tx.digest(),
            change_epoch_fx.digest()
        );
        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), change_epoch_fx.digest())],
            &self.epoch_store,
        );

        self.transaction_cache_reader
            .notify_read_executed_effects_digests(&[*change_epoch_tx.digest()])
            .await;
    }

    // Increment the highest executed checkpoint watermark and prune old
    // full-checkpoint contents
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        // Ensure that we are not skipping checkpoints at any point
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        // We store a fixed number of additional FullCheckpointContents after execution
        // is complete for use in state sync.
        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                // If this is directly after a snapshot restore with skiplisting,
                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
                // checkpoints.
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
    }

    /// If configured, commit the pending index updates for the provided
    /// checkpoint.
    fn commit_index_updates(&self, checkpoint: CheckpointData) {
        if let Some(rest_index) = &self.state.rest_index {
            rest_index
                .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
                .expect("failed to update rest_indexes");
        }
    }

    // Extract randomness rounds from the checkpoint version-specific data (if
    // available). Otherwise, extract randomness rounds from the first
    // transaction in the checkpoint.
    fn extract_randomness_rounds(
        &self,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_contents: &CheckpointContents,
    ) -> Vec<RandomnessRound> {
        if let Some(version_specific_data) = checkpoint
            .version_specific_data(self.epoch_store.protocol_config())
            .expect("unable to get version_specific_data")
        {
            // With version-specific data, randomness rounds are stored in the checkpoint
            // summary.
            version_specific_data.into_v1().randomness_rounds
        } else {
            // Before version-specific data, checkpoint batching must be disabled. In this
            // case, the randomness state update tx must come first if it exists, because
            // all other transactions in a checkpoint that includes a randomness state
            // update are causally dependent on it.
            assert_eq!(
                0,
                self.epoch_store
                    .protocol_config()
                    .min_checkpoint_interval_ms_as_option()
                    .unwrap_or_default(),
            );
            if let Some(first_digest) = checkpoint_contents.inner().first() {
                let maybe_randomness_tx = self
                    .transaction_cache_reader
                    .get_transaction_block(&first_digest.transaction)
                    .unwrap_or_else(|| {
                        fatal!(
                            "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
                            checkpoint.sequence_number()
                        )
                    });
                if let TransactionKind::RandomnessStateUpdate(rsu) =
                    maybe_randomness_tx.data().transaction_data().kind()
                {
                    vec![rsu.randomness_round]
                } else {
                    Vec::new()
                }
            } else {
                Vec::new()
            }
        }
    }
}