iota_core/checkpoints/checkpoint_executor/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! CheckpointExecutor is a Node component that executes all checkpoints for the
//! given epoch. It acts as a consumer of StateSync for newly synced checkpoints,
//! scheduling and monitoring their execution. Its primary goal is to allow a
//! newly joined or recovering Node to catch up to the network's current
//! checkpoint sequence number as quickly as possible so that it can participate
//! in a timely manner. To that end, CheckpointExecutor attempts to saturate the
//! CPU with executor tasks (one per checkpoint), each of which handles
//! scheduling and awaiting checkpoint transaction execution.
//!
//! CheckpointExecutor is made recoverable in the event of Node shutdown by way
//! of a watermark, highest_executed_checkpoint, which is guaranteed to be
//! updated strictly in sequence order, even though checkpoints themselves may
//! be executed out of order and in parallel. CheckpointExecutor parallelizes
//! checkpoints of the same epoch as much as possible. It enforces the invariant
//! that if `run_epoch` returns successfully, the end of the epoch has been
//! reached, which allows the caller to use this as a signal for reconfiguration.
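//!
//! # Example (sketch)
//!
//! A minimal, illustrative sketch of how a node could drive the executor;
//! construction of the surrounding components (state sync channel, stores,
//! metrics) is elided and the variable names here are assumptions.
//!
//! ```ignore
//! let mut executor = CheckpointExecutor::new(
//!     checkpoint_rx, // broadcast::Receiver<VerifiedCheckpoint> fed by StateSync
//!     checkpoint_store,
//!     authority_state,
//!     state_accumulator,
//!     CheckpointExecutorConfig::default(),
//!     CheckpointExecutorMetrics::new_for_tests(),
//! );
//! match executor.run_epoch(epoch_store, None).await {
//!     // Every checkpoint of the epoch (including the change-epoch tx) has
//!     // been executed; the caller can reconfigure for the next epoch.
//!     StopReason::EpochComplete => { /* reconfigure */ }
//!     // The configured RunWithRange bound was hit; stop the node.
//!     StopReason::RunWithRangeCondition => { /* shut down */ }
//! }
//! ```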

use std::{
    collections::HashMap,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};

use either::Either;
use futures::stream::FuturesOrdered;
use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
use iota_macros::{fail_point, fail_point_async};
use iota_metrics::spawn_monitored_task;
use iota_types::{
    accumulator::Accumulator,
    base_types::{ExecutionDigests, TransactionDigest, TransactionEffectsDigest},
    crypto::RandomnessRound,
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::IotaResult,
    executable_transaction::VerifiedExecutableTransaction,
    inner_temporary_store::PackageStoreWithFallback,
    message_envelope::Message,
    messages_checkpoint::{CheckpointSequenceNumber, VerifiedCheckpoint},
    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
};
use itertools::izip;
use tap::{TapFallible, TapOptional};
use tokio::{
    sync::broadcast::{self, error::RecvError},
    task::JoinHandle,
    time::timeout,
};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, instrument, trace, warn};

use self::metrics::CheckpointExecutorMetrics;
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    checkpoints::{
        CheckpointStore,
        checkpoint_executor::data_ingestion_handler::{
            load_checkpoint_data, store_checkpoint_locally,
        },
    },
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    state_accumulator::StateAccumulator,
    transaction_manager::TransactionManager,
};

mod data_ingestion_handler;
pub mod metrics;

#[cfg(test)]
pub(crate) mod tests;

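/// Ordered buffer of in-flight checkpoint execution tasks. Each task resolves
/// to the executed checkpoint, its checkpoint accumulator (`None` for the final
/// checkpoint of an epoch, whose finalization is deferred until after the
/// change-epoch transaction), and the digests of all transactions it contains.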
type CheckpointExecutionBuffer = FuturesOrdered<
    JoinHandle<(
        VerifiedCheckpoint,
        Option<Accumulator>,
        Vec<TransactionDigest>,
    )>,
>;

/// The interval to log checkpoint progress, in # of checkpoints processed.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

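/// Timeouts applied in `run_epoch` while waiting for newly synced checkpoints:
/// a warning is logged after `warning_timeout` without progress, and the node
/// panics after `panic_timeout` if one is configured.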
#[derive(Debug, Clone, Copy)]
pub struct CheckpointTimeoutConfig {
    pub panic_timeout: Option<Duration>,
    pub warning_timeout: Duration,
}

// We use a thread local so that the config can be overridden on a per-test
// basis. This means that get_scheduling_timeout() can be called multiple times
// in a multithreaded context, but the function is still very cheap to call so
// this is okay.
thread_local! {
    static SCHEDULING_TIMEOUT: once_cell::sync::OnceCell<CheckpointTimeoutConfig> =
        const { once_cell::sync::OnceCell::new() };
}

#[cfg(msim)]
pub fn init_checkpoint_timeout_config(config: CheckpointTimeoutConfig) {
    SCHEDULING_TIMEOUT.with(|s| {
        s.set(config).expect("SchedulingTimeoutConfig already set");
    });
}

fn get_scheduling_timeout() -> CheckpointTimeoutConfig {
    fn inner() -> CheckpointTimeoutConfig {
        let panic_timeout: Option<Duration> = if cfg!(msim) {
            Some(Duration::from_secs(45))
        } else {
            std::env::var("NEW_CHECKPOINT_PANIC_TIMEOUT_MS")
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
                .map(Duration::from_millis)
        };

        let warning_timeout: Duration = std::env::var("NEW_CHECKPOINT_WARNING_TIMEOUT_MS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .map(Duration::from_millis)
            .unwrap_or(Duration::from_secs(5));

        CheckpointTimeoutConfig {
            panic_timeout,
            warning_timeout,
        }
    }

    SCHEDULING_TIMEOUT.with(|s| *s.get_or_init(inner))
}

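/// Why `run_epoch` stopped: either the final checkpoint of the epoch was
/// executed, or the configured `RunWithRange` condition was reached.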
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    EpochComplete,
    RunWithRangeCondition,
}

pub struct CheckpointExecutor {
    mailbox: broadcast::Receiver<VerifiedCheckpoint>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
}

impl CheckpointExecutor {
    pub fn new(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
    ) -> Self {
        Self {
            mailbox,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            config,
            metrics,
        }
    }

    pub fn new_for_tests(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            mailbox,
            checkpoint_store,
            state,
            accumulator,
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
        )
    }

    /// Ensure that all checkpoints in the current epoch will be executed.
    /// We don't technically need `&mut self`, but we take it to make sure only
    /// one instance is running at a time.
    pub async fn run_epoch(
        &mut self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) -> StopReason {
        // Check if we want to run this epoch based on the RunWithRange condition.
        // We want to be inclusive of the defined RunWithRange::Epoch, i.e.
        // Epoch(N) means we will execute epoch N and stop when reaching N+1.
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(epoch_store.epoch())) {
            info!(
                "RunWithRange condition satisfied at {:?}, run_epoch={:?}",
                run_with_range,
                epoch_store.epoch()
            );
            return StopReason::RunWithRangeCondition;
        };

        debug!(
            "Checkpoint executor running for epoch {}",
            epoch_store.epoch(),
        );
        self.metrics
            .checkpoint_exec_epoch
            .set(epoch_store.epoch() as i64);

        // Decide the first checkpoint to schedule for execution.
        // If we haven't executed anything in the past, we schedule checkpoint 0.
        // Otherwise we schedule the one after highest executed.
        let mut highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                // We can arrive at this point if we bump the highest_executed_checkpoint
                // watermark, and then crash before completing reconfiguration.
                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
                return StopReason::EpochComplete;
            }
        }

        let mut next_to_schedule = highest_executed
            .as_ref()
            .map(|c| c.sequence_number() + 1)
            .unwrap_or_else(|| {
                // TODO this invariant may no longer hold once we introduce snapshots
                assert_eq!(epoch_store.epoch(), 0);
                0
            });
        let mut pending: CheckpointExecutionBuffer = FuturesOrdered::new();

        let mut now_time = Instant::now();
        let mut now_transaction_num = highest_executed
            .as_ref()
            .map(|c| c.network_total_transactions)
            .unwrap_or(0);
        let scheduling_timeout_config = get_scheduling_timeout();

        loop {
            // If we have executed the last checkpoint of the current epoch, stop.
            // Note: when we arrive here with highest_executed == the final checkpoint of
            // the epoch, we are in an edge case where highest_executed does not
            // actually correspond to the watermark. The watermark is only
            // bumped past the epoch final checkpoint after execution of the change
            // epoch tx, and state accumulation.
            if self
                .check_epoch_last_checkpoint(epoch_store.clone(), &highest_executed)
                .await
            {
                self.checkpoint_store
                    .prune_local_summaries()
                    .tap_err(|e| error!("Failed to prune local summaries: {}", e))
                    .ok();

                // Be extra careful to ensure we don't have orphans.
                assert!(
                    pending.is_empty(),
                    "Pending checkpoint execution buffer should be empty after processing last checkpoint of epoch",
                );
                fail_point_async!("crash");
                debug!(epoch = epoch_store.epoch(), "finished epoch");
                return StopReason::EpochComplete;
            }

            self.schedule_synced_checkpoints(
                &mut pending,
                // next_to_schedule will be updated to the next checkpoint to schedule.
                // This makes sure we don't re-schedule the same checkpoint multiple times.
                &mut next_to_schedule,
                epoch_store.clone(),
                run_with_range,
            );

            self.metrics
                .checkpoint_exec_inflight
                .set(pending.len() as i64);

            let panic_timeout = scheduling_timeout_config.panic_timeout;
            let warning_timeout = scheduling_timeout_config.warning_timeout;

            tokio::select! {
                // Check for completed workers and ratchet the highest_checkpoint_executed
                // watermark accordingly. Note that given that checkpoints are guaranteed to
                // be processed (added to FuturesOrdered) in seq_number order, using FuturesOrdered
                // guarantees that we will also ratchet the watermarks in order.
                Some(Ok((checkpoint, checkpoint_acc, tx_digests))) = pending.next() => {
                    self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, &tx_digests).await;
                    highest_executed = Some(checkpoint.clone());

                    // Estimate TPS every 10k transactions or 30 sec
                    let elapsed = now_time.elapsed().as_millis();
                    let current_transaction_num = highest_executed.as_ref().map(|c| c.network_total_transactions).unwrap_or(0);
                    if current_transaction_num - now_transaction_num > 10_000 || elapsed > 30_000 {
                        let tps = (1000.0 * (current_transaction_num - now_transaction_num) as f64 / elapsed as f64) as i32;
                        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);
                        now_time = Instant::now();
                        now_transaction_num = current_transaction_num;
                    }
                    // We want to be inclusive of checkpoints for the RunWithRange::Checkpoint condition.
                    if run_with_range.is_some_and(|rwr| rwr.matches_checkpoint(checkpoint.sequence_number)) {
                        info!(
                            "RunWithRange condition satisfied after checkpoint sequence number {:?}",
                            checkpoint.sequence_number
                        );
                        return StopReason::RunWithRangeCondition;
                    }
                }

                received = self.mailbox.recv() => match received {
                    Ok(checkpoint) => {
                        debug!(
                            sequence_number = ?checkpoint.sequence_number,
                            "Received checkpoint summary from state sync"
                        );
                        checkpoint.report_checkpoint_age_ms(&self.metrics.checkpoint_contents_age_ms);
                    },
                    Err(RecvError::Lagged(num_skipped)) => {
                        debug!(
                            "Checkpoint Execution Recv channel overflowed with {:?} messages",
                            num_skipped,
                        );
                    }
                    Err(RecvError::Closed) => {
                        panic!("Checkpoint Execution Sender (StateSync) closed channel unexpectedly");
                    },
                },

                _ = tokio::time::sleep(warning_timeout) => {
                    warn!(
                        "Received no new synced checkpoints for {warning_timeout:?}. Next checkpoint to be scheduled: {next_to_schedule}",
                    );
                }

                _ = panic_timeout
                            .map(|d| Either::Left(tokio::time::sleep(d)))
                            .unwrap_or_else(|| Either::Right(futures::future::pending())) => {
                    panic!("No new synced checkpoints received for {panic_timeout:?} on node {:?}", self.state.name);
                },
            }
        }
    }

    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        // Ensure that we are not skipping checkpoints at any point
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        // We store a fixed number of additional FullCheckpointContents after execution
        // is complete for use in state sync.
        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                // If this is directly after a snapshot restore with skiplisting,
                // this is expected for the first `NUM_SAVED_FULL_CHECKPOINT_CONTENTS`
                // checkpoints.
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age_ms(&self.metrics.last_executed_checkpoint_age_ms);
    }

    /// Post-processing and plumbing after we have executed a checkpoint. This
    /// function is guaranteed to be called in checkpoint sequence number order.
    #[instrument(level = "debug", skip_all)]
    async fn process_executed_checkpoint(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_acc: Option<Accumulator>,
        all_tx_digests: &[TransactionDigest],
    ) {
        // Commit all transaction effects to disk.
        let cache_commit = self.state.get_cache_commit();
        debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
        cache_commit
            .commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
            .await
            .expect("commit_transaction_outputs cannot fail");

        epoch_store
            .handle_committed_transactions(all_tx_digests)
            .expect("cannot fail");

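        // For all but the last checkpoint of the epoch, accumulate the running
        // root and bump the watermark here. The epoch's final checkpoint is
        // accumulated and its watermark bumped in check_epoch_last_checkpoint,
        // only after the change-epoch transaction has been executed.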
        if !checkpoint.is_last_checkpoint_of_epoch() {
            self.accumulator
                .accumulate_running_root(epoch_store, checkpoint.sequence_number, checkpoint_acc)
                .await
                .expect("Failed to accumulate running root");
            self.bump_highest_executed_checkpoint(checkpoint);
        }
    }

    #[instrument(level = "debug", skip_all)]
    fn schedule_synced_checkpoints(
        &self,
        pending: &mut CheckpointExecutionBuffer,
        next_to_schedule: &mut CheckpointSequenceNumber,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) {
        let Some(latest_synced_checkpoint) = self
            .checkpoint_store
            .get_highest_synced_checkpoint()
            .expect("Failed to read highest synced checkpoint")
        else {
            debug!("No checkpoints to schedule, highest synced checkpoint is None");
            return;
        };

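        // Schedule checkpoints in sequence number order, up to the configured
        // max concurrency, stopping early at an epoch boundary or at the
        // RunWithRange::Checkpoint bound.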
        while *next_to_schedule <= *latest_synced_checkpoint.sequence_number()
            && pending.len() < self.config.checkpoint_execution_max_concurrency
        {
            let checkpoint = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(*next_to_schedule)
                .unwrap()
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint sequence number {:?} does not exist in checkpoint store",
                        *next_to_schedule
                    )
                });
            if checkpoint.epoch() > epoch_store.epoch() {
                return;
            }
            match run_with_range {
                Some(RunWithRange::Checkpoint(seq)) if *next_to_schedule > seq => {
                    debug!(
                        "RunWithRange Checkpoint {} is set, not scheduling checkpoint {}",
                        seq, *next_to_schedule
                    );
                    return;
                }
                _ => {
                    self.schedule_checkpoint(checkpoint, pending, epoch_store.clone());
                    *next_to_schedule += 1;
                }
            }
        }
    }

    #[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
    fn schedule_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pending: &mut CheckpointExecutionBuffer,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) {
        debug!("Scheduling checkpoint for execution");

        // A mismatch between the node epoch and the checkpoint epoch after
        // startup crash recovery is invalid.
        let checkpoint_epoch = checkpoint.epoch();
        assert_eq!(
            checkpoint_epoch,
            epoch_store.epoch(),
            "Epoch mismatch after startup recovery. checkpoint epoch: {:?}, node epoch: {:?}",
            checkpoint_epoch,
            epoch_store.epoch(),
        );

        let metrics = self.metrics.clone();
        let local_execution_timeout_sec = self.config.local_execution_timeout_sec;
        let data_ingestion_dir = self.config.data_ingestion_dir.clone();
        let checkpoint_store = self.checkpoint_store.clone();
        let object_cache_reader = self.object_cache_reader.clone();
        let transaction_cache_reader = self.transaction_cache_reader.clone();
        let tx_manager = self.tx_manager.clone();
        let accumulator = self.accumulator.clone();
        let state = self.state.clone();

        epoch_store.notify_synced_checkpoint(*checkpoint.sequence_number());

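        // The execution task retries indefinitely on error and only resolves
        // once every transaction in the checkpoint has been executed and its
        // effects are available.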
        pending.push_back(spawn_monitored_task!(async move {
            let epoch_store = epoch_store.clone();
            let (tx_digests, checkpoint_acc) = loop {
                match execute_checkpoint(
                    checkpoint.clone(),
                    &state,
                    object_cache_reader.as_ref(),
                    transaction_cache_reader.as_ref(),
                    checkpoint_store.clone(),
                    epoch_store.clone(),
                    tx_manager.clone(),
                    accumulator.clone(),
                    local_execution_timeout_sec,
                    &metrics,
                    data_ingestion_dir.clone(),
                )
                .await
                {
                    Err(err) => {
                        error!(
                            "Error while executing checkpoint, will retry in 1s: {:?}",
                            err
                        );
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        metrics.checkpoint_exec_errors.inc();
                    }
                    Ok((tx_digests, checkpoint_acc)) => break (tx_digests, checkpoint_acc),
                }
            };
            (checkpoint, checkpoint_acc, tx_digests)
        }));
    }

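    /// Execute the change-epoch transaction from the epoch's final checkpoint
    /// and wait for its effects, first assigning shared-object versions from
    /// the certified effects so the local execution matches them.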
    #[instrument(level = "info", skip_all)]
    async fn execute_change_epoch_tx(
        &self,
        execution_digests: ExecutionDigests,
        change_epoch_tx_digest: TransactionDigest,
        change_epoch_tx: VerifiedExecutableTransaction,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: VerifiedCheckpoint,
    ) {
        let change_epoch_fx = self
            .transaction_cache_reader
            .get_effects(&execution_digests.effects)
            .expect("Fetching effects for change_epoch tx cannot fail")
            .expect("Change_epoch tx effects must exist");

        if change_epoch_tx.contains_shared_object() {
            epoch_store
                .acquire_shared_locks_from_effects(
                    &change_epoch_tx,
                    &change_epoch_fx,
                    self.object_cache_reader.as_ref(),
                )
                .await
                .expect("Acquiring shared locks for change_epoch tx cannot fail");
        }

        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), execution_digests.effects)],
            &epoch_store,
        );
        handle_execution_effects(
            &self.state,
            vec![execution_digests],
            vec![change_epoch_tx_digest],
            checkpoint.clone(),
            self.checkpoint_store.clone(),
            self.object_cache_reader.as_ref(),
            self.transaction_cache_reader.as_ref(),
            epoch_store.clone(),
            self.tx_manager.clone(),
            self.accumulator.clone(),
            self.config.local_execution_timeout_sec,
            self.config.data_ingestion_dir.clone(),
        )
        .await;
    }

    /// Check whether `checkpoint` is the last checkpoint of the current epoch.
    /// If so, perform special case logic (execute change_epoch tx,
    /// accumulate epoch, finalize transactions), then return true.
    pub async fn check_epoch_last_checkpoint(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: &Option<VerifiedCheckpoint>,
    ) -> bool {
        let cur_epoch = epoch_store.epoch();

        if let Some(checkpoint) = checkpoint {
            if checkpoint.epoch() == cur_epoch {
                if let Some((change_epoch_execution_digests, change_epoch_tx)) =
                    extract_end_of_epoch_tx(
                        checkpoint,
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        epoch_store.clone(),
                    )
                {
                    let change_epoch_tx_digest = change_epoch_execution_digests.transaction;

                    info!(
                        ended_epoch = cur_epoch,
                        last_checkpoint = checkpoint.sequence_number(),
                        "Reached end of epoch, executing change_epoch transaction",
                    );

                    self.execute_change_epoch_tx(
                        change_epoch_execution_digests,
                        change_epoch_tx_digest,
                        change_epoch_tx,
                        epoch_store.clone(),
                        checkpoint.clone(),
                    )
                    .await;

                    let cache_commit = self.state.get_cache_commit();
                    cache_commit
                        .commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
                        .await
                        .expect("commit_transaction_outputs cannot fail");
                    fail_point_async!("prune-and-compact");

                    // To finalize the checkpoint, we need to pass in the effects of all
                    // checkpoint transactions, not just the change_epoch tx. However, we
                    // have already notify-awaited all tx effects separately (once for the
                    // change_epoch tx, and once for all other txs), so this should be a
                    // fast operation.
                    let all_tx_digests: Vec<_> = self
                        .checkpoint_store
                        .get_checkpoint_contents(&checkpoint.content_digest)
                        .expect("read cannot fail")
                        .expect("Checkpoint contents should exist")
                        .iter()
                        .map(|digests| digests.transaction)
                        .collect();

                    let effects = self
                        .transaction_cache_reader
                        .notify_read_executed_effects(&all_tx_digests)
                        .await
                        .expect("Failed to get executed effects for finalizing checkpoint");

                    finalize_checkpoint(
                        &self.state,
                        self.object_cache_reader.as_ref(),
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        &all_tx_digests,
                        &epoch_store,
                        checkpoint.clone(),
                        self.accumulator.clone(),
                        effects,
                        self.config.data_ingestion_dir.clone(),
                    )
                    .await
                    .expect("Finalizing checkpoint cannot fail");

                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(cur_epoch, checkpoint)
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_running_root(&epoch_store, checkpoint.sequence_number, None)
                        .await
                        .expect("Failed to accumulate running root");
                    self.accumulator
                        .accumulate_epoch(epoch_store.clone(), *checkpoint.sequence_number())
                        .await
                        .expect("Accumulating epoch cannot fail");

                    self.bump_highest_executed_checkpoint(checkpoint);

                    return true;
                }
            }
        }
        false
    }
}

// Logs within the function are annotated with the checkpoint sequence number
// and epoch, from schedule_checkpoint().
#[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn execute_checkpoint(
    checkpoint: VerifiedCheckpoint,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    metrics: &Arc<CheckpointExecutorMetrics>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<(Vec<TransactionDigest>, Option<Accumulator>)> {
    debug!("Preparing checkpoint for execution");
    let prepare_start = Instant::now();

    // This function must guarantee that all transactions in the checkpoint are
    // executed before it returns. This invariant is enforced in two phases:
    // - First, we filter out any already executed transactions from the checkpoint
    //   in get_unexecuted_transactions()
    // - Second, we execute all remaining transactions.

    let (execution_digests, all_tx_digests, executable_txns, randomness_rounds) =
        get_unexecuted_transactions(
            checkpoint.clone(),
            transaction_cache_reader,
            checkpoint_store.clone(),
            epoch_store.clone(),
        );

    let tx_count = execution_digests.len();
    debug!("Number of transactions in the checkpoint: {:?}", tx_count);
    metrics.checkpoint_transaction_count.report(tx_count as u64);

    let checkpoint_acc = execute_transactions(
        execution_digests,
        all_tx_digests.clone(),
        executable_txns,
        state,
        object_cache_reader,
        transaction_cache_reader,
        checkpoint_store.clone(),
        epoch_store.clone(),
        transaction_manager,
        accumulator,
        local_execution_timeout_sec,
        checkpoint,
        metrics,
        prepare_start,
        data_ingestion_dir,
    )
    .await?;

    // Once execution is complete, we know that any randomness contained in this
    // checkpoint has been successfully included in a checkpoint certified by a
    // quorum of validators. (RandomnessManager/RandomnessReporter is only
    // present on validators.)
    if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
        for round in randomness_rounds {
            debug!(
                ?round,
                "notifying RandomnessReporter that randomness update was executed in checkpoint"
            );
            randomness_reporter.notify_randomness_in_checkpoint(round)?;
        }
    }

    Ok((all_tx_digests, checkpoint_acc))
}

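// Waits until effects are available for every transaction in the checkpoint,
// periodically logging progress. Details (missing effects and missing inputs)
// are only logged once the checkpoint is the next one to execute but has not
// finished within the timeout. Returns the checkpoint accumulator, or None for
// the end-of-epoch checkpoint, which is finalized later in
// check_epoch_last_checkpoint after the change-epoch tx has executed.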
#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn handle_execution_effects(
    state: &AuthorityState,
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    checkpoint: VerifiedCheckpoint,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    data_ingestion_dir: Option<PathBuf>,
) -> Option<Accumulator> {
    // Once synced_txns have been awaited, all txns should have effects committed.
    let mut periods = 1;
    let log_timeout_sec = Duration::from_secs(local_execution_timeout_sec);
    // Whether the checkpoint is next to execute and blocking additional executions.
    let mut blocking_execution = false;
    loop {
        let effects_future = transaction_cache_reader.notify_read_executed_effects(&all_tx_digests);

        match timeout(log_timeout_sec, effects_future).await {
            Err(_elapsed) => {
                // Reading this value every timeout should be ok.
                let highest_seq = checkpoint_store
                    .get_highest_executed_checkpoint_seq_number()
                    .unwrap()
                    .unwrap_or_default();
                if checkpoint.sequence_number <= highest_seq {
                    error!(
                        "Re-executing checkpoint {} after higher checkpoint {} has executed!",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if checkpoint.sequence_number > highest_seq + 1 {
                    trace!(
                        "Checkpoint {} is still executing. Highest executed = {}",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if !blocking_execution {
                    trace!(
                        "Checkpoint {} is next to execute.",
                        checkpoint.sequence_number
                    );
                    blocking_execution = true;
                    continue;
                }

                // Only log details when the checkpoint is next to execute, but has not finished
                // execution within log_timeout_sec.
                let missing_digests: Vec<TransactionDigest> = transaction_cache_reader
                    .multi_get_executed_effects_digests(&all_tx_digests)
                    .expect("multi_get_executed_effects cannot fail")
                    .iter()
                    .zip(all_tx_digests.clone())
                    .filter_map(
                        |(fx, digest)| {
                            if fx.is_none() { Some(digest) } else { None }
                        },
                    )
                    .collect();

                if missing_digests.is_empty() {
                    // All effects just became available.
                    continue;
                }

                warn!(
                    "Transaction effects for checkpoint tx digests {:?} not present within {:?}. ",
                    missing_digests,
                    log_timeout_sec * periods,
                );

                // Print out more information for the first pending transaction, which should
                // have all of its inputs available.
                let pending_digest = missing_digests.first().unwrap();
                if let Some(missing_input) = transaction_manager.get_missing_input(pending_digest) {
                    warn!(
                        "Transaction {pending_digest:?} has missing input objects {missing_input:?}",
                    );
                }
                periods += 1;
            }
            Ok(Err(err)) => panic!("Failed to notify_read_executed_effects: {:?}", err),
            Ok(Ok(effects)) => {
                for (tx_digest, expected_digest, actual_effects) in
                    izip!(&all_tx_digests, &execution_digests, &effects)
                {
                    let expected_effects_digest = &expected_digest.effects;
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        expected_effects_digest,
                        &actual_effects.digest(),
                        transaction_cache_reader,
                    );
                }

                // If this is the end-of-epoch checkpoint, we must finalize it after executing
                // the change_epoch tx, which is done after all other checkpoint execution.
                if checkpoint.end_of_epoch_data.is_none() {
                    return Some(
                        finalize_checkpoint(
                            state,
                            object_cache_reader,
                            transaction_cache_reader,
                            checkpoint_store.clone(),
                            &all_tx_digests,
                            &epoch_store,
                            checkpoint.clone(),
                            accumulator.clone(),
                            effects,
                            data_ingestion_dir,
                        )
                        .await
                        .expect("Finalizing checkpoint cannot fail"),
                    );
                } else {
                    return None;
                }
            }
        }
    }
}

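// Panics if the effects digest computed locally for `tx_digest` differs from
// the digest certified in the checkpoint, i.e. an execution fork.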
fn assert_not_forked(
    checkpoint: &VerifiedCheckpoint,
    tx_digest: &TransactionDigest,
    expected_digest: &TransactionEffectsDigest,
    actual_effects_digest: &TransactionEffectsDigest,
    cache_reader: &dyn TransactionCacheRead,
) {
    if *expected_digest != *actual_effects_digest {
        let actual_effects = cache_reader
            .get_executed_effects(tx_digest)
            .expect("get_executed_effects cannot fail")
            .expect("actual effects should exist");

        // Log observed effects (too big for the panic message) and then panic.
        error!(
            ?checkpoint,
            ?tx_digest,
            ?expected_digest,
            ?actual_effects,
            "fork detected!"
        );
        panic!(
            "When executing checkpoint {}, transaction {} \
            is expected to have effects digest {}, but got {}!",
            checkpoint.sequence_number(),
            tx_digest,
            expected_digest,
            actual_effects_digest,
        );
    }
}

// Given a checkpoint, find the end-of-epoch transaction, if it exists.
fn extract_end_of_epoch_tx(
    checkpoint: &VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> Option<(ExecutionDigests, VerifiedExecutableTransaction)> {
    checkpoint.end_of_epoch_data.as_ref()?;

    // The last checkpoint must have the end-of-epoch transaction as its last
    // transaction.

    let checkpoint_sequence = checkpoint.sequence_number();
    let execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

    let digests = execution_digests
        .last()
        .expect("Final checkpoint must have at least one transaction");

    let change_epoch_tx = cache_reader
        .get_transaction_block(&digests.transaction)
        .expect("read cannot fail");

    let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        (*change_epoch_tx.unwrap_or_else(||
            panic!(
                "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
                digests.transaction,
            )
        )).clone(),
        epoch_store.epoch(),
        *checkpoint_sequence,
    );

    assert!(
        change_epoch_tx
            .data()
            .intent_message()
            .value
            .is_end_of_epoch_tx()
    );

    Some((*digests, change_epoch_tx))
}

// Given a checkpoint, filter out any already executed transactions, then return
// the remaining execution digests, transaction digests, transactions to be
// executed, and randomness rounds (if any) included in the checkpoint.
#[expect(clippy::type_complexity)]
fn get_unexecuted_transactions(
    checkpoint: VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> (
    Vec<ExecutionDigests>,
    Vec<TransactionDigest>,
    Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    Vec<RandomnessRound>,
) {
    let checkpoint_sequence = checkpoint.sequence_number();
    let full_contents = checkpoint_store
        .get_full_checkpoint_contents_by_sequence_number(*checkpoint_sequence)
        .expect("Failed to get checkpoint contents from store")
        .tap_some(|_| {
            debug!("loaded full checkpoint contents in bulk for sequence {checkpoint_sequence}")
        });

    let mut execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

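    // If the full checkpoint contents were synced in bulk, build a map from
    // transaction digest to transaction so executable transactions can be
    // constructed without additional cache lookups.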
    let full_contents_txns = full_contents.map(|c| {
        c.into_iter()
            .zip(execution_digests.iter())
            .map(|(txn, digests)| (digests.transaction, txn))
            .collect::<HashMap<_, _>>()
    });

    // Remove the change epoch transaction so that we can special-case its
    // execution.
    checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
        let change_epoch_tx_digest = execution_digests
            .pop()
            .expect("Final checkpoint must have at least one transaction")
            .transaction;

        let change_epoch_tx = cache_reader
            .get_transaction_block(&change_epoch_tx_digest)
            .expect("read cannot fail")
            .unwrap_or_else(||
                panic!(
                    "state-sync should have ensured that transaction with digest {change_epoch_tx_digest:?} exists for checkpoint: {}",
                    checkpoint.sequence_number()
                )
            );
        assert!(change_epoch_tx.data().intent_message().value.is_end_of_epoch_tx());
    });

    let randomness_rounds = if let Some(version_specific_data) = checkpoint
        .version_specific_data(epoch_store.protocol_config())
        .expect("unable to get version_specific_data")
    {
        // With version-specific data, randomness rounds are stored in the checkpoint
        // summary.
        version_specific_data.into_v1().randomness_rounds
    } else {
        // Before version-specific data, checkpoint batching must be disabled. In this
        // case, the randomness state update tx must be first if it exists, because
        // all other transactions in a checkpoint that includes a randomness
        // state update are causally dependent on it.
        assert_eq!(
            0,
            epoch_store
                .protocol_config()
                .min_checkpoint_interval_ms_as_option()
                .unwrap_or_default(),
        );
        if let Some(first_digest) = execution_digests.first() {
            let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
            .expect("read cannot fail")
            .unwrap_or_else(||
                panic!(
                    "state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
                    checkpoint.sequence_number()
                )
            );
            if let TransactionKind::RandomnessStateUpdate(rsu) =
                maybe_randomness_tx.data().transaction_data().kind()
            {
                vec![rsu.randomness_round]
            } else {
                Vec::new()
            }
        } else {
            Vec::new()
        }
    };

    let all_tx_digests: Vec<TransactionDigest> =
        execution_digests.iter().map(|tx| tx.transaction).collect();

    let executed_effects_digests = cache_reader
        .multi_get_executed_effects_digests(&all_tx_digests)
        .expect("failed to read executed_effects from store");

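    // Partition the checkpoint's transactions into those not yet executed
    // (to be scheduled below) and those already executed, which are checked
    // for forks and skipped.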
    let (unexecuted_txns, expected_effects_digests): (Vec<_>, Vec<_>) =
        izip!(execution_digests.iter(), executed_effects_digests.iter())
            .filter_map(|(digests, effects_digest)| match effects_digest {
                None => Some((digests.transaction, digests.effects)),
                Some(actual_effects_digest) => {
                    let tx_digest = &digests.transaction;
                    let effects_digest = &digests.effects;
                    trace!(
                        "Transaction with digest {:?} has already been executed",
                        tx_digest
                    );
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        effects_digest,
                        actual_effects_digest,
                        cache_reader,
                    );
                    None
                }
            })
            .unzip();

    // Read the remaining unexecuted transactions from the store.
    let executable_txns: Vec<_> = if let Some(full_contents_txns) = full_contents_txns {
        unexecuted_txns
            .into_iter()
            .zip(expected_effects_digests)
            .map(|(tx_digest, expected_effects_digest)| {
                let tx = &full_contents_txns.get(&tx_digest).unwrap().transaction;
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(tx.clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest,
                )
            })
            .collect()
    } else {
        cache_reader
            .multi_get_transaction_blocks(&unexecuted_txns)
            .expect("Failed to get checkpoint txes from store")
            .into_iter()
            .zip(expected_effects_digests)
            .enumerate()
            .map(|(i, (tx, expected_effects_digest))| {
                let tx = tx.unwrap_or_else(||
                    panic!(
                        "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
                        unexecuted_txns[i]
                    )
                );
                // The change epoch tx is handled specially in check_epoch_last_checkpoint.
                assert!(!tx.data().intent_message().value.is_end_of_epoch_tx());
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest
                )
            })
            .collect()
    };

    (
        execution_digests,
        all_tx_digests,
        executable_txns,
        randomness_rounds,
    )
}

// Logs within the function are annotated with the checkpoint sequence number
// and epoch, from schedule_checkpoint().
#[instrument(level = "debug", skip_all)]
async fn execute_transactions(
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    executable_txns: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    state_accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    checkpoint: VerifiedCheckpoint,
    metrics: &Arc<CheckpointExecutorMetrics>,
    prepare_start: Instant,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<Option<Accumulator>> {
    let effects_digests: HashMap<_, _> = execution_digests
        .iter()
        .map(|digest| (digest.transaction, digest.effects))
        .collect();

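    // For transactions that touch shared objects, look up the certified effects
    // so their shared-object versions can be assigned before enqueueing; local
    // execution then replays the versions the network agreed on.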
    let shared_effects_digests = executable_txns
        .iter()
        .filter(|(tx, _)| tx.contains_shared_object())
        .map(|(tx, _)| {
            *effects_digests
                .get(tx.digest())
                .expect("Transaction digest not found in effects_digests")
        })
        .collect::<Vec<_>>();

    let digest_to_effects: HashMap<TransactionDigest, TransactionEffects> =
        transaction_cache_reader
            .multi_get_effects(&shared_effects_digests)?
            .into_iter()
            .zip(shared_effects_digests)
            .map(|(fx, fx_digest)| {
                if fx.is_none() {
                    panic!(
                        "Transaction effects for effects digest {:?} do not exist in effects table",
                        fx_digest
                    );
                }
                let fx = fx.unwrap();
                (*fx.transaction_digest(), fx)
            })
            .collect();

    for (tx, _) in &executable_txns {
        if tx.contains_shared_object() {
            epoch_store
                .acquire_shared_locks_from_effects(
                    tx,
                    digest_to_effects.get(tx.digest()).unwrap(),
                    object_cache_reader,
                )
                .await?;
        }
    }

    let prepare_elapsed = prepare_start.elapsed();
    metrics
        .checkpoint_prepare_latency_us
        .report(prepare_elapsed.as_micros() as u64);
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!(
            "Checkpoint preparation for execution took {:?}",
            prepare_elapsed
        );
    }

    let exec_start = Instant::now();
    transaction_manager.enqueue_with_expected_effects_digest(executable_txns.clone(), &epoch_store);

    let checkpoint_acc = handle_execution_effects(
        state,
        execution_digests,
        all_tx_digests,
        checkpoint.clone(),
        checkpoint_store,
        object_cache_reader,
        transaction_cache_reader,
        epoch_store,
        transaction_manager,
        state_accumulator,
        local_execution_timeout_sec,
        data_ingestion_dir,
    )
    .await;

    let exec_elapsed = exec_start.elapsed();
    metrics
        .checkpoint_exec_latency_us
        .report(exec_elapsed.as_micros() as u64);
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!("Checkpoint execution took {:?}", exec_elapsed);
    }

    Ok(checkpoint_acc)
}

#[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn finalize_checkpoint(
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    tx_digests: &[TransactionDigest],
    epoch_store: &Arc<AuthorityPerEpochStore>,
    checkpoint: VerifiedCheckpoint,
    accumulator: Arc<StateAccumulator>,
    effects: Vec<TransactionEffects>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<Accumulator> {
    debug!("finalizing checkpoint");
    epoch_store.insert_finalized_transactions(tx_digests, checkpoint.sequence_number)?;

    // TODO remove once we no longer need to support this table for read RPC
    state
        .get_checkpoint_cache()
        .insert_finalized_transactions_perpetual_checkpoints(
            tx_digests,
            epoch_store.epoch(),
            checkpoint.sequence_number,
        )?;

    let checkpoint_acc =
        accumulator.accumulate_checkpoint(effects, checkpoint.sequence_number, epoch_store)?;

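    // Build the full checkpoint data only if something consumes it: local data
    // ingestion and/or the REST index.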
    if data_ingestion_dir.is_some() || state.rest_index.is_some() {
        let checkpoint_data = load_checkpoint_data(
            checkpoint,
            object_cache_reader,
            transaction_cache_reader,
            checkpoint_store,
            tx_digests,
        )?;

        // TODO(bmwill) discuss with team a better location for this indexing so that it
        // isn't on the critical path and the writes to the DB are done in
        // checkpoint order
        if let Some(rest_index) = &state.rest_index {
            let mut layout_resolver = epoch_store.executor().type_layout_resolver(Box::new(
                PackageStoreWithFallback::new(state.get_backing_package_store(), &checkpoint_data),
            ));

            rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut())?;
        }

        if let Some(path) = data_ingestion_dir {
            store_checkpoint_locally(path, &checkpoint_data)?;
        }
    }
    Ok(checkpoint_acc)
}