iota_core/checkpoints/checkpoint_executor/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! CheckpointExecutor is a Node component that executes all checkpoints for
//! the given epoch. It acts as a consumer of newly synced checkpoints from
//! StateSync, scheduling and monitoring their execution. Its primary goal is
//! to allow for catching up to the current checkpoint sequence number of the
//! network as quickly as possible so that a newly joined or recovering Node
//! can participate in a timely manner. To that end, CheckpointExecutor
//! attempts to saturate the CPU with executor tasks (one per checkpoint), each
//! of which handles scheduling and awaiting checkpoint transaction execution.
//!
//! CheckpointExecutor is made recoverable in the event of Node shutdown by way
//! of a watermark, highest_executed_checkpoint, which is guaranteed to be
//! updated sequentially in order, despite checkpoints themselves potentially
//! being executed out of order and in parallel. CheckpointExecutor
//! parallelizes checkpoints of the same epoch as much as possible.
//! CheckpointExecutor enforces the invariant that if `run_epoch` returns
//! successfully, we have reached the end of the epoch. This allows us to use
//! it as a signal for reconfig.
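//!
//! A minimal usage sketch (illustrative only; variable names such as
//! `checkpoint_rx`, `store`, `state`, `accumulator`, `metrics` and
//! `epoch_store` are assumed to be provided by the surrounding node setup):
//!
//! ```ignore
//! let mut executor = CheckpointExecutor::new(
//!     checkpoint_rx, store, state, accumulator, Default::default(), metrics);
//! // Runs until the end-of-epoch checkpoint has been executed (or a
//! // RunWithRange condition is hit), which signals that reconfiguration
//! // can begin.
//! let stop_reason = executor.run_epoch(epoch_store, None).await;
//! ```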

use std::{
    collections::HashMap,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};

use either::Either;
use futures::stream::FuturesOrdered;
use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
use iota_macros::{fail_point, fail_point_async};
use iota_metrics::spawn_monitored_task;
use iota_types::{
    accumulator::Accumulator,
    base_types::{ExecutionDigests, TransactionDigest, TransactionEffectsDigest},
    crypto::RandomnessRound,
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::IotaResult,
    executable_transaction::VerifiedExecutableTransaction,
    full_checkpoint_content::CheckpointData,
    inner_temporary_store::PackageStoreWithFallback,
    message_envelope::Message,
    messages_checkpoint::{CheckpointSequenceNumber, VerifiedCheckpoint},
    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
};
use itertools::izip;
use tap::{TapFallible, TapOptional};
use tokio::{
    sync::broadcast::{self, error::RecvError},
    task::JoinHandle,
    time::timeout,
};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, instrument, trace, warn};

use self::metrics::CheckpointExecutorMetrics;
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    checkpoints::{
        CheckpointStore,
        checkpoint_executor::data_ingestion_handler::{
            load_checkpoint_data, store_checkpoint_locally,
        },
    },
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    state_accumulator::StateAccumulator,
    transaction_manager::TransactionManager,
};

mod data_ingestion_handler;
pub mod metrics;

#[cfg(test)]
pub(crate) mod tests;

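/// Ordered buffer of in-flight checkpoint execution tasks. Each task resolves
/// to the executed checkpoint, an optional checkpoint state accumulator, the
/// optional full checkpoint contents (for indexing/ingestion), and the digests
/// of the checkpoint's transactions.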
type CheckpointExecutionBuffer = FuturesOrdered<
    JoinHandle<(
        VerifiedCheckpoint,
        Option<Accumulator>,
        Option<CheckpointData>,
        Vec<TransactionDigest>,
    )>,
>;

/// The interval to log checkpoint progress, in # of checkpoints processed.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

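/// Timeouts used while waiting for new synced checkpoints: after
/// `warning_timeout` without progress a warning is logged, and after
/// `panic_timeout` (if set) the node panics.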
#[derive(Debug, Clone, Copy)]
pub struct CheckpointTimeoutConfig {
    pub panic_timeout: Option<Duration>,
    pub warning_timeout: Duration,
}

// We use a thread local so that the config can be overridden on a per-test
// basis. This means that get_scheduling_timeout() can be called multiple times
// in a multithreaded context, but the function is still very cheap to call so
// this is okay.
thread_local! {
    static SCHEDULING_TIMEOUT: once_cell::sync::OnceCell<CheckpointTimeoutConfig> =
        const { once_cell::sync::OnceCell::new() };
}

#[cfg(msim)]
pub fn init_checkpoint_timeout_config(config: CheckpointTimeoutConfig) {
    SCHEDULING_TIMEOUT.with(|s| {
        s.set(config).expect("SchedulingTimeoutConfig already set");
    });
}

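/// Resolves the scheduling timeout config: the warning timeout comes from
/// `NEW_CHECKPOINT_WARNING_TIMEOUT_MS` (default 5s); the panic timeout is
/// fixed at 45s under msim, otherwise read from
/// `NEW_CHECKPOINT_PANIC_TIMEOUT_MS` (disabled if unset).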
fn get_scheduling_timeout() -> CheckpointTimeoutConfig {
    fn inner() -> CheckpointTimeoutConfig {
        let panic_timeout: Option<Duration> = if cfg!(msim) {
            Some(Duration::from_secs(45))
        } else {
            std::env::var("NEW_CHECKPOINT_PANIC_TIMEOUT_MS")
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
                .map(Duration::from_millis)
        };

        let warning_timeout: Duration = std::env::var("NEW_CHECKPOINT_WARNING_TIMEOUT_MS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .map(Duration::from_millis)
            .unwrap_or(Duration::from_secs(5));

        CheckpointTimeoutConfig {
            panic_timeout,
            warning_timeout,
        }
    }

    SCHEDULING_TIMEOUT.with(|s| *s.get_or_init(inner))
}

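/// Why `run_epoch` stopped: either the epoch's final checkpoint was executed,
/// or the configured `RunWithRange` condition was reached.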
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    EpochComplete,
    RunWithRangeCondition,
}

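/// Executes synced checkpoints received from StateSync (via `mailbox`),
/// tracking the highest executed checkpoint and driving end-of-epoch handling.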
pub struct CheckpointExecutor {
    mailbox: broadcast::Receiver<VerifiedCheckpoint>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
}

impl CheckpointExecutor {
    pub fn new(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
    ) -> Self {
        Self {
            mailbox,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            config,
            metrics,
        }
    }

    pub fn new_for_tests(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            mailbox,
            checkpoint_store,
            state,
            accumulator,
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
        )
    }

    /// Ensure that all checkpoints in the current epoch will be executed.
    /// We don't technically need `&mut self`, but we take it to make sure only
    /// one instance is running at a time.
    pub async fn run_epoch(
        &mut self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) -> StopReason {
        // Check if we want to run this epoch based on the RunWithRange condition.
        // The bound is inclusive of the configured RunWithRange::Epoch value,
        // i.e. Epoch(N) means we execute epoch N and stop when reaching N+1.
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(epoch_store.epoch())) {
            info!(
                "RunWithRange condition satisfied at {:?}, run_epoch={:?}",
                run_with_range,
                epoch_store.epoch()
            );
            return StopReason::RunWithRangeCondition;
        };

        debug!(
            "Checkpoint executor running for epoch {}",
            epoch_store.epoch(),
        );
        self.metrics
            .checkpoint_exec_epoch
            .set(epoch_store.epoch() as i64);

        // Decide the first checkpoint to schedule for execution.
        // If we haven't executed anything in the past, we schedule checkpoint 0.
        // Otherwise we schedule the one after highest executed.
        let mut highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                // We can arrive at this point if we bump the highest_executed_checkpoint
                // watermark, and then crash before completing reconfiguration.
                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
                return StopReason::EpochComplete;
            }
        }

        let mut next_to_schedule = highest_executed
            .as_ref()
            .map(|c| c.sequence_number() + 1)
            .unwrap_or_else(|| {
                // TODO this invariant may no longer hold once we introduce snapshots
                assert_eq!(epoch_store.epoch(), 0);
                0
            });
        let mut pending: CheckpointExecutionBuffer = FuturesOrdered::new();

        let mut now_time = Instant::now();
        let mut now_transaction_num = highest_executed
            .as_ref()
            .map(|c| c.network_total_transactions)
            .unwrap_or(0);
        let scheduling_timeout_config = get_scheduling_timeout();

        loop {
            let schedule_scope = iota_metrics::monitored_scope("ScheduleCheckpointExecution");

            // If we have executed the last checkpoint of the current epoch, stop.
            // Note: when we arrive here with highest_executed == the final checkpoint of
            // the epoch, we are in an edge case where highest_executed does not
            // actually correspond to the watermark. The watermark is only
            // bumped past the epoch final checkpoint after execution of the change
            // epoch tx, and state accumulation.
            if self
                .check_epoch_last_checkpoint(epoch_store.clone(), &highest_executed)
                .await
            {
                self.checkpoint_store
                    .prune_local_summaries()
                    .tap_err(|e| error!("Failed to prune local summaries: {}", e))
                    .ok();

                // be extra careful to ensure we don't have orphans
                assert!(
                    pending.is_empty(),
                    "Pending checkpoint execution buffer should be empty after processing last checkpoint of epoch",
                );
                fail_point_async!("crash");
                debug!(epoch = epoch_store.epoch(), "finished epoch");
                return StopReason::EpochComplete;
            }

            self.schedule_synced_checkpoints(
                &mut pending,
                // next_to_schedule will be updated to the next checkpoint to schedule.
                // This makes sure we don't re-schedule the same checkpoint multiple times.
                &mut next_to_schedule,
                epoch_store.clone(),
                run_with_range,
            );

            self.metrics
                .checkpoint_exec_inflight
                .set(pending.len() as i64);

            let panic_timeout = scheduling_timeout_config.panic_timeout;
            let warning_timeout = scheduling_timeout_config.warning_timeout;

            drop(schedule_scope);
            tokio::select! {
                // Check for completed workers and ratchet the highest_executed_checkpoint
                // watermark accordingly. Note that given that checkpoints are guaranteed to
                // be processed (added to FuturesOrdered) in seq_number order, using FuturesOrdered
                // guarantees that we will also ratchet the watermarks in order.
                Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests))) = pending.next() => {
                    let _process_scope = iota_metrics::monitored_scope("ProcessExecutedCheckpoint");

                    self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests).await;
                    highest_executed = Some(checkpoint.clone());

                    // Estimate TPS every 10k transactions or 30 sec
                    let elapsed = now_time.elapsed().as_millis();
                    let current_transaction_num = highest_executed.as_ref().map(|c| c.network_total_transactions).unwrap_or(0);
                    if current_transaction_num - now_transaction_num > 10_000 || elapsed > 30_000 {
                        let tps = (1000.0 * (current_transaction_num - now_transaction_num) as f64 / elapsed as f64) as i32;
                        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);
                        now_time = Instant::now();
                        now_transaction_num = current_transaction_num;
                    }
                    // We want to be inclusive of the checkpoint configured in RunWithRange::Checkpoint.
                    if run_with_range.is_some_and(|rwr| rwr.matches_checkpoint(checkpoint.sequence_number)) {
                        info!(
                            "RunWithRange condition satisfied after checkpoint sequence number {:?}",
                            checkpoint.sequence_number
                        );
                        return StopReason::RunWithRangeCondition;
                    }
                }

                received = self.mailbox.recv() => match received {
                    Ok(checkpoint) => {
                        debug!(
                            sequence_number = ?checkpoint.sequence_number,
                            "Received checkpoint summary from state sync"
                        );
                        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
                    },
                    Err(RecvError::Lagged(num_skipped)) => {
                        debug!(
                            "Checkpoint Execution Recv channel overflowed with {:?} messages",
                            num_skipped,
                        );
                    }
                    Err(RecvError::Closed) => {
                        panic!("Checkpoint Execution Sender (StateSync) closed channel unexpectedly");
                    },
                },

                _ = tokio::time::sleep(warning_timeout) => {
                    warn!(
                        "Received no new synced checkpoints for {warning_timeout:?}. Next checkpoint to be scheduled: {next_to_schedule}",
                    );
                }

                _ = panic_timeout
                            .map(|d| Either::Left(tokio::time::sleep(d)))
                            .unwrap_or_else(|| Either::Right(futures::future::pending())) => {
                    panic!("No new synced checkpoints received for {panic_timeout:?} on node {:?}", self.state.name);
                },
            }
        }
    }

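    /// Advance the highest_executed_checkpoint watermark to `checkpoint`,
    /// asserting that no sequence number is skipped, pruning old full
    /// checkpoint contents, and updating the related metrics.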
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        // Ensure that we are not skipping checkpoints at any point
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        // We store a fixed number of additional FullCheckpointContents after execution
        // is complete for use in state sync.
        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                // If this is directly after a snapshot restore with skiplisting,
                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
                // checkpoints.
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
    }

    /// Post-processing and plumbing after a checkpoint has been executed. This
    /// function is guaranteed to be called in checkpoint sequence number
    /// order.
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn process_executed_checkpoint(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_acc: Option<Accumulator>,
        checkpoint_data: Option<CheckpointData>,
        all_tx_digests: &[TransactionDigest],
    ) {
        // Commit all transaction effects to disk
        let cache_commit = self.state.get_cache_commit();
        debug!("committing checkpoint transactions to disk");
        cache_commit
            .commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
            .await
            .expect("commit_transaction_outputs cannot fail");

        epoch_store
            .handle_committed_transactions(all_tx_digests)
            .expect("cannot fail");

        if let Some(checkpoint_data) = checkpoint_data {
            self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
                .await;
        }

        if !checkpoint.is_last_checkpoint_of_epoch() {
            self.accumulator
                .accumulate_running_root(epoch_store, checkpoint.sequence_number, checkpoint_acc)
                .await
                .expect("Failed to accumulate running root");
            self.bump_highest_executed_checkpoint(checkpoint);
        }
    }

    /// If configured, commit the pending index updates for the provided
    /// checkpoint and enqueue the checkpoint to the subscription service.
    async fn commit_index_updates_and_enqueue_to_subscription_service(
        &self,
        checkpoint: CheckpointData,
    ) {
        if let Some(rest_index) = &self.state.rest_index {
            rest_index
                .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
                .expect("failed to update rest_indexes");
        }
    }

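    /// Schedule all synced-but-not-yet-scheduled checkpoints for execution, up
    /// to the configured maximum concurrency, stopping early at epoch
    /// boundaries or at a `RunWithRange::Checkpoint` limit.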
    #[instrument(level = "debug", skip_all)]
    fn schedule_synced_checkpoints(
        &self,
        pending: &mut CheckpointExecutionBuffer,
        next_to_schedule: &mut CheckpointSequenceNumber,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) {
        let Some(latest_synced_checkpoint) = self
            .checkpoint_store
            .get_highest_synced_checkpoint()
            .expect("Failed to read highest synced checkpoint")
        else {
            debug!("No checkpoints to schedule, highest synced checkpoint is None");
            return;
        };

        while *next_to_schedule <= *latest_synced_checkpoint.sequence_number()
            && pending.len() < self.config.checkpoint_execution_max_concurrency
        {
            let checkpoint = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(*next_to_schedule)
                .unwrap()
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint sequence number {:?} does not exist in checkpoint store",
                        *next_to_schedule
                    )
                });
            if checkpoint.epoch() > epoch_store.epoch() {
                return;
            }
            match run_with_range {
                Some(RunWithRange::Checkpoint(seq)) if *next_to_schedule > seq => {
                    debug!(
                        "RunWithRange Checkpoint {} is set, not scheduling checkpoint {}",
                        seq, *next_to_schedule
                    );
                    return;
                }
                _ => {
                    self.schedule_checkpoint(checkpoint, pending, epoch_store.clone());
                    *next_to_schedule += 1;
                }
            }
        }
    }

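    /// Spawn a task that executes the given checkpoint (retrying on error
    /// every second) and push it onto the ordered pending buffer.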
    #[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
    fn schedule_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pending: &mut CheckpointExecutionBuffer,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) {
        debug!("Scheduling checkpoint for execution");

        // A mismatch between the node epoch and the checkpoint epoch after
        // startup/crash recovery is invalid.
        let checkpoint_epoch = checkpoint.epoch();
        assert_eq!(
            checkpoint_epoch,
            epoch_store.epoch(),
            "Epoch mismatch after startup recovery. checkpoint epoch: {:?}, node epoch: {:?}",
            checkpoint_epoch,
            epoch_store.epoch(),
        );

        let metrics = self.metrics.clone();
        let local_execution_timeout_sec = self.config.local_execution_timeout_sec;
        let data_ingestion_dir = self.config.data_ingestion_dir.clone();
        let checkpoint_store = self.checkpoint_store.clone();
        let object_cache_reader = self.object_cache_reader.clone();
        let transaction_cache_reader = self.transaction_cache_reader.clone();
        let tx_manager = self.tx_manager.clone();
        let accumulator = self.accumulator.clone();
        let state = self.state.clone();

        epoch_store.notify_synced_checkpoint(*checkpoint.sequence_number());

        pending.push_back(spawn_monitored_task!(async move {
            let epoch_store = epoch_store.clone();
            let (tx_digests, checkpoint_acc, checkpoint_data) = loop {
                match execute_checkpoint(
                    checkpoint.clone(),
                    &state,
                    object_cache_reader.as_ref(),
                    transaction_cache_reader.as_ref(),
                    checkpoint_store.clone(),
                    epoch_store.clone(),
                    tx_manager.clone(),
                    accumulator.clone(),
                    local_execution_timeout_sec,
                    &metrics,
                    data_ingestion_dir.clone(),
                )
                .await
                {
                    Err(err) => {
                        error!(
                            "Error while executing checkpoint, will retry in 1s: {:?}",
                            err
                        );
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        metrics.checkpoint_exec_errors.inc();
                    }
                    Ok((tx_digests, checkpoint_acc, checkpoint_data)) => {
                        break (tx_digests, checkpoint_acc, checkpoint_data);
                    }
                }
            };
            (checkpoint, checkpoint_acc, checkpoint_data, tx_digests)
        }));
    }

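    /// Execute the change-epoch (end-of-epoch) transaction of the epoch's
    /// final checkpoint against its certified effects digest and wait for its
    /// effects to become available.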
    #[instrument(level = "info", skip_all)]
    async fn execute_change_epoch_tx(
        &self,
        execution_digests: ExecutionDigests,
        change_epoch_tx_digest: TransactionDigest,
        change_epoch_tx: VerifiedExecutableTransaction,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: VerifiedCheckpoint,
    ) {
        let change_epoch_fx = self
            .transaction_cache_reader
            .get_effects(&execution_digests.effects)
            .expect("Fetching effects for change_epoch tx cannot fail")
            .expect("Change_epoch tx effects must exist");

        if change_epoch_tx.contains_shared_object() {
            epoch_store
                .acquire_shared_version_assignments_from_effects(
                    &change_epoch_tx,
                    &change_epoch_fx,
                    self.object_cache_reader.as_ref(),
                )
                .await
                .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
        }

        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), execution_digests.effects)],
            &epoch_store,
        );
        handle_execution_effects(
            &self.state,
            vec![execution_digests],
            vec![change_epoch_tx_digest],
            checkpoint.clone(),
            self.checkpoint_store.clone(),
            self.object_cache_reader.as_ref(),
            self.transaction_cache_reader.as_ref(),
            epoch_store.clone(),
            self.tx_manager.clone(),
            self.accumulator.clone(),
            self.config.local_execution_timeout_sec,
            self.config.data_ingestion_dir.clone(),
        )
        .await;
    }

    /// Check whether `checkpoint` is the last checkpoint of the current epoch.
    /// If so, perform special case logic (execute change_epoch tx,
    /// accumulate epoch, finalize transactions), then return true.
    pub async fn check_epoch_last_checkpoint(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: &Option<VerifiedCheckpoint>,
    ) -> bool {
        let cur_epoch = epoch_store.epoch();

        if let Some(checkpoint) = checkpoint {
            if checkpoint.epoch() == cur_epoch {
                if let Some((change_epoch_execution_digests, change_epoch_tx)) =
                    extract_end_of_epoch_tx(
                        checkpoint,
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        epoch_store.clone(),
                    )
                {
                    let change_epoch_tx_digest = change_epoch_execution_digests.transaction;

                    info!(
                        ended_epoch = cur_epoch,
                        last_checkpoint = checkpoint.sequence_number(),
                        "Reached end of epoch, executing change_epoch transaction",
                    );

                    self.execute_change_epoch_tx(
                        change_epoch_execution_digests,
                        change_epoch_tx_digest,
                        change_epoch_tx,
                        epoch_store.clone(),
                        checkpoint.clone(),
                    )
                    .await;

                    let cache_commit = self.state.get_cache_commit();
                    cache_commit
                        .commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
                        .await
                        .expect("commit_transaction_outputs cannot fail");
                    fail_point_async!("prune-and-compact");

                    // For finalizing the checkpoint, we need to pass in all checkpoint
                    // transaction effects, not just the change_epoch tx effects. However,
                    // we have already notify-awaited all tx effects separately (once for
                    // the change_epoch tx, and once for all other txns), so this should
                    // be a fast operation.
                    let all_tx_digests: Vec<_> = self
                        .checkpoint_store
                        .get_checkpoint_contents(&checkpoint.content_digest)
                        .expect("read cannot fail")
                        .expect("Checkpoint contents should exist")
                        .iter()
                        .map(|digests| digests.transaction)
                        .collect();

                    let effects = self
                        .transaction_cache_reader
                        .notify_read_executed_effects(&all_tx_digests)
                        .await
                        .expect("Failed to get executed effects for finalizing checkpoint");

                    let (_acc, checkpoint_data) = finalize_checkpoint(
                        &self.state,
                        self.object_cache_reader.as_ref(),
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        &all_tx_digests,
                        &epoch_store,
                        checkpoint.clone(),
                        self.accumulator.clone(),
                        effects,
                        self.config.data_ingestion_dir.clone(),
                    )
                    .await
                    .expect("Finalizing checkpoint cannot fail");

                    self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
                        .await;

                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(cur_epoch, checkpoint)
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_running_root(&epoch_store, checkpoint.sequence_number, None)
                        .await
                        .expect("Failed to accumulate running root");
                    self.accumulator
                        .accumulate_epoch(epoch_store.clone(), *checkpoint.sequence_number())
                        .await
                        .expect("Accumulating epoch cannot fail");

                    self.bump_highest_executed_checkpoint(checkpoint);

                    return true;
                }
            }
        }
        false
    }
}

// Logs within the function are annotated with the checkpoint sequence number
// and epoch, from schedule_checkpoint().
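//
// Executes all not-yet-executed transactions of the given checkpoint (the
// change-epoch transaction, if any, is handled separately in
// check_epoch_last_checkpoint) and returns the transaction digests, the
// optional checkpoint accumulator, and the optional full checkpoint data.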
#[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn execute_checkpoint(
    checkpoint: VerifiedCheckpoint,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    metrics: &Arc<CheckpointExecutorMetrics>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<(
    Vec<TransactionDigest>,
    Option<Accumulator>,
    Option<CheckpointData>,
)> {
    debug!("Preparing checkpoint for execution");
    let prepare_start = Instant::now();

    // This function must guarantee that all transactions in the checkpoint are
    // executed before it returns. This invariant is enforced in two phases:
    // - First, we filter out any already executed transactions from the checkpoint
    //   in get_unexecuted_transactions()
    // - Second, we execute all remaining transactions.

    let (execution_digests, all_tx_digests, executable_txns, randomness_rounds) =
        get_unexecuted_transactions(
            checkpoint.clone(),
            transaction_cache_reader,
            checkpoint_store.clone(),
            epoch_store.clone(),
        );

    let tx_count = execution_digests.len();
    debug!("Number of transactions in the checkpoint: {:?}", tx_count);
    metrics
        .checkpoint_transaction_count
        .observe(tx_count as f64);

    let (checkpoint_acc, checkpoint_data) = execute_transactions(
        execution_digests,
        all_tx_digests.clone(),
        executable_txns,
        state,
        object_cache_reader,
        transaction_cache_reader,
        checkpoint_store.clone(),
        epoch_store.clone(),
        transaction_manager,
        accumulator,
        local_execution_timeout_sec,
        checkpoint,
        metrics,
        prepare_start,
        data_ingestion_dir,
    )
    .await?;

    // Once execution is complete, we know that any randomness contained in this
    // checkpoint has been successfully included in a checkpoint certified by a
    // quorum of validators. (RandomnessManager/RandomnessReporter is only
    // present on validators.)
    if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
        for round in randomness_rounds {
            debug!(
                ?round,
                "notifying RandomnessReporter that randomness update was executed in checkpoint"
            );
            randomness_reporter.notify_randomness_in_checkpoint(round)?;
        }
    }

    Ok((all_tx_digests, checkpoint_acc, checkpoint_data))
}

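// Waits for the effects of all checkpoint transactions to become available,
// periodically logging progress and warning about missing effects, asserts
// that the observed effects digests match those certified in the checkpoint,
// and finalizes the checkpoint unless it is the last one of the epoch.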
#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn handle_execution_effects(
    state: &AuthorityState,
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    checkpoint: VerifiedCheckpoint,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    data_ingestion_dir: Option<PathBuf>,
) -> (Option<Accumulator>, Option<CheckpointData>) {
    // Once synced_txns have been awaited, all txns should have effects committed.
    let mut periods = 1;
    let log_timeout_sec = Duration::from_secs(local_execution_timeout_sec);
    // Whether the checkpoint is next to execute and blocking additional executions.
    let mut blocking_execution = false;
    loop {
        let effects_future = transaction_cache_reader.notify_read_executed_effects(&all_tx_digests);

        match timeout(log_timeout_sec, effects_future).await {
            Err(_elapsed) => {
                // Reading this value every timeout should be ok.
                let highest_seq = checkpoint_store
                    .get_highest_executed_checkpoint_seq_number()
                    .unwrap()
                    .unwrap_or_default();
                if checkpoint.sequence_number <= highest_seq {
                    error!(
                        "Re-executing checkpoint {} after higher checkpoint {} has executed!",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if checkpoint.sequence_number > highest_seq + 1 {
                    trace!(
                        "Checkpoint {} is still executing. Highest executed = {}",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if !blocking_execution {
                    trace!(
                        "Checkpoint {} is next to execute.",
                        checkpoint.sequence_number
                    );
                    blocking_execution = true;
                    continue;
                }

                // Only log details when the checkpoint is next to execute, but has not finished
                // execution within log_timeout_sec.
                let missing_digests: Vec<TransactionDigest> = transaction_cache_reader
                    .multi_get_executed_effects_digests(&all_tx_digests)
                    .expect("multi_get_executed_effects cannot fail")
                    .iter()
                    .zip(all_tx_digests.clone())
                    .filter_map(
                        |(fx, digest)| {
                            if fx.is_none() { Some(digest) } else { None }
                        },
                    )
                    .collect();

                if missing_digests.is_empty() {
                    // All effects just became available.
                    continue;
                }

                warn!(
                    "Transaction effects for checkpoint tx digests {:?} not present within {:?}. ",
                    missing_digests,
                    log_timeout_sec * periods,
                );

                // Print out more information for the 1st pending transaction, which should have
                // all of its input available.
                let pending_digest = missing_digests.first().unwrap();
                if let Some(missing_input) = transaction_manager.get_missing_input(pending_digest) {
                    warn!(
                        "Transaction {pending_digest:?} has missing input objects {missing_input:?}",
                    );
                }
                periods += 1;
            }
            Ok(Err(err)) => panic!("Failed to notify_read_executed_effects: {err:?}"),
            Ok(Ok(effects)) => {
                for (tx_digest, expected_digest, actual_effects) in
                    izip!(&all_tx_digests, &execution_digests, &effects)
                {
                    let expected_effects_digest = &expected_digest.effects;
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        expected_effects_digest,
                        &actual_effects.digest(),
                        transaction_cache_reader,
                    );
                }

                // If this is the end-of-epoch checkpoint, finalization must happen after
                // executing the change-epoch tx, which is done after all other checkpoint
                // execution, so we skip it here.
                if checkpoint.end_of_epoch_data.is_none() {
                    let (checkpoint_acc, checkpoint_data) = finalize_checkpoint(
                        state,
                        object_cache_reader,
                        transaction_cache_reader,
                        checkpoint_store.clone(),
                        &all_tx_digests,
                        &epoch_store,
                        checkpoint.clone(),
                        accumulator.clone(),
                        effects,
                        data_ingestion_dir,
                    )
                    .await
                    .expect("Finalizing checkpoint cannot fail");
                    return (Some(checkpoint_acc), Some(checkpoint_data));
                } else {
                    return (None, None);
                }
            }
        }
    }
}

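// Panics (after logging the locally observed effects) if the locally computed
// effects digest for `tx_digest` differs from the digest certified in the
// checkpoint, i.e. an execution fork has been detected.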
fn assert_not_forked(
    checkpoint: &VerifiedCheckpoint,
    tx_digest: &TransactionDigest,
    expected_digest: &TransactionEffectsDigest,
    actual_effects_digest: &TransactionEffectsDigest,
    cache_reader: &dyn TransactionCacheRead,
) {
    if *expected_digest != *actual_effects_digest {
        let actual_effects = cache_reader
            .get_executed_effects(tx_digest)
            .expect("get_executed_effects cannot fail")
            .expect("actual effects should exist");

        // Log observed effects (too big for panic message) and then panic.
        error!(
            ?checkpoint,
            ?tx_digest,
            ?expected_digest,
            ?actual_effects,
            "fork detected!"
        );
        panic!(
            "When executing checkpoint {}, transaction {} \
            is expected to have effects digest {}, but got {}!",
            checkpoint.sequence_number(),
            tx_digest,
            expected_digest,
            actual_effects_digest,
        );
    }
}

// Given a checkpoint, find the end of epoch transaction, if it exists
fn extract_end_of_epoch_tx(
    checkpoint: &VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> Option<(ExecutionDigests, VerifiedExecutableTransaction)> {
    checkpoint.end_of_epoch_data.as_ref()?;

    // Last checkpoint must have the end of epoch transaction as the last
    // transaction.

    let checkpoint_sequence = checkpoint.sequence_number();
    let execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

    let digests = execution_digests
        .last()
        .expect("Final checkpoint must have at least one transaction");

    let change_epoch_tx = cache_reader
        .get_transaction_block(&digests.transaction)
        .expect("read cannot fail");

    let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        (*change_epoch_tx.unwrap_or_else(||
            panic!(
                "state-sync should have ensured that transaction with digests {digests:?} exists for checkpoint: {checkpoint:?}"
            )
        )).clone(),
        epoch_store.epoch(),
        *checkpoint_sequence,
    );

    assert!(
        change_epoch_tx
            .data()
            .intent_message()
            .value
            .is_end_of_epoch_tx()
    );

    Some((*digests, change_epoch_tx))
}

// Given a checkpoint, filter out any already executed transactions, then return
// the remaining execution digests, transaction digests, transactions to be
// executed, and randomness rounds (if any) included in the checkpoint.
#[expect(clippy::type_complexity)]
fn get_unexecuted_transactions(
    checkpoint: VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> (
    Vec<ExecutionDigests>,
    Vec<TransactionDigest>,
    Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    Vec<RandomnessRound>,
) {
    let checkpoint_sequence = checkpoint.sequence_number();
    let full_contents = checkpoint_store
        .get_full_checkpoint_contents_by_sequence_number(*checkpoint_sequence)
        .expect("Failed to get checkpoint contents from store")
        .tap_some(|_| {
            debug!("loaded full checkpoint contents in bulk for sequence {checkpoint_sequence}")
        });

    let mut execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

    let full_contents_txns = full_contents.map(|c| {
        c.into_iter()
            .zip(execution_digests.iter())
            .map(|(txn, digests)| (digests.transaction, txn))
            .collect::<HashMap<_, _>>()
    });

    // Remove the change epoch transaction so that we can special case its
    // execution.
    checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
        let digests = execution_digests
            .pop()
            .expect("Final checkpoint must have at least one transaction");

        let change_epoch_tx = cache_reader
            .get_transaction_block(&digests.transaction)
            .expect("read cannot fail")
            .unwrap_or_else(||
                panic!(
                    "state-sync should have ensured that transaction with digests {digests:?} exists for checkpoint: {}",
                    checkpoint.sequence_number()
                )
            );
        assert!(change_epoch_tx.data().intent_message().value.is_end_of_epoch_tx());
    });

    let randomness_rounds = if let Some(version_specific_data) = checkpoint
        .version_specific_data(epoch_store.protocol_config())
        .expect("unable to get version_specific_data")
    {
        // With version-specific data, randomness rounds are stored in checkpoint
        // summary.
        version_specific_data.into_v1().randomness_rounds
    } else {
        // Before version-specific data, checkpoint batching must be disabled. In this
        // case, randomness state update tx must be first if it exists, because
        // all other transactions in a checkpoint that includes a randomness
        // state update are causally dependent on it.
        assert_eq!(
            0,
            epoch_store
                .protocol_config()
                .min_checkpoint_interval_ms_as_option()
                .unwrap_or_default(),
        );
        if let Some(first_digest) = execution_digests.first() {
            let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
            .expect("read cannot fail")
            .unwrap_or_else(||
                panic!(
                    "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
                    checkpoint.sequence_number()
                )
            );
            if let TransactionKind::RandomnessStateUpdate(rsu) =
                maybe_randomness_tx.data().transaction_data().kind()
            {
                vec![rsu.randomness_round]
            } else {
                Vec::new()
            }
        } else {
            Vec::new()
        }
    };

    let all_tx_digests: Vec<TransactionDigest> =
        execution_digests.iter().map(|tx| tx.transaction).collect();

    let executed_effects_digests = cache_reader
        .multi_get_executed_effects_digests(&all_tx_digests)
        .expect("failed to read executed_effects from store");

    let (unexecuted_txns, expected_effects_digests): (Vec<_>, Vec<_>) =
        izip!(execution_digests.iter(), executed_effects_digests.iter())
            .filter_map(|(digests, effects_digest)| match effects_digest {
                None => Some((digests.transaction, digests.effects)),
                Some(actual_effects_digest) => {
                    let tx_digest = &digests.transaction;
                    let effects_digest = &digests.effects;
                    trace!(
                        "Transaction with digest {:?} has already been executed",
                        tx_digest
                    );
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        effects_digest,
                        actual_effects_digest,
                        cache_reader,
                    );
                    None
                }
            })
            .unzip();

    // read remaining unexecuted transactions from store
    let executable_txns: Vec<_> = if let Some(full_contents_txns) = full_contents_txns {
        unexecuted_txns
            .into_iter()
            .zip(expected_effects_digests)
            .map(|(tx_digest, expected_effects_digest)| {
                let tx = &full_contents_txns.get(&tx_digest).unwrap().transaction;
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(tx.clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest,
                )
            })
            .collect()
    } else {
        cache_reader
            .multi_get_transaction_blocks(&unexecuted_txns)
            .expect("Failed to get checkpoint txes from store")
            .into_iter()
            .zip(expected_effects_digests)
            .enumerate()
            .map(|(i, (tx, expected_effects_digest))| {
                let tx = tx.unwrap_or_else(||
                    panic!(
                        "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
                        unexecuted_txns[i]
                    )
                );
                // change epoch tx is handled specially in check_epoch_last_checkpoint
                assert!(!tx.data().intent_message().value.is_end_of_epoch_tx());
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest
                )
            })
            .collect()
    };

    (
        execution_digests,
        all_tx_digests,
        executable_txns,
        randomness_rounds,
    )
}

// Logs within the function are annotated with the checkpoint sequence number
// and epoch, from schedule_checkpoint().
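//
// Acquires shared-version assignments from the certified effects for any
// shared-object transactions, enqueues all transactions with their expected
// effects digests, and then awaits execution via handle_execution_effects.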
#[instrument(level = "debug", skip_all)]
async fn execute_transactions(
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    executable_txns: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    state_accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    checkpoint: VerifiedCheckpoint,
    metrics: &Arc<CheckpointExecutorMetrics>,
    prepare_start: Instant,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<(Option<Accumulator>, Option<CheckpointData>)> {
    let effects_digests: HashMap<_, _> = execution_digests
        .iter()
        .map(|digest| (digest.transaction, digest.effects))
        .collect();

    let shared_effects_digests = executable_txns
        .iter()
        .filter(|(tx, _)| tx.contains_shared_object())
        .map(|(tx, _)| {
            *effects_digests
                .get(tx.digest())
                .expect("Transaction digest not found in effects_digests")
        })
        .collect::<Vec<_>>();

    let digest_to_effects: HashMap<TransactionDigest, TransactionEffects> =
        transaction_cache_reader
            .multi_get_effects(&shared_effects_digests)?
            .into_iter()
            .zip(shared_effects_digests)
            .map(|(fx, fx_digest)| {
                if fx.is_none() {
                    panic!(
                        "Transaction effects for effects digest {fx_digest:?} do not exist in effects table"
                    );
                }
                let fx = fx.unwrap();
                (*fx.transaction_digest(), fx)
            })
            .collect();

    for (tx, _) in &executable_txns {
        if tx.contains_shared_object() {
            epoch_store
                .acquire_shared_version_assignments_from_effects(
                    tx,
                    digest_to_effects.get(tx.digest()).unwrap(),
                    object_cache_reader,
                )
                .await?;
        }
    }

    let prepare_elapsed = prepare_start.elapsed();
    metrics
        .checkpoint_prepare_latency
        .observe(prepare_elapsed.as_secs_f64());
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!(
            "Checkpoint preparation for execution took {:?}",
            prepare_elapsed
        );
    }

    let exec_start = Instant::now();
    transaction_manager.enqueue_with_expected_effects_digest(executable_txns.clone(), &epoch_store);

    let (checkpoint_acc, checkpoint_data) = handle_execution_effects(
        state,
        execution_digests,
        all_tx_digests,
        checkpoint.clone(),
        checkpoint_store,
        object_cache_reader,
        transaction_cache_reader,
        epoch_store,
        transaction_manager,
        state_accumulator,
        local_execution_timeout_sec,
        data_ingestion_dir,
    )
    .await;

    let exec_elapsed = exec_start.elapsed();
    metrics
        .checkpoint_exec_latency
        .observe(exec_elapsed.as_secs_f64());
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!("Checkpoint execution took {:?}", exec_elapsed);
    }

    Ok((checkpoint_acc, checkpoint_data))
}

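// Marks the checkpoint's transactions as finalized, accumulates the checkpoint
// state, builds the CheckpointData used downstream, and, if configured, indexes
// it for the REST API and/or writes it to the local data ingestion directory.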
#[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn finalize_checkpoint(
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    tx_digests: &[TransactionDigest],
    epoch_store: &Arc<AuthorityPerEpochStore>,
    checkpoint: VerifiedCheckpoint,
    accumulator: Arc<StateAccumulator>,
    effects: Vec<TransactionEffects>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<(Accumulator, CheckpointData)> {
    debug!("finalizing checkpoint");
    epoch_store.insert_finalized_transactions(tx_digests, checkpoint.sequence_number)?;

    // TODO remove once we no longer need to support this table for read RPC
    state
        .get_checkpoint_cache()
        .insert_finalized_transactions_perpetual_checkpoints(
            tx_digests,
            epoch_store.epoch(),
            checkpoint.sequence_number,
        )?;

    let checkpoint_acc =
        accumulator.accumulate_checkpoint(effects, checkpoint.sequence_number, epoch_store)?;

    let checkpoint_data = load_checkpoint_data(
        checkpoint,
        object_cache_reader,
        transaction_cache_reader,
        checkpoint_store,
        tx_digests,
    )?;

    if state.rest_index.is_some() || data_ingestion_dir.is_some() {
        // Index the checkpoint. This is done out of order and is not written and
        // committed to the DB until later (committing must be done in-order)
        if let Some(rest_index) = &state.rest_index {
            let mut layout_resolver = epoch_store.executor().type_layout_resolver(Box::new(
                PackageStoreWithFallback::new(state.get_backing_package_store(), &checkpoint_data),
            ));

            rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut())?;
        }

        if let Some(path) = data_ingestion_dir {
            store_checkpoint_locally(path, &checkpoint_data)?;
        }
    }

    Ok((checkpoint_acc, checkpoint_data))
}