iota_core/checkpoints/checkpoint_executor/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5//! CheckpointExecutor is a Node component that executes all checkpoints for the
6//! given epoch. It acts as a Consumer to StateSync
7//! for newly synced checkpoints, taking these checkpoints and
8//! scheduling and monitoring their execution. Its primary goal is to allow
9//! for catching up to the current checkpoint sequence number of the network
10//! as quickly as possible so that a newly joined, or recovering Node can
11//! participate in a timely manner. To that end, CheckpointExecutor attempts
//! to saturate the CPU with executor tasks (one per checkpoint), each of which
//! handles scheduling and awaiting checkpoint transaction execution.
14//!
15//! CheckpointExecutor is made recoverable in the event of Node shutdown by way
16//! of a watermark, highest_executed_checkpoint, which is guaranteed to be
17//! updated sequentially in order, despite checkpoints themselves potentially
18//! being executed nonsequentially and in parallel. CheckpointExecutor
19//! parallelizes checkpoints of the same epoch as much as possible.
//! CheckpointExecutor enforces the invariant that if `run_epoch` returns
//! `StopReason::EpochComplete`, we have reached the end of epoch. This allows
//! us to use it as a signal for reconfig.
23
24use std::{
25    collections::HashMap,
26    path::PathBuf,
27    sync::Arc,
28    time::{Duration, Instant},
29};
30
31use either::Either;
32use futures::stream::FuturesOrdered;
33use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
34use iota_macros::{fail_point, fail_point_async};
35use iota_metrics::spawn_monitored_task;
36use iota_types::{
37    accumulator::Accumulator,
38    base_types::{ExecutionDigests, TransactionDigest, TransactionEffectsDigest},
39    crypto::RandomnessRound,
40    effects::{TransactionEffects, TransactionEffectsAPI},
41    error::IotaResult,
42    executable_transaction::VerifiedExecutableTransaction,
43    inner_temporary_store::PackageStoreWithFallback,
44    message_envelope::Message,
45    messages_checkpoint::{CheckpointSequenceNumber, VerifiedCheckpoint},
46    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
47};
48use itertools::izip;
49use tap::{TapFallible, TapOptional};
50use tokio::{
51    sync::broadcast::{self, error::RecvError},
52    task::JoinHandle,
53    time::timeout,
54};
55use tokio_stream::StreamExt;
56use tracing::{debug, error, info, instrument, trace, warn};
57
58use self::metrics::CheckpointExecutorMetrics;
59use crate::{
60    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
61    checkpoints::{
62        CheckpointStore,
63        checkpoint_executor::data_ingestion_handler::{
64            load_checkpoint_data, store_checkpoint_locally,
65        },
66    },
67    execution_cache::{ObjectCacheRead, TransactionCacheRead},
68    state_accumulator::StateAccumulator,
69    transaction_manager::TransactionManager,
70};
71
72mod data_ingestion_handler;
73pub mod metrics;
74
75#[cfg(test)]
76pub(crate) mod tests;
77
/// Queue of in-flight checkpoint execution tasks.
///
/// Each task resolves to the checkpoint it executed, the optional accumulator
/// produced while executing it, and the transaction digests returned for it.
/// Tasks are pushed in sequence-number order and `FuturesOrdered` yields their
/// results in push order, so completed checkpoints are observed in sequence
/// order even if their execution finishes out of order.
type CheckpointExecutionBuffer = FuturesOrdered<
    JoinHandle<(
        VerifiedCheckpoint,
        Option<Accumulator>,
        Vec<TransactionDigest>,
    )>,
>;
85
/// The interval to log checkpoint progress, in # of checkpoints processed.
/// A progress line is emitted at `info` level whenever the sequence number of
/// the checkpoint being marked as executed is a multiple of this value.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;
88
/// Timeouts applied by the checkpoint executor while it waits for work.
#[derive(Debug, Clone, Copy)]
pub struct CheckpointTimeoutConfig {
    /// How long the executor may wait before it panics for lack of new synced
    /// checkpoints. `None` disables the panic entirely.
    pub panic_timeout: Option<Duration>,
    /// How long the executor waits before logging a warning that no new
    /// synced checkpoints have arrived.
    pub warning_timeout: Duration,
}
94
// We use a thread local so that the config can be overridden on a per-test
// basis. The consequence is that every thread holds its own cell, so the
// initializer in get_scheduling_timeout() may run once per calling thread in a
// multithreaded context. The function is still very cheap to call, so this is
// okay.
thread_local! {
    static SCHEDULING_TIMEOUT: once_cell::sync::OnceCell<CheckpointTimeoutConfig> =
        const { once_cell::sync::OnceCell::new() };
}
103
/// Installs `config` as the checkpoint timeout configuration of the calling
/// thread. Only compiled for simulator (`msim`) builds.
///
/// # Panics
///
/// Panics if a configuration has already been set (or lazily initialized) on
/// this thread.
#[cfg(msim)]
pub fn init_checkpoint_timeout_config(config: CheckpointTimeoutConfig) {
    SCHEDULING_TIMEOUT.with(|cell| {
        let outcome = cell.set(config);
        outcome.expect("SchedulingTimeoutConfig already set");
    });
}
110
111fn get_scheduling_timeout() -> CheckpointTimeoutConfig {
112    fn inner() -> CheckpointTimeoutConfig {
113        let panic_timeout: Option<Duration> = if cfg!(msim) {
114            Some(Duration::from_secs(45))
115        } else {
116            std::env::var("NEW_CHECKPOINT_PANIC_TIMEOUT_MS")
117                .ok()
118                .and_then(|s| s.parse::<u64>().ok())
119                .map(Duration::from_millis)
120        };
121
122        let warning_timeout: Duration = std::env::var("NEW_CHECKPOINT_WARNING_TIMEOUT_MS")
123            .ok()
124            .and_then(|s| s.parse::<u64>().ok())
125            .map(Duration::from_millis)
126            .unwrap_or(Duration::from_secs(5));
127
128        CheckpointTimeoutConfig {
129            panic_timeout,
130            warning_timeout,
131        }
132    }
133
134    SCHEDULING_TIMEOUT.with(|s| *s.get_or_init(inner))
135}
136
/// Why `CheckpointExecutor::run_epoch` returned.
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    /// The final checkpoint of the epoch has been executed (either during this
    /// call or before a previous crash).
    EpochComplete,
    /// The `RunWithRange` condition passed to `run_epoch` was satisfied, so
    /// execution stopped before the end of the epoch was necessarily reached.
    RunWithRangeCondition,
}
142
/// Executes the synced checkpoints of an epoch and maintains the
/// highest-executed-checkpoint watermark.
pub struct CheckpointExecutor {
    /// Receives verified checkpoints broadcast by StateSync. The executor uses
    /// these messages as a wake-up signal and for age metrics; the checkpoints
    /// that get scheduled are read from `checkpoint_store`.
    mailbox: broadcast::Receiver<VerifiedCheckpoint>,
    /// Authority state; source of the cache readers, the transaction manager
    /// and the cache commit handle.
    state: Arc<AuthorityState>,
    /// Store holding checkpoints, their contents and the synced/executed
    /// watermarks.
    checkpoint_store: Arc<CheckpointStore>,
    /// Object cache reader obtained from `state`.
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    /// Transaction cache reader obtained from `state`.
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    /// Transaction manager obtained from `state`; checkpoint transactions are
    /// enqueued on it for execution.
    tx_manager: Arc<TransactionManager>,
    /// Accumulates the running root per checkpoint and the root of the epoch.
    accumulator: Arc<StateAccumulator>,
    /// Executor settings (max concurrency, local execution timeout, data
    /// ingestion directory).
    config: CheckpointExecutorConfig,
    /// Metrics reported by the executor.
    metrics: Arc<CheckpointExecutorMetrics>,
}
154
155impl CheckpointExecutor {
156    pub fn new(
157        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
158        checkpoint_store: Arc<CheckpointStore>,
159        state: Arc<AuthorityState>,
160        accumulator: Arc<StateAccumulator>,
161        config: CheckpointExecutorConfig,
162        metrics: Arc<CheckpointExecutorMetrics>,
163    ) -> Self {
164        Self {
165            mailbox,
166            state: state.clone(),
167            checkpoint_store,
168            object_cache_reader: state.get_object_cache_reader().clone(),
169            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
170            tx_manager: state.transaction_manager().clone(),
171            accumulator,
172            config,
173            metrics,
174        }
175    }
176
177    pub fn new_for_tests(
178        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
179        checkpoint_store: Arc<CheckpointStore>,
180        state: Arc<AuthorityState>,
181        accumulator: Arc<StateAccumulator>,
182    ) -> Self {
183        Self::new(
184            mailbox,
185            checkpoint_store,
186            state,
187            accumulator,
188            Default::default(),
189            CheckpointExecutorMetrics::new_for_tests(),
190        )
191    }
192
193    /// Ensure that all checkpoints in the current epoch will be executed.
194    /// We don't technically need &mut on self, but passing it to make sure only
195    /// one instance is running at one time.
196    pub async fn run_epoch(
197        &mut self,
198        epoch_store: Arc<AuthorityPerEpochStore>,
199        run_with_range: Option<RunWithRange>,
200    ) -> StopReason {
201        // check if we want to run this epoch based on RunWithRange condition value
202        // we want to be inclusive of the defined RunWithRangeEpoch::Epoch
203        // i.e Epoch(N) means we will execute epoch N and stop when reaching N+1
204        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(epoch_store.epoch())) {
205            info!(
206                "RunWithRange condition satisfied at {:?}, run_epoch={:?}",
207                run_with_range,
208                epoch_store.epoch()
209            );
210            return StopReason::RunWithRangeCondition;
211        };
212
213        debug!(
214            "Checkpoint executor running for epoch {}",
215            epoch_store.epoch(),
216        );
217        self.metrics
218            .checkpoint_exec_epoch
219            .set(epoch_store.epoch() as i64);
220
221        // Decide the first checkpoint to schedule for execution.
222        // If we haven't executed anything in the past, we schedule checkpoint 0.
223        // Otherwise we schedule the one after highest executed.
224        let mut highest_executed = self
225            .checkpoint_store
226            .get_highest_executed_checkpoint()
227            .unwrap();
228
229        if let Some(highest_executed) = &highest_executed {
230            if epoch_store.epoch() == highest_executed.epoch()
231                && highest_executed.is_last_checkpoint_of_epoch()
232            {
233                // We can arrive at this point if we bump the highest_executed_checkpoint
234                // watermark, and then crash before completing reconfiguration.
235                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
236                return StopReason::EpochComplete;
237            }
238        }
239
240        let mut next_to_schedule = highest_executed
241            .as_ref()
242            .map(|c| c.sequence_number() + 1)
243            .unwrap_or_else(|| {
244                // TODO this invariant may no longer hold once we introduce snapshots
245                assert_eq!(epoch_store.epoch(), 0);
246                0
247            });
248        let mut pending: CheckpointExecutionBuffer = FuturesOrdered::new();
249
250        let mut now_time = Instant::now();
251        let mut now_transaction_num = highest_executed
252            .as_ref()
253            .map(|c| c.network_total_transactions)
254            .unwrap_or(0);
255        let scheduling_timeout_config = get_scheduling_timeout();
256
257        loop {
258            let schedule_scope = iota_metrics::monitored_scope("ScheduleCheckpointExecution");
259
260            // If we have executed the last checkpoint of the current epoch, stop.
261            // Note: when we arrive here with highest_executed == the final checkpoint of
262            // the epoch, we are in an edge case where highest_executed does not
263            // actually correspond to the watermark. The watermark is only
264            // bumped past the epoch final checkpoint after execution of the change
265            // epoch tx, and state accumulation.
266            if self
267                .check_epoch_last_checkpoint(epoch_store.clone(), &highest_executed)
268                .await
269            {
270                self.checkpoint_store
271                    .prune_local_summaries()
272                    .tap_err(|e| error!("Failed to prune local summaries: {}", e))
273                    .ok();
274
275                // be extra careful to ensure we don't have orphans
276                assert!(
277                    pending.is_empty(),
278                    "Pending checkpoint execution buffer should be empty after processing last checkpoint of epoch",
279                );
280                fail_point_async!("crash");
281                debug!(epoch = epoch_store.epoch(), "finished epoch");
282                return StopReason::EpochComplete;
283            }
284
285            self.schedule_synced_checkpoints(
286                &mut pending,
287                // next_to_schedule will be updated to the next checkpoint to schedule.
288                // This makes sure we don't re-schedule the same checkpoint multiple times.
289                &mut next_to_schedule,
290                epoch_store.clone(),
291                run_with_range,
292            );
293
294            self.metrics
295                .checkpoint_exec_inflight
296                .set(pending.len() as i64);
297
298            let panic_timeout = scheduling_timeout_config.panic_timeout;
299            let warning_timeout = scheduling_timeout_config.warning_timeout;
300
301            drop(schedule_scope);
302            tokio::select! {
303                // Check for completed workers and ratchet the highest_checkpoint_executed
304                // watermark accordingly. Note that given that checkpoints are guaranteed to
305                // be processed (added to FuturesOrdered) in seq_number order, using FuturesOrdered
306                // guarantees that we will also ratchet the watermarks in order.
307                Some(Ok((checkpoint, checkpoint_acc, tx_digests))) = pending.next() => {
308                    let _process_scope = iota_metrics::monitored_scope("ProcessExecutedCheckpoint");
309
310                    self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, &tx_digests).await;
311                    highest_executed = Some(checkpoint.clone());
312
313                    // Estimate TPS every 10k transactions or 30 sec
314                    let elapsed = now_time.elapsed().as_millis();
315                    let current_transaction_num =  highest_executed.as_ref().map(|c| c.network_total_transactions).unwrap_or(0);
316                    if current_transaction_num - now_transaction_num > 10_000 || elapsed > 30_000 {
317                        let tps = (1000.0 * (current_transaction_num - now_transaction_num) as f64 / elapsed as f64) as i32;
318                        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);
319                        now_time = Instant::now();
320                        now_transaction_num = current_transaction_num;
321                    }
322                     // we want to be inclusive of checkpoints in RunWithRange::Checkpoint type
323                    if run_with_range.is_some_and(|rwr| rwr.matches_checkpoint(checkpoint.sequence_number)) {
324                        info!(
325                            "RunWithRange condition satisfied after checkpoint sequence number {:?}",
326                            checkpoint.sequence_number
327                        );
328                        return StopReason::RunWithRangeCondition;
329                    }
330                }
331
332                received = self.mailbox.recv() => match received {
333                    Ok(checkpoint) => {
334                        debug!(
335                            sequence_number = ?checkpoint.sequence_number,
336                            "Received checkpoint summary from state sync"
337                        );
338                        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
339                    },
340                    Err(RecvError::Lagged(num_skipped)) => {
341                        debug!(
342                            "Checkpoint Execution Recv channel overflowed with {:?} messages",
343                            num_skipped,
344                        );
345                    }
346                    Err(RecvError::Closed) => {
347                        panic!("Checkpoint Execution Sender (StateSync) closed channel unexpectedly");
348                    },
349                },
350
351                _ = tokio::time::sleep(warning_timeout) => {
352                    warn!(
353                        "Received no new synced checkpoints for {warning_timeout:?}. Next checkpoint to be scheduled: {next_to_schedule}",
354                    );
355                }
356
357                _ = panic_timeout
358                            .map(|d| Either::Left(tokio::time::sleep(d)))
359                            .unwrap_or_else(|| Either::Right(futures::future::pending())) => {
360                    panic!("No new synced checkpoints received for {panic_timeout:?} on node {:?}", self.state.name);
361                },
362            }
363        }
364    }
365
366    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
367        // Ensure that we are not skipping checkpoints at any point
368        let seq = *checkpoint.sequence_number();
369        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
370        if let Some(prev_highest) = self
371            .checkpoint_store
372            .get_highest_executed_checkpoint_seq_number()
373            .unwrap()
374        {
375            assert_eq!(prev_highest + 1, seq);
376        } else {
377            assert_eq!(seq, 0);
378        }
379        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
380            info!("Finished syncing and executing checkpoint {}", seq);
381        }
382
383        fail_point!("highest-executed-checkpoint");
384
385        // We store a fixed number of additional FullCheckpointContents after execution
386        // is complete for use in state sync.
387        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
388        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
389            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
390            if let Some(prune_checkpoint) = self
391                .checkpoint_store
392                .get_checkpoint_by_sequence_number(prune_seq)
393                .expect("Failed to fetch checkpoint")
394            {
395                self.checkpoint_store
396                    .delete_full_checkpoint_contents(prune_seq)
397                    .expect("Failed to delete full checkpoint contents");
398                self.checkpoint_store
399                    .delete_contents_digest_sequence_number_mapping(
400                        &prune_checkpoint.content_digest,
401                    )
402                    .expect("Failed to delete contents digest -> sequence number mapping");
403            } else {
404                // If this is directly after a snapshot restore with skiplisting,
405                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
406                // checkpoints.
407                debug!(
408                    "Failed to fetch checkpoint with sequence number {:?}",
409                    prune_seq
410                );
411            }
412        }
413
414        self.checkpoint_store
415            .update_highest_executed_checkpoint(checkpoint)
416            .unwrap();
417        self.metrics.last_executed_checkpoint.set(seq as i64);
418
419        self.metrics
420            .last_executed_checkpoint_timestamp_ms
421            .set(checkpoint.timestamp_ms as i64);
422        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
423    }
424
425    /// Post processing and plumbing after we executed a checkpoint. This
426    /// function is guaranteed to be called in the order of checkpoint
427    /// sequence number.
428    #[instrument(level = "debug", skip_all)]
429    async fn process_executed_checkpoint(
430        &self,
431        epoch_store: &AuthorityPerEpochStore,
432        checkpoint: &VerifiedCheckpoint,
433        checkpoint_acc: Option<Accumulator>,
434        all_tx_digests: &[TransactionDigest],
435    ) {
436        // Commit all transaction effects to disk
437        let cache_commit = self.state.get_cache_commit();
438        debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
439        cache_commit
440            .commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
441            .await
442            .expect("commit_transaction_outputs cannot fail");
443
444        epoch_store
445            .handle_committed_transactions(all_tx_digests)
446            .expect("cannot fail");
447
448        if !checkpoint.is_last_checkpoint_of_epoch() {
449            self.accumulator
450                .accumulate_running_root(epoch_store, checkpoint.sequence_number, checkpoint_acc)
451                .await
452                .expect("Failed to accumulate running root");
453            self.bump_highest_executed_checkpoint(checkpoint);
454        }
455    }
456
457    #[instrument(level = "debug", skip_all)]
458    fn schedule_synced_checkpoints(
459        &self,
460        pending: &mut CheckpointExecutionBuffer,
461        next_to_schedule: &mut CheckpointSequenceNumber,
462        epoch_store: Arc<AuthorityPerEpochStore>,
463        run_with_range: Option<RunWithRange>,
464    ) {
465        let Some(latest_synced_checkpoint) = self
466            .checkpoint_store
467            .get_highest_synced_checkpoint()
468            .expect("Failed to read highest synced checkpoint")
469        else {
470            debug!("No checkpoints to schedule, highest synced checkpoint is None",);
471            return;
472        };
473
474        while *next_to_schedule <= *latest_synced_checkpoint.sequence_number()
475            && pending.len() < self.config.checkpoint_execution_max_concurrency
476        {
477            let checkpoint = self
478                .checkpoint_store
479                .get_checkpoint_by_sequence_number(*next_to_schedule)
480                .unwrap()
481                .unwrap_or_else(|| {
482                    panic!(
483                        "Checkpoint sequence number {:?} does not exist in checkpoint store",
484                        *next_to_schedule
485                    )
486                });
487            if checkpoint.epoch() > epoch_store.epoch() {
488                return;
489            }
490            match run_with_range {
491                Some(RunWithRange::Checkpoint(seq)) if *next_to_schedule > seq => {
492                    debug!(
493                        "RunWithRange Checkpoint {} is set, not scheduling checkpoint {}",
494                        seq, *next_to_schedule
495                    );
496                    return;
497                }
498                _ => {
499                    self.schedule_checkpoint(checkpoint, pending, epoch_store.clone());
500                    *next_to_schedule += 1;
501                }
502            }
503        }
504    }
505
506    #[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
507    fn schedule_checkpoint(
508        &self,
509        checkpoint: VerifiedCheckpoint,
510        pending: &mut CheckpointExecutionBuffer,
511        epoch_store: Arc<AuthorityPerEpochStore>,
512    ) {
513        debug!("Scheduling checkpoint for execution");
514
515        // Mismatch between node epoch and checkpoint epoch after startup
516        // crash recovery is invalid
517        let checkpoint_epoch = checkpoint.epoch();
518        assert_eq!(
519            checkpoint_epoch,
520            epoch_store.epoch(),
521            "Epoch mismatch after startup recovery. checkpoint epoch: {:?}, node epoch: {:?}",
522            checkpoint_epoch,
523            epoch_store.epoch(),
524        );
525
526        let metrics = self.metrics.clone();
527        let local_execution_timeout_sec = self.config.local_execution_timeout_sec;
528        let data_ingestion_dir = self.config.data_ingestion_dir.clone();
529        let checkpoint_store = self.checkpoint_store.clone();
530        let object_cache_reader = self.object_cache_reader.clone();
531        let transaction_cache_reader = self.transaction_cache_reader.clone();
532        let tx_manager = self.tx_manager.clone();
533        let accumulator = self.accumulator.clone();
534        let state = self.state.clone();
535
536        epoch_store.notify_synced_checkpoint(*checkpoint.sequence_number());
537
538        pending.push_back(spawn_monitored_task!(async move {
539            let epoch_store = epoch_store.clone();
540            let (tx_digests, checkpoint_acc) = loop {
541                match execute_checkpoint(
542                    checkpoint.clone(),
543                    &state,
544                    object_cache_reader.as_ref(),
545                    transaction_cache_reader.as_ref(),
546                    checkpoint_store.clone(),
547                    epoch_store.clone(),
548                    tx_manager.clone(),
549                    accumulator.clone(),
550                    local_execution_timeout_sec,
551                    &metrics,
552                    data_ingestion_dir.clone(),
553                )
554                .await
555                {
556                    Err(err) => {
557                        error!(
558                            "Error while executing checkpoint, will retry in 1s: {:?}",
559                            err
560                        );
561                        tokio::time::sleep(Duration::from_secs(1)).await;
562                        metrics.checkpoint_exec_errors.inc();
563                    }
564                    Ok((tx_digests, checkpoint_acc)) => break (tx_digests, checkpoint_acc),
565                }
566            };
567            (checkpoint, checkpoint_acc, tx_digests)
568        }));
569    }
570
571    #[instrument(level = "info", skip_all)]
572    async fn execute_change_epoch_tx(
573        &self,
574        execution_digests: ExecutionDigests,
575        change_epoch_tx_digest: TransactionDigest,
576        change_epoch_tx: VerifiedExecutableTransaction,
577        epoch_store: Arc<AuthorityPerEpochStore>,
578        checkpoint: VerifiedCheckpoint,
579    ) {
580        let change_epoch_fx = self
581            .transaction_cache_reader
582            .get_effects(&execution_digests.effects)
583            .expect("Fetching effects for change_epoch tx cannot fail")
584            .expect("Change_epoch tx effects must exist");
585
586        if change_epoch_tx.contains_shared_object() {
587            epoch_store
588                .acquire_shared_version_assignments_from_effects(
589                    &change_epoch_tx,
590                    &change_epoch_fx,
591                    self.object_cache_reader.as_ref(),
592                )
593                .await
594                .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
595        }
596
597        self.tx_manager.enqueue_with_expected_effects_digest(
598            vec![(change_epoch_tx.clone(), execution_digests.effects)],
599            &epoch_store,
600        );
601        handle_execution_effects(
602            &self.state,
603            vec![execution_digests],
604            vec![change_epoch_tx_digest],
605            checkpoint.clone(),
606            self.checkpoint_store.clone(),
607            self.object_cache_reader.as_ref(),
608            self.transaction_cache_reader.as_ref(),
609            epoch_store.clone(),
610            self.tx_manager.clone(),
611            self.accumulator.clone(),
612            self.config.local_execution_timeout_sec,
613            self.config.data_ingestion_dir.clone(),
614        )
615        .await;
616    }
617
618    /// Check whether `checkpoint` is the last checkpoint of the current epoch.
619    /// If so, perform special case logic (execute change_epoch tx,
620    /// accumulate epoch, finalize transactions), then return true.
621    pub async fn check_epoch_last_checkpoint(
622        &self,
623        epoch_store: Arc<AuthorityPerEpochStore>,
624        checkpoint: &Option<VerifiedCheckpoint>,
625    ) -> bool {
626        let cur_epoch = epoch_store.epoch();
627
628        if let Some(checkpoint) = checkpoint {
629            if checkpoint.epoch() == cur_epoch {
630                if let Some((change_epoch_execution_digests, change_epoch_tx)) =
631                    extract_end_of_epoch_tx(
632                        checkpoint,
633                        self.transaction_cache_reader.as_ref(),
634                        self.checkpoint_store.clone(),
635                        epoch_store.clone(),
636                    )
637                {
638                    let change_epoch_tx_digest = change_epoch_execution_digests.transaction;
639
640                    info!(
641                        ended_epoch = cur_epoch,
642                        last_checkpoint = checkpoint.sequence_number(),
643                        "Reached end of epoch, executing change_epoch transaction",
644                    );
645
646                    self.execute_change_epoch_tx(
647                        change_epoch_execution_digests,
648                        change_epoch_tx_digest,
649                        change_epoch_tx,
650                        epoch_store.clone(),
651                        checkpoint.clone(),
652                    )
653                    .await;
654
655                    let cache_commit = self.state.get_cache_commit();
656                    cache_commit
657                        .commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
658                        .await
659                        .expect("commit_transaction_outputs cannot fail");
660                    fail_point_async!("prune-and-compact");
661
662                    // For finalizing the checkpoint, we need to pass in all checkpoint
663                    // transaction effects, not just the change_epoch tx effects. However,
664                    // we have already notify awaited all tx effects separately (once
665                    // for change_epoch tx, and once for all other txes). Therefore this
666                    // should be a fast operation
667                    let all_tx_digests: Vec<_> = self
668                        .checkpoint_store
669                        .get_checkpoint_contents(&checkpoint.content_digest)
670                        .expect("read cannot fail")
671                        .expect("Checkpoint contents should exist")
672                        .iter()
673                        .map(|digests| digests.transaction)
674                        .collect();
675
676                    let effects = self
677                        .transaction_cache_reader
678                        .notify_read_executed_effects(&all_tx_digests)
679                        .await
680                        .expect("Failed to get executed effects for finalizing checkpoint");
681
682                    finalize_checkpoint(
683                        &self.state,
684                        self.object_cache_reader.as_ref(),
685                        self.transaction_cache_reader.as_ref(),
686                        self.checkpoint_store.clone(),
687                        &all_tx_digests,
688                        &epoch_store,
689                        checkpoint.clone(),
690                        self.accumulator.clone(),
691                        effects,
692                        self.config.data_ingestion_dir.clone(),
693                    )
694                    .await
695                    .expect("Finalizing checkpoint cannot fail");
696
697                    self.checkpoint_store
698                        .insert_epoch_last_checkpoint(cur_epoch, checkpoint)
699                        .expect("Failed to insert epoch last checkpoint");
700
701                    self.accumulator
702                        .accumulate_running_root(&epoch_store, checkpoint.sequence_number, None)
703                        .await
704                        .expect("Failed to accumulate running root");
705                    self.accumulator
706                        .accumulate_epoch(epoch_store.clone(), *checkpoint.sequence_number())
707                        .await
708                        .expect("Accumulating epoch cannot fail");
709
710                    self.bump_highest_executed_checkpoint(checkpoint);
711
712                    return true;
713                }
714            }
715        }
716        false
717    }
718}
719
720// Logs within the function are annotated with the checkpoint sequence number
721// and epoch, from schedule_checkpoint().
722#[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
723async fn execute_checkpoint(
724    checkpoint: VerifiedCheckpoint,
725    state: &AuthorityState,
726    object_cache_reader: &dyn ObjectCacheRead,
727    transaction_cache_reader: &dyn TransactionCacheRead,
728    checkpoint_store: Arc<CheckpointStore>,
729    epoch_store: Arc<AuthorityPerEpochStore>,
730    transaction_manager: Arc<TransactionManager>,
731    accumulator: Arc<StateAccumulator>,
732    local_execution_timeout_sec: u64,
733    metrics: &Arc<CheckpointExecutorMetrics>,
734    data_ingestion_dir: Option<PathBuf>,
735) -> IotaResult<(Vec<TransactionDigest>, Option<Accumulator>)> {
736    debug!("Preparing checkpoint for execution",);
737    let prepare_start = Instant::now();
738
739    // this function must guarantee that all transactions in the checkpoint are
740    // executed before it returns. This invariant is enforced in two phases:
741    // - First, we filter out any already executed transactions from the checkpoint
742    //   in get_unexecuted_transactions()
743    // - Second, we execute all remaining transactions.
744
745    let (execution_digests, all_tx_digests, executable_txns, randomness_rounds) =
746        get_unexecuted_transactions(
747            checkpoint.clone(),
748            transaction_cache_reader,
749            checkpoint_store.clone(),
750            epoch_store.clone(),
751        );
752
753    let tx_count = execution_digests.len();
754    debug!("Number of transactions in the checkpoint: {:?}", tx_count);
755    metrics
756        .checkpoint_transaction_count
757        .observe(tx_count as f64);
758
759    let checkpoint_acc = execute_transactions(
760        execution_digests,
761        all_tx_digests.clone(),
762        executable_txns,
763        state,
764        object_cache_reader,
765        transaction_cache_reader,
766        checkpoint_store.clone(),
767        epoch_store.clone(),
768        transaction_manager,
769        accumulator,
770        local_execution_timeout_sec,
771        checkpoint,
772        metrics,
773        prepare_start,
774        data_ingestion_dir,
775    )
776    .await?;
777
778    // Once execution is complete, we know that any randomness contained in this
779    // checkpoint has been successfully included in a checkpoint certified by
780    // quorum of validators. (RandomnessManager/RandomnessReporter is only
781    // present on validators.)
782    if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
783        for round in randomness_rounds {
784            debug!(
785                ?round,
786                "notifying RandomnessReporter that randomness update was executed in checkpoint"
787            );
788            randomness_reporter.notify_randomness_in_checkpoint(round)?;
789        }
790    }
791
792    Ok((all_tx_digests, checkpoint_acc))
793}
794
795#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
796async fn handle_execution_effects(
797    state: &AuthorityState,
798    execution_digests: Vec<ExecutionDigests>,
799    all_tx_digests: Vec<TransactionDigest>,
800    checkpoint: VerifiedCheckpoint,
801    checkpoint_store: Arc<CheckpointStore>,
802    object_cache_reader: &dyn ObjectCacheRead,
803    transaction_cache_reader: &dyn TransactionCacheRead,
804    epoch_store: Arc<AuthorityPerEpochStore>,
805    transaction_manager: Arc<TransactionManager>,
806    accumulator: Arc<StateAccumulator>,
807    local_execution_timeout_sec: u64,
808    data_ingestion_dir: Option<PathBuf>,
809) -> Option<Accumulator> {
810    // Once synced_txns have been awaited, all txns should have effects committed.
811    let mut periods = 1;
812    let log_timeout_sec = Duration::from_secs(local_execution_timeout_sec);
813    // Whether the checkpoint is next to execute and blocking additional executions.
814    let mut blocking_execution = false;
815    loop {
816        let effects_future = transaction_cache_reader.notify_read_executed_effects(&all_tx_digests);
817
818        match timeout(log_timeout_sec, effects_future).await {
819            Err(_elapsed) => {
820                // Reading this value every timeout should be ok.
821                let highest_seq = checkpoint_store
822                    .get_highest_executed_checkpoint_seq_number()
823                    .unwrap()
824                    .unwrap_or_default();
825                if checkpoint.sequence_number <= highest_seq {
826                    error!(
827                        "Re-executing checkpoint {} after higher checkpoint {} has executed!",
828                        checkpoint.sequence_number, highest_seq
829                    );
830                    continue;
831                }
832                if checkpoint.sequence_number > highest_seq + 1 {
833                    trace!(
834                        "Checkpoint {} is still executing. Highest executed = {}",
835                        checkpoint.sequence_number, highest_seq
836                    );
837                    continue;
838                }
839                if !blocking_execution {
840                    trace!(
841                        "Checkpoint {} is next to execute.",
842                        checkpoint.sequence_number
843                    );
844                    blocking_execution = true;
845                    continue;
846                }
847
848                // Only log details when the checkpoint is next to execute, but has not finished
849                // execution within log_timeout_sec.
850                let missing_digests: Vec<TransactionDigest> = transaction_cache_reader
851                    .multi_get_executed_effects_digests(&all_tx_digests)
852                    .expect("multi_get_executed_effects cannot fail")
853                    .iter()
854                    .zip(all_tx_digests.clone())
855                    .filter_map(
856                        |(fx, digest)| {
857                            if fx.is_none() { Some(digest) } else { None }
858                        },
859                    )
860                    .collect();
861
862                if missing_digests.is_empty() {
863                    // All effects just become available.
864                    continue;
865                }
866
867                warn!(
868                    "Transaction effects for checkpoint tx digests {:?} not present within {:?}. ",
869                    missing_digests,
870                    log_timeout_sec * periods,
871                );
872
873                // Print out more information for the 1st pending transaction, which should have
874                // all of its input available.
875                let pending_digest = missing_digests.first().unwrap();
876                if let Some(missing_input) = transaction_manager.get_missing_input(pending_digest) {
877                    warn!(
878                        "Transaction {pending_digest:?} has missing input objects {missing_input:?}",
879                    );
880                }
881                periods += 1;
882            }
883            Ok(Err(err)) => panic!("Failed to notify_read_executed_effects: {:?}", err),
884            Ok(Ok(effects)) => {
885                for (tx_digest, expected_digest, actual_effects) in
886                    izip!(&all_tx_digests, &execution_digests, &effects)
887                {
888                    let expected_effects_digest = &expected_digest.effects;
889                    assert_not_forked(
890                        &checkpoint,
891                        tx_digest,
892                        expected_effects_digest,
893                        &actual_effects.digest(),
894                        transaction_cache_reader,
895                    );
896                }
897
898                // if end of epoch checkpoint, we must finalize the checkpoint after executing
899                // the change epoch tx, which is done after all other checkpoint execution
900                if checkpoint.end_of_epoch_data.is_none() {
901                    return Some(
902                        finalize_checkpoint(
903                            state,
904                            object_cache_reader,
905                            transaction_cache_reader,
906                            checkpoint_store.clone(),
907                            &all_tx_digests,
908                            &epoch_store,
909                            checkpoint.clone(),
910                            accumulator.clone(),
911                            effects,
912                            data_ingestion_dir,
913                        )
914                        .await
915                        .expect("Finalizing checkpoint cannot fail"),
916                    );
917                } else {
918                    return None;
919                }
920            }
921        }
922    }
923}
924
925fn assert_not_forked(
926    checkpoint: &VerifiedCheckpoint,
927    tx_digest: &TransactionDigest,
928    expected_digest: &TransactionEffectsDigest,
929    actual_effects_digest: &TransactionEffectsDigest,
930    cache_reader: &dyn TransactionCacheRead,
931) {
932    if *expected_digest != *actual_effects_digest {
933        let actual_effects = cache_reader
934            .get_executed_effects(tx_digest)
935            .expect("get_executed_effects cannot fail")
936            .expect("actual effects should exist");
937
938        // log observed effects (too big for panic message) and then panic.
939        error!(
940            ?checkpoint,
941            ?tx_digest,
942            ?expected_digest,
943            ?actual_effects,
944            "fork detected!"
945        );
946        panic!(
947            "When executing checkpoint {}, transaction {} \
948            is expected to have effects digest {}, but got {}!",
949            checkpoint.sequence_number(),
950            tx_digest,
951            expected_digest,
952            actual_effects_digest,
953        );
954    }
955}
956
957// Given a checkpoint, find the end of epoch transaction, if it exists
958fn extract_end_of_epoch_tx(
959    checkpoint: &VerifiedCheckpoint,
960    cache_reader: &dyn TransactionCacheRead,
961    checkpoint_store: Arc<CheckpointStore>,
962    epoch_store: Arc<AuthorityPerEpochStore>,
963) -> Option<(ExecutionDigests, VerifiedExecutableTransaction)> {
964    checkpoint.end_of_epoch_data.as_ref()?;
965
966    // Last checkpoint must have the end of epoch transaction as the last
967    // transaction.
968
969    let checkpoint_sequence = checkpoint.sequence_number();
970    let execution_digests = checkpoint_store
971        .get_checkpoint_contents(&checkpoint.content_digest)
972        .expect("Failed to get checkpoint contents from store")
973        .unwrap_or_else(|| {
974            panic!(
975                "Checkpoint contents for digest {:?} does not exist",
976                checkpoint.content_digest
977            )
978        })
979        .into_inner();
980
981    let digests = execution_digests
982        .last()
983        .expect("Final checkpoint must have at least one transaction");
984
985    let change_epoch_tx = cache_reader
986        .get_transaction_block(&digests.transaction)
987        .expect("read cannot fail");
988
989    let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
990        (*change_epoch_tx.unwrap_or_else(||
991            panic!(
992                "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
993                digests.transaction,
994            )
995        )).clone(),
996        epoch_store.epoch(),
997        *checkpoint_sequence,
998    );
999
1000    assert!(
1001        change_epoch_tx
1002            .data()
1003            .intent_message()
1004            .value
1005            .is_end_of_epoch_tx()
1006    );
1007
1008    Some((*digests, change_epoch_tx))
1009}
1010
1011// Given a checkpoint, filter out any already executed transactions, then return
1012// the remaining execution digests, transaction digests, transactions to be
1013// executed, and randomness rounds (if any) included in the checkpoint.
1014#[expect(clippy::type_complexity)]
1015fn get_unexecuted_transactions(
1016    checkpoint: VerifiedCheckpoint,
1017    cache_reader: &dyn TransactionCacheRead,
1018    checkpoint_store: Arc<CheckpointStore>,
1019    epoch_store: Arc<AuthorityPerEpochStore>,
1020) -> (
1021    Vec<ExecutionDigests>,
1022    Vec<TransactionDigest>,
1023    Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
1024    Vec<RandomnessRound>,
1025) {
1026    let checkpoint_sequence = checkpoint.sequence_number();
1027    let full_contents = checkpoint_store
1028        .get_full_checkpoint_contents_by_sequence_number(*checkpoint_sequence)
1029        .expect("Failed to get checkpoint contents from store")
1030        .tap_some(|_| {
1031            debug!("loaded full checkpoint contents in bulk for sequence {checkpoint_sequence}")
1032        });
1033
1034    let mut execution_digests = checkpoint_store
1035        .get_checkpoint_contents(&checkpoint.content_digest)
1036        .expect("Failed to get checkpoint contents from store")
1037        .unwrap_or_else(|| {
1038            panic!(
1039                "Checkpoint contents for digest {:?} does not exist",
1040                checkpoint.content_digest
1041            )
1042        })
1043        .into_inner();
1044
1045    let full_contents_txns = full_contents.map(|c| {
1046        c.into_iter()
1047            .zip(execution_digests.iter())
1048            .map(|(txn, digests)| (digests.transaction, txn))
1049            .collect::<HashMap<_, _>>()
1050    });
1051
1052    // Remove the change epoch transaction so that we can special case its
1053    // execution.
1054    checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
1055        let change_epoch_tx_digest = execution_digests
1056            .pop()
1057            .expect("Final checkpoint must have at least one transaction")
1058            .transaction;
1059
1060        let change_epoch_tx = cache_reader
1061            .get_transaction_block(&change_epoch_tx_digest)
1062            .expect("read cannot fail")
1063            .unwrap_or_else(||
1064                panic!(
1065                    "state-sync should have ensured that transaction with digest {change_epoch_tx_digest:?} exists for checkpoint: {}",
1066                    checkpoint.sequence_number()
1067                )
1068            );
1069        assert!(change_epoch_tx.data().intent_message().value.is_end_of_epoch_tx());
1070    });
1071
1072    let randomness_rounds = if let Some(version_specific_data) = checkpoint
1073        .version_specific_data(epoch_store.protocol_config())
1074        .expect("unable to get version_specific_data")
1075    {
1076        // With version-specific data, randomness rounds are stored in checkpoint
1077        // summary.
1078        version_specific_data.into_v1().randomness_rounds
1079    } else {
1080        // Before version-specific data, checkpoint batching must be disabled. In this
1081        // case, randomness state update tx must be first if it exists, because
1082        // all other transactions in a checkpoint that includes a randomness
1083        // state update are causally dependent on it.
1084        assert_eq!(
1085            0,
1086            epoch_store
1087                .protocol_config()
1088                .min_checkpoint_interval_ms_as_option()
1089                .unwrap_or_default(),
1090        );
1091        if let Some(first_digest) = execution_digests.first() {
1092            let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
1093            .expect("read cannot fail")
1094            .unwrap_or_else(||
1095                panic!(
1096                    "state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
1097                    checkpoint.sequence_number()
1098                )
1099            );
1100            if let TransactionKind::RandomnessStateUpdate(rsu) =
1101                maybe_randomness_tx.data().transaction_data().kind()
1102            {
1103                vec![rsu.randomness_round]
1104            } else {
1105                Vec::new()
1106            }
1107        } else {
1108            Vec::new()
1109        }
1110    };
1111
1112    let all_tx_digests: Vec<TransactionDigest> =
1113        execution_digests.iter().map(|tx| tx.transaction).collect();
1114
1115    let executed_effects_digests = cache_reader
1116        .multi_get_executed_effects_digests(&all_tx_digests)
1117        .expect("failed to read executed_effects from store");
1118
1119    let (unexecuted_txns, expected_effects_digests): (Vec<_>, Vec<_>) =
1120        izip!(execution_digests.iter(), executed_effects_digests.iter())
1121            .filter_map(|(digests, effects_digest)| match effects_digest {
1122                None => Some((digests.transaction, digests.effects)),
1123                Some(actual_effects_digest) => {
1124                    let tx_digest = &digests.transaction;
1125                    let effects_digest = &digests.effects;
1126                    trace!(
1127                        "Transaction with digest {:?} has already been executed",
1128                        tx_digest
1129                    );
1130                    assert_not_forked(
1131                        &checkpoint,
1132                        tx_digest,
1133                        effects_digest,
1134                        actual_effects_digest,
1135                        cache_reader,
1136                    );
1137                    None
1138                }
1139            })
1140            .unzip();
1141
1142    // read remaining unexecuted transactions from store
1143    let executable_txns: Vec<_> = if let Some(full_contents_txns) = full_contents_txns {
1144        unexecuted_txns
1145            .into_iter()
1146            .zip(expected_effects_digests)
1147            .map(|(tx_digest, expected_effects_digest)| {
1148                let tx = &full_contents_txns.get(&tx_digest).unwrap().transaction;
1149                (
1150                    VerifiedExecutableTransaction::new_from_checkpoint(
1151                        VerifiedTransaction::new_unchecked(tx.clone()),
1152                        epoch_store.epoch(),
1153                        *checkpoint_sequence,
1154                    ),
1155                    expected_effects_digest,
1156                )
1157            })
1158            .collect()
1159    } else {
1160        cache_reader
1161            .multi_get_transaction_blocks(&unexecuted_txns)
1162            .expect("Failed to get checkpoint txes from store")
1163            .into_iter()
1164            .zip(expected_effects_digests)
1165            .enumerate()
1166            .map(|(i, (tx, expected_effects_digest))| {
1167                let tx = tx.unwrap_or_else(||
1168                    panic!(
1169                        "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
1170                        unexecuted_txns[i]
1171                    )
1172                );
1173                // change epoch tx is handled specially in check_epoch_last_checkpoint
1174                assert!(!tx.data().intent_message().value.is_end_of_epoch_tx());
1175                (
1176                    VerifiedExecutableTransaction::new_from_checkpoint(
1177                        Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()),
1178                        epoch_store.epoch(),
1179                        *checkpoint_sequence,
1180                    ),
1181                    expected_effects_digest
1182                )
1183            })
1184            .collect()
1185    };
1186
1187    (
1188        execution_digests,
1189        all_tx_digests,
1190        executable_txns,
1191        randomness_rounds,
1192    )
1193}
1194
1195// Logs within the function are annotated with the checkpoint sequence number
1196// and epoch, from schedule_checkpoint().
1197#[instrument(level = "debug", skip_all)]
1198async fn execute_transactions(
1199    execution_digests: Vec<ExecutionDigests>,
1200    all_tx_digests: Vec<TransactionDigest>,
1201    executable_txns: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
1202    state: &AuthorityState,
1203    object_cache_reader: &dyn ObjectCacheRead,
1204    transaction_cache_reader: &dyn TransactionCacheRead,
1205    checkpoint_store: Arc<CheckpointStore>,
1206    epoch_store: Arc<AuthorityPerEpochStore>,
1207    transaction_manager: Arc<TransactionManager>,
1208    state_accumulator: Arc<StateAccumulator>,
1209    local_execution_timeout_sec: u64,
1210    checkpoint: VerifiedCheckpoint,
1211    metrics: &Arc<CheckpointExecutorMetrics>,
1212    prepare_start: Instant,
1213    data_ingestion_dir: Option<PathBuf>,
1214) -> IotaResult<Option<Accumulator>> {
1215    let effects_digests: HashMap<_, _> = execution_digests
1216        .iter()
1217        .map(|digest| (digest.transaction, digest.effects))
1218        .collect();
1219
1220    let shared_effects_digests = executable_txns
1221        .iter()
1222        .filter(|(tx, _)| tx.contains_shared_object())
1223        .map(|(tx, _)| {
1224            *effects_digests
1225                .get(tx.digest())
1226                .expect("Transaction digest not found in effects_digests")
1227        })
1228        .collect::<Vec<_>>();
1229
1230    let digest_to_effects: HashMap<TransactionDigest, TransactionEffects> =
1231        transaction_cache_reader
1232            .multi_get_effects(&shared_effects_digests)?
1233            .into_iter()
1234            .zip(shared_effects_digests)
1235            .map(|(fx, fx_digest)| {
1236                if fx.is_none() {
1237                    panic!(
1238                        "Transaction effects for effects digest {:?} do not exist in effects table",
1239                        fx_digest
1240                    );
1241                }
1242                let fx = fx.unwrap();
1243                (*fx.transaction_digest(), fx)
1244            })
1245            .collect();
1246
1247    for (tx, _) in &executable_txns {
1248        if tx.contains_shared_object() {
1249            epoch_store
1250                .acquire_shared_version_assignments_from_effects(
1251                    tx,
1252                    digest_to_effects.get(tx.digest()).unwrap(),
1253                    object_cache_reader,
1254                )
1255                .await?;
1256        }
1257    }
1258
1259    let prepare_elapsed = prepare_start.elapsed();
1260    metrics
1261        .checkpoint_prepare_latency
1262        .observe(prepare_elapsed.as_secs_f64());
1263    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
1264        info!(
1265            "Checkpoint preparation for execution took {:?}",
1266            prepare_elapsed
1267        );
1268    }
1269
1270    let exec_start = Instant::now();
1271    transaction_manager.enqueue_with_expected_effects_digest(executable_txns.clone(), &epoch_store);
1272
1273    let checkpoint_acc = handle_execution_effects(
1274        state,
1275        execution_digests,
1276        all_tx_digests,
1277        checkpoint.clone(),
1278        checkpoint_store,
1279        object_cache_reader,
1280        transaction_cache_reader,
1281        epoch_store,
1282        transaction_manager,
1283        state_accumulator,
1284        local_execution_timeout_sec,
1285        data_ingestion_dir,
1286    )
1287    .await;
1288
1289    let exec_elapsed = exec_start.elapsed();
1290    metrics
1291        .checkpoint_exec_latency
1292        .observe(exec_elapsed.as_secs_f64());
1293    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
1294        info!("Checkpoint execution took {:?}", exec_elapsed);
1295    }
1296
1297    Ok(checkpoint_acc)
1298}
1299
1300#[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
1301async fn finalize_checkpoint(
1302    state: &AuthorityState,
1303    object_cache_reader: &dyn ObjectCacheRead,
1304    transaction_cache_reader: &dyn TransactionCacheRead,
1305    checkpoint_store: Arc<CheckpointStore>,
1306    tx_digests: &[TransactionDigest],
1307    epoch_store: &Arc<AuthorityPerEpochStore>,
1308    checkpoint: VerifiedCheckpoint,
1309    accumulator: Arc<StateAccumulator>,
1310    effects: Vec<TransactionEffects>,
1311    data_ingestion_dir: Option<PathBuf>,
1312) -> IotaResult<Accumulator> {
1313    debug!("finalizing checkpoint");
1314    epoch_store.insert_finalized_transactions(tx_digests, checkpoint.sequence_number)?;
1315
1316    // TODO remove once we no longer need to support this table for read RPC
1317    state
1318        .get_checkpoint_cache()
1319        .insert_finalized_transactions_perpetual_checkpoints(
1320            tx_digests,
1321            epoch_store.epoch(),
1322            checkpoint.sequence_number,
1323        )?;
1324
1325    let checkpoint_acc =
1326        accumulator.accumulate_checkpoint(effects, checkpoint.sequence_number, epoch_store)?;
1327
1328    if data_ingestion_dir.is_some() || state.rest_index.is_some() {
1329        let checkpoint_data = load_checkpoint_data(
1330            checkpoint,
1331            object_cache_reader,
1332            transaction_cache_reader,
1333            checkpoint_store,
1334            tx_digests,
1335        )?;
1336
1337        // TODO(bmwill) discuss with team a better location for this indexing so that it
1338        // isn't on the critical path and the writes to the DB are done in
1339        // checkpoint order
1340        if let Some(rest_index) = &state.rest_index {
1341            let mut layout_resolver = epoch_store.executor().type_layout_resolver(Box::new(
1342                PackageStoreWithFallback::new(state.get_backing_package_store(), &checkpoint_data),
1343            ));
1344
1345            rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut())?;
1346        }
1347
1348        if let Some(path) = data_ingestion_dir {
1349            store_checkpoint_locally(path, &checkpoint_data)?;
1350        }
1351    }
1352    Ok(checkpoint_acc)
1353}