use std::{
    collections::HashMap,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};

use either::Either;
use futures::stream::FuturesOrdered;
use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
use iota_macros::{fail_point, fail_point_async};
use iota_metrics::spawn_monitored_task;
use iota_types::{
    accumulator::Accumulator,
    base_types::{ExecutionDigests, TransactionDigest, TransactionEffectsDigest},
    crypto::RandomnessRound,
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::IotaResult,
    executable_transaction::VerifiedExecutableTransaction,
    inner_temporary_store::PackageStoreWithFallback,
    message_envelope::Message,
    messages_checkpoint::{CheckpointSequenceNumber, VerifiedCheckpoint},
    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
};
use itertools::izip;
use tap::{TapFallible, TapOptional};
use tokio::{
    sync::broadcast::{self, error::RecvError},
    task::JoinHandle,
    time::timeout,
};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, instrument, trace, warn};

use self::metrics::CheckpointExecutorMetrics;
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    checkpoints::{
        CheckpointStore,
        checkpoint_executor::data_ingestion_handler::{
            load_checkpoint_data, store_checkpoint_locally,
        },
    },
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    state_accumulator::StateAccumulator,
    transaction_manager::TransactionManager,
};

mod data_ingestion_handler;
pub mod metrics;

#[cfg(test)]
pub(crate) mod tests;

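/// Buffer of in-flight checkpoint execution tasks. `FuturesOrdered` yields
/// completed tasks in the order they were scheduled, so checkpoints are
/// processed strictly by sequence number even though they execute
/// concurrently. Each task resolves to the checkpoint, its optional state
/// accumulator, and the digests of the transactions it contained.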
type CheckpointExecutionBuffer = FuturesOrdered<
    JoinHandle<(
        VerifiedCheckpoint,
        Option<Accumulator>,
        Vec<TransactionDigest>,
    )>,
>;

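/// Interval (in checkpoint sequence numbers) at which to emit an info-level
/// progress log.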
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

#[derive(Debug, Clone, Copy)]
pub struct CheckpointTimeoutConfig {
    pub panic_timeout: Option<Duration>,
    pub warning_timeout: Duration,
}

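// Timeout configuration used when waiting for newly synced checkpoints. Kept
// thread-local so that simulation tests can override it per node via
// `init_checkpoint_timeout_config` before the executor first reads it.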
thread_local! {
    static SCHEDULING_TIMEOUT: once_cell::sync::OnceCell<CheckpointTimeoutConfig> =
        const { once_cell::sync::OnceCell::new() };
}

#[cfg(msim)]
pub fn init_checkpoint_timeout_config(config: CheckpointTimeoutConfig) {
    SCHEDULING_TIMEOUT.with(|s| {
        s.set(config).expect("SchedulingTimeoutConfig already set");
    });
}

fn get_scheduling_timeout() -> CheckpointTimeoutConfig {
    fn inner() -> CheckpointTimeoutConfig {
        let panic_timeout: Option<Duration> = if cfg!(msim) {
            Some(Duration::from_secs(45))
        } else {
            std::env::var("NEW_CHECKPOINT_PANIC_TIMEOUT_MS")
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
                .map(Duration::from_millis)
        };

        let warning_timeout: Duration = std::env::var("NEW_CHECKPOINT_WARNING_TIMEOUT_MS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .map(Duration::from_millis)
            .unwrap_or(Duration::from_secs(5));

        CheckpointTimeoutConfig {
            panic_timeout,
            warning_timeout,
        }
    }

    SCHEDULING_TIMEOUT.with(|s| *s.get_or_init(inner))
}

#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    EpochComplete,
    RunWithRangeCondition,
}

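/// Executes checkpoints received from state sync. The executor schedules
/// synced checkpoints for concurrent execution (bounded by the configured
/// max concurrency), verifies that locally computed effects match the
/// certified checkpoint contents, and advances the
/// `highest_executed_checkpoint` watermark.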
pub struct CheckpointExecutor {
    mailbox: broadcast::Receiver<VerifiedCheckpoint>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
}

impl CheckpointExecutor {
    pub fn new(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
    ) -> Self {
        Self {
            mailbox,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            config,
            metrics,
        }
    }

    pub fn new_for_tests(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            mailbox,
            checkpoint_store,
            state,
            accumulator,
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
        )
    }

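    /// Executes all checkpoints of the current epoch, returning when either
    /// the last checkpoint of the epoch has been executed or the optional
    /// `run_with_range` condition is satisfied.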
    pub async fn run_epoch(
        &mut self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) -> StopReason {
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(epoch_store.epoch())) {
            info!(
                "RunWithRange condition satisfied at {:?}, run_epoch={:?}",
                run_with_range,
                epoch_store.epoch()
            );
            return StopReason::RunWithRangeCondition;
        };

        debug!(
            "Checkpoint executor running for epoch {}",
            epoch_store.epoch(),
        );
        self.metrics
            .checkpoint_exec_epoch
            .set(epoch_store.epoch() as i64);

        let mut highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
                return StopReason::EpochComplete;
            }
        }

        let mut next_to_schedule = highest_executed
            .as_ref()
            .map(|c| c.sequence_number() + 1)
            .unwrap_or_else(|| {
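                // No checkpoint has been executed yet, so this must be the
                // genesis epoch: scheduling starts from sequence number 0.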
                assert_eq!(epoch_store.epoch(), 0);
                0
            });
        let mut pending: CheckpointExecutionBuffer = FuturesOrdered::new();

        let mut now_time = Instant::now();
        let mut now_transaction_num = highest_executed
            .as_ref()
            .map(|c| c.network_total_transactions)
            .unwrap_or(0);
        let scheduling_timeout_config = get_scheduling_timeout();

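        // Main loop: schedule newly synced checkpoints, then wait for either a
        // completed execution, a new checkpoint from state sync, or a timeout.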
        loop {
            let schedule_scope = iota_metrics::monitored_scope("ScheduleCheckpointExecution");

            if self
                .check_epoch_last_checkpoint(epoch_store.clone(), &highest_executed)
                .await
            {
                self.checkpoint_store
                    .prune_local_summaries()
                    .tap_err(|e| error!("Failed to prune local summaries: {}", e))
                    .ok();

                assert!(
                    pending.is_empty(),
                    "Pending checkpoint execution buffer should be empty after processing last checkpoint of epoch",
                );
                fail_point_async!("crash");
                debug!(epoch = epoch_store.epoch(), "finished epoch");
                return StopReason::EpochComplete;
            }

            self.schedule_synced_checkpoints(
                &mut pending,
                &mut next_to_schedule,
                epoch_store.clone(),
                run_with_range,
            );

            self.metrics
                .checkpoint_exec_inflight
                .set(pending.len() as i64);

            let panic_timeout = scheduling_timeout_config.panic_timeout;
            let warning_timeout = scheduling_timeout_config.warning_timeout;

            drop(schedule_scope);
            tokio::select! {
                Some(Ok((checkpoint, checkpoint_acc, tx_digests))) = pending.next() => {
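                    // A checkpoint has finished executing; process its outputs
                    // and advance the executed-checkpoint watermark.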
                    let _process_scope = iota_metrics::monitored_scope("ProcessExecutedCheckpoint");

                    self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, &tx_digests).await;
                    highest_executed = Some(checkpoint.clone());

                    let elapsed = now_time.elapsed().as_millis();
                    let current_transaction_num = highest_executed.as_ref().map(|c| c.network_total_transactions).unwrap_or(0);
                    if current_transaction_num - now_transaction_num > 10_000 || elapsed > 30_000 {
                        let tps = (1000.0 * (current_transaction_num - now_transaction_num) as f64 / elapsed as f64) as i32;
                        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);
                        now_time = Instant::now();
                        now_transaction_num = current_transaction_num;
                    }
                    if run_with_range.is_some_and(|rwr| rwr.matches_checkpoint(checkpoint.sequence_number)) {
                        info!(
                            "RunWithRange condition satisfied after checkpoint sequence number {:?}",
                            checkpoint.sequence_number
                        );
                        return StopReason::RunWithRangeCondition;
                    }
                }

                received = self.mailbox.recv() => match received {
                    Ok(checkpoint) => {
                        debug!(
                            sequence_number = ?checkpoint.sequence_number,
                            "Received checkpoint summary from state sync"
                        );
                        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
                    },
                    Err(RecvError::Lagged(num_skipped)) => {
                        debug!(
                            "Checkpoint Execution Recv channel overflowed with {:?} messages",
                            num_skipped,
                        );
                    }
                    Err(RecvError::Closed) => {
                        panic!("Checkpoint Execution Sender (StateSync) closed channel unexpectedly");
                    },
                },

                _ = tokio::time::sleep(warning_timeout) => {
                    warn!(
                        "Received no new synced checkpoints for {warning_timeout:?}. Next checkpoint to be scheduled: {next_to_schedule}",
                    );
                }

                _ = panic_timeout
                    .map(|d| Either::Left(tokio::time::sleep(d)))
                    .unwrap_or_else(|| Either::Right(futures::future::pending())) => {
                    panic!("No new synced checkpoints received for {panic_timeout:?} on node {:?}", self.state.name);
                },
            }
        }
    }

    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
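        // Only a recent window of full checkpoint contents is retained; prune
        // the contents of the checkpoint that falls out of that window.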
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
    }

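    /// Post-execution processing: commits the checkpoint's transaction
    /// outputs to persistent storage and, unless this is the last checkpoint
    /// of the epoch, accumulates the running root and bumps the
    /// executed-checkpoint watermark.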
    #[instrument(level = "debug", skip_all)]
    async fn process_executed_checkpoint(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_acc: Option<Accumulator>,
        all_tx_digests: &[TransactionDigest],
    ) {
        let cache_commit = self.state.get_cache_commit();
        debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
        cache_commit
            .commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
            .await
            .expect("commit_transaction_outputs cannot fail");

        epoch_store
            .handle_committed_transactions(all_tx_digests)
            .expect("cannot fail");

        if !checkpoint.is_last_checkpoint_of_epoch() {
            self.accumulator
                .accumulate_running_root(epoch_store, checkpoint.sequence_number, checkpoint_acc)
                .await
                .expect("Failed to accumulate running root");
            self.bump_highest_executed_checkpoint(checkpoint);
        }
    }

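    /// Schedules execution for all checkpoints that state sync has made
    /// available, up to the configured maximum concurrency and stopping at an
    /// epoch boundary or `run_with_range` limit.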
    #[instrument(level = "debug", skip_all)]
    fn schedule_synced_checkpoints(
        &self,
        pending: &mut CheckpointExecutionBuffer,
        next_to_schedule: &mut CheckpointSequenceNumber,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) {
        let Some(latest_synced_checkpoint) = self
            .checkpoint_store
            .get_highest_synced_checkpoint()
            .expect("Failed to read highest synced checkpoint")
        else {
            debug!("No checkpoints to schedule, highest synced checkpoint is None");
            return;
        };

        while *next_to_schedule <= *latest_synced_checkpoint.sequence_number()
            && pending.len() < self.config.checkpoint_execution_max_concurrency
        {
            let checkpoint = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(*next_to_schedule)
                .unwrap()
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint sequence number {:?} does not exist in checkpoint store",
                        *next_to_schedule
                    )
                });
            if checkpoint.epoch() > epoch_store.epoch() {
                return;
            }
            match run_with_range {
                Some(RunWithRange::Checkpoint(seq)) if *next_to_schedule > seq => {
                    debug!(
                        "RunWithRange Checkpoint {} is set, not scheduling checkpoint {}",
                        seq, *next_to_schedule
                    );
                    return;
                }
                _ => {
                    self.schedule_checkpoint(checkpoint, pending, epoch_store.clone());
                    *next_to_schedule += 1;
                }
            }
        }
    }

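    /// Spawns a task to execute a single checkpoint, retrying once per second
    /// until execution succeeds. The task is pushed onto `pending`, which
    /// yields completed checkpoints in scheduling order.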
    #[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
    fn schedule_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pending: &mut CheckpointExecutionBuffer,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) {
        debug!("Scheduling checkpoint for execution");

        let checkpoint_epoch = checkpoint.epoch();
        assert_eq!(
            checkpoint_epoch,
            epoch_store.epoch(),
            "Epoch mismatch after startup recovery. checkpoint epoch: {:?}, node epoch: {:?}",
            checkpoint_epoch,
            epoch_store.epoch(),
        );

        let metrics = self.metrics.clone();
        let local_execution_timeout_sec = self.config.local_execution_timeout_sec;
        let data_ingestion_dir = self.config.data_ingestion_dir.clone();
        let checkpoint_store = self.checkpoint_store.clone();
        let object_cache_reader = self.object_cache_reader.clone();
        let transaction_cache_reader = self.transaction_cache_reader.clone();
        let tx_manager = self.tx_manager.clone();
        let accumulator = self.accumulator.clone();
        let state = self.state.clone();

        epoch_store.notify_synced_checkpoint(*checkpoint.sequence_number());

        pending.push_back(spawn_monitored_task!(async move {
            let epoch_store = epoch_store.clone();
            let (tx_digests, checkpoint_acc) = loop {
                match execute_checkpoint(
                    checkpoint.clone(),
                    &state,
                    object_cache_reader.as_ref(),
                    transaction_cache_reader.as_ref(),
                    checkpoint_store.clone(),
                    epoch_store.clone(),
                    tx_manager.clone(),
                    accumulator.clone(),
                    local_execution_timeout_sec,
                    &metrics,
                    data_ingestion_dir.clone(),
                )
                .await
                {
                    Err(err) => {
                        error!(
                            "Error while executing checkpoint, will retry in 1s: {:?}",
                            err
                        );
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        metrics.checkpoint_exec_errors.inc();
                    }
                    Ok((tx_digests, checkpoint_acc)) => break (tx_digests, checkpoint_acc),
                }
            };
            (checkpoint, checkpoint_acc, tx_digests)
        }));
    }

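    /// Executes the change-epoch transaction against the effects certified in
    /// the checkpoint and waits until its executed effects are available.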
    #[instrument(level = "info", skip_all)]
    async fn execute_change_epoch_tx(
        &self,
        execution_digests: ExecutionDigests,
        change_epoch_tx_digest: TransactionDigest,
        change_epoch_tx: VerifiedExecutableTransaction,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: VerifiedCheckpoint,
    ) {
        let change_epoch_fx = self
            .transaction_cache_reader
            .get_effects(&execution_digests.effects)
            .expect("Fetching effects for change_epoch tx cannot fail")
            .expect("Change_epoch tx effects must exist");

        if change_epoch_tx.contains_shared_object() {
            epoch_store
                .acquire_shared_version_assignments_from_effects(
                    &change_epoch_tx,
                    &change_epoch_fx,
                    self.object_cache_reader.as_ref(),
                )
                .await
                .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
        }

        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), execution_digests.effects)],
            &epoch_store,
        );
        handle_execution_effects(
            &self.state,
            vec![execution_digests],
            vec![change_epoch_tx_digest],
            checkpoint.clone(),
            self.checkpoint_store.clone(),
            self.object_cache_reader.as_ref(),
            self.transaction_cache_reader.as_ref(),
            epoch_store.clone(),
            self.tx_manager.clone(),
            self.accumulator.clone(),
            self.config.local_execution_timeout_sec,
            self.config.data_ingestion_dir.clone(),
        )
        .await;
    }

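    /// Checks whether `checkpoint` is the last checkpoint of the current
    /// epoch. If so, executes the change-epoch transaction, finalizes the
    /// checkpoint, accumulates the epoch, and returns true; otherwise returns
    /// false.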
    pub async fn check_epoch_last_checkpoint(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: &Option<VerifiedCheckpoint>,
    ) -> bool {
        let cur_epoch = epoch_store.epoch();

        if let Some(checkpoint) = checkpoint {
            if checkpoint.epoch() == cur_epoch {
                if let Some((change_epoch_execution_digests, change_epoch_tx)) =
                    extract_end_of_epoch_tx(
                        checkpoint,
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        epoch_store.clone(),
                    )
                {
                    let change_epoch_tx_digest = change_epoch_execution_digests.transaction;

                    info!(
                        ended_epoch = cur_epoch,
                        last_checkpoint = checkpoint.sequence_number(),
                        "Reached end of epoch, executing change_epoch transaction",
                    );

                    self.execute_change_epoch_tx(
                        change_epoch_execution_digests,
                        change_epoch_tx_digest,
                        change_epoch_tx,
                        epoch_store.clone(),
                        checkpoint.clone(),
                    )
                    .await;

                    let cache_commit = self.state.get_cache_commit();
                    cache_commit
                        .commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
                        .await
                        .expect("commit_transaction_outputs cannot fail");
                    fail_point_async!("prune-and-compact");

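                    // The last checkpoint of the epoch is finalized only now,
                    // after the change-epoch transaction has executed: gather
                    // all of the checkpoint's transaction digests and wait for
                    // their effects before finalizing.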
                    let all_tx_digests: Vec<_> = self
                        .checkpoint_store
                        .get_checkpoint_contents(&checkpoint.content_digest)
                        .expect("read cannot fail")
                        .expect("Checkpoint contents should exist")
                        .iter()
                        .map(|digests| digests.transaction)
                        .collect();

                    let effects = self
                        .transaction_cache_reader
                        .notify_read_executed_effects(&all_tx_digests)
                        .await
                        .expect("Failed to get executed effects for finalizing checkpoint");

                    finalize_checkpoint(
                        &self.state,
                        self.object_cache_reader.as_ref(),
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        &all_tx_digests,
                        &epoch_store,
                        checkpoint.clone(),
                        self.accumulator.clone(),
                        effects,
                        self.config.data_ingestion_dir.clone(),
                    )
                    .await
                    .expect("Finalizing checkpoint cannot fail");

                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(cur_epoch, checkpoint)
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_running_root(&epoch_store, checkpoint.sequence_number, None)
                        .await
                        .expect("Failed to accumulate running root");
                    self.accumulator
                        .accumulate_epoch(epoch_store.clone(), *checkpoint.sequence_number())
                        .await
                        .expect("Accumulating epoch cannot fail");

                    self.bump_highest_executed_checkpoint(checkpoint);

                    return true;
                }
            }
        }
        false
    }
}

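/// Executes a single checkpoint end to end: loads the checkpoint contents,
/// filters out transactions that have already been executed, executes the
/// rest, and reports any randomness rounds revealed in the checkpoint.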
#[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn execute_checkpoint(
    checkpoint: VerifiedCheckpoint,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    metrics: &Arc<CheckpointExecutorMetrics>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<(Vec<TransactionDigest>, Option<Accumulator>)> {
    debug!("Preparing checkpoint for execution");
    let prepare_start = Instant::now();

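    // Split the checkpoint contents into execution digests, all transaction
    // digests, the not-yet-executed transactions, and any randomness rounds
    // revealed in this checkpoint.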
    let (execution_digests, all_tx_digests, executable_txns, randomness_rounds) =
        get_unexecuted_transactions(
            checkpoint.clone(),
            transaction_cache_reader,
            checkpoint_store.clone(),
            epoch_store.clone(),
        );

    let tx_count = execution_digests.len();
    debug!("Number of transactions in the checkpoint: {:?}", tx_count);
    metrics
        .checkpoint_transaction_count
        .observe(tx_count as f64);

    let checkpoint_acc = execute_transactions(
        execution_digests,
        all_tx_digests.clone(),
        executable_txns,
        state,
        object_cache_reader,
        transaction_cache_reader,
        checkpoint_store.clone(),
        epoch_store.clone(),
        transaction_manager,
        accumulator,
        local_execution_timeout_sec,
        checkpoint,
        metrics,
        prepare_start,
        data_ingestion_dir,
    )
    .await?;

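    // Execution is complete, so any randomness revealed in this checkpoint is
    // now certified; notify the reporter, if one is configured, so the
    // corresponding rounds can be marked as executed.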
    if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
        for round in randomness_rounds {
            debug!(
                ?round,
                "notifying RandomnessReporter that randomness update was executed in checkpoint"
            );
            randomness_reporter.notify_randomness_in_checkpoint(round)?;
        }
    }

    Ok((all_tx_digests, checkpoint_acc))
}

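/// Waits until the executed effects of all the checkpoint's transactions are
/// available, periodically logging diagnostics for transactions still missing
/// effects, asserting that no execution fork has occurred, and finalizing the
/// checkpoint once all effects are present. Returns the checkpoint
/// accumulator, or `None` for the last checkpoint of an epoch, whose
/// finalization is handled separately.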
#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn handle_execution_effects(
    state: &AuthorityState,
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    checkpoint: VerifiedCheckpoint,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    data_ingestion_dir: Option<PathBuf>,
) -> Option<Accumulator> {
    let mut periods = 1;
    let log_timeout_sec = Duration::from_secs(local_execution_timeout_sec);
    let mut blocking_execution = false;
    loop {
        let effects_future = transaction_cache_reader.notify_read_executed_effects(&all_tx_digests);

        match timeout(log_timeout_sec, effects_future).await {
            Err(_elapsed) => {
                let highest_seq = checkpoint_store
                    .get_highest_executed_checkpoint_seq_number()
                    .unwrap()
                    .unwrap_or_default();
                if checkpoint.sequence_number <= highest_seq {
                    error!(
                        "Re-executing checkpoint {} after higher checkpoint {} has executed!",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if checkpoint.sequence_number > highest_seq + 1 {
                    trace!(
                        "Checkpoint {} is still executing. Highest executed = {}",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if !blocking_execution {
                    trace!(
                        "Checkpoint {} is next to execute.",
                        checkpoint.sequence_number
                    );
                    blocking_execution = true;
                    continue;
                }

                let missing_digests: Vec<TransactionDigest> = transaction_cache_reader
                    .multi_get_executed_effects_digests(&all_tx_digests)
                    .expect("multi_get_executed_effects cannot fail")
                    .iter()
                    .zip(all_tx_digests.clone())
                    .filter_map(
                        |(fx, digest)| {
                            if fx.is_none() { Some(digest) } else { None }
                        },
                    )
                    .collect();

                if missing_digests.is_empty() {
                    continue;
                }

                warn!(
                    "Transaction effects for checkpoint tx digests {:?} not present within {:?}.",
                    missing_digests,
                    log_timeout_sec * periods,
                );

                let pending_digest = missing_digests.first().unwrap();
                if let Some(missing_input) = transaction_manager.get_missing_input(pending_digest) {
                    warn!(
                        "Transaction {pending_digest:?} has missing input objects {missing_input:?}",
                    );
                }
                periods += 1;
            }
            Ok(Err(err)) => panic!("Failed to notify_read_executed_effects: {:?}", err),
            Ok(Ok(effects)) => {
                for (tx_digest, expected_digest, actual_effects) in
                    izip!(&all_tx_digests, &execution_digests, &effects)
                {
                    let expected_effects_digest = &expected_digest.effects;
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        expected_effects_digest,
                        &actual_effects.digest(),
                        transaction_cache_reader,
                    );
                }

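                // The last checkpoint of the epoch is not finalized here; it
                // is finalized after the change-epoch transaction has been
                // executed (see `check_epoch_last_checkpoint`).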
                if checkpoint.end_of_epoch_data.is_none() {
                    return Some(
                        finalize_checkpoint(
                            state,
                            object_cache_reader,
                            transaction_cache_reader,
                            checkpoint_store.clone(),
                            &all_tx_digests,
                            &epoch_store,
                            checkpoint.clone(),
                            accumulator.clone(),
                            effects,
                            data_ingestion_dir,
                        )
                        .await
                        .expect("Finalizing checkpoint cannot fail"),
                    );
                } else {
                    return None;
                }
            }
        }
    }
}

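/// Panics if the locally computed effects digest of `tx_digest` differs from
/// the effects digest certified in the checkpoint, which would indicate an
/// execution fork.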
fn assert_not_forked(
    checkpoint: &VerifiedCheckpoint,
    tx_digest: &TransactionDigest,
    expected_digest: &TransactionEffectsDigest,
    actual_effects_digest: &TransactionEffectsDigest,
    cache_reader: &dyn TransactionCacheRead,
) {
    if *expected_digest != *actual_effects_digest {
        let actual_effects = cache_reader
            .get_executed_effects(tx_digest)
            .expect("get_executed_effects cannot fail")
            .expect("actual effects should exist");

        error!(
            ?checkpoint,
            ?tx_digest,
            ?expected_digest,
            ?actual_effects,
            "fork detected!"
        );
        panic!(
            "When executing checkpoint {}, transaction {} \
             is expected to have effects digest {}, but got {}!",
            checkpoint.sequence_number(),
            tx_digest,
            expected_digest,
            actual_effects_digest,
        );
    }
}

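/// Returns the execution digests and the executable change-epoch transaction
/// if `checkpoint` is the last checkpoint of the epoch, and `None` otherwise.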
fn extract_end_of_epoch_tx(
    checkpoint: &VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> Option<(ExecutionDigests, VerifiedExecutableTransaction)> {
    checkpoint.end_of_epoch_data.as_ref()?;

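    // The change-epoch transaction is the last transaction of the last
    // checkpoint of the epoch; the assertion below double-checks this.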
    let checkpoint_sequence = checkpoint.sequence_number();
    let execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

    let digests = execution_digests
        .last()
        .expect("Final checkpoint must have at least one transaction");

    let change_epoch_tx = cache_reader
        .get_transaction_block(&digests.transaction)
        .expect("read cannot fail");

    let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        (*change_epoch_tx.unwrap_or_else(||
            panic!(
                "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
                digests.transaction,
            )
        )).clone(),
        epoch_store.epoch(),
        *checkpoint_sequence,
    );

    assert!(
        change_epoch_tx
            .data()
            .intent_message()
            .value
            .is_end_of_epoch_tx()
    );

    Some((*digests, change_epoch_tx))
}

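/// Given a checkpoint, filters out any transactions that have already been
/// executed and returns the remaining execution digests, all transaction
/// digests, the executable transactions paired with their expected effects
/// digests, and any randomness rounds revealed in the checkpoint.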
#[expect(clippy::type_complexity)]
fn get_unexecuted_transactions(
    checkpoint: VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> (
    Vec<ExecutionDigests>,
    Vec<TransactionDigest>,
    Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    Vec<RandomnessRound>,
) {
    let checkpoint_sequence = checkpoint.sequence_number();
    let full_contents = checkpoint_store
        .get_full_checkpoint_contents_by_sequence_number(*checkpoint_sequence)
        .expect("Failed to get checkpoint contents from store")
        .tap_some(|_| {
            debug!("loaded full checkpoint contents in bulk for sequence {checkpoint_sequence}")
        });

    let mut execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

    let full_contents_txns = full_contents.map(|c| {
        c.into_iter()
            .zip(execution_digests.iter())
            .map(|(txn, digests)| (digests.transaction, txn))
            .collect::<HashMap<_, _>>()
    });

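    // Remove the change-epoch transaction so that its execution can be
    // special-cased at the end of the epoch; it must never run as part of
    // regular checkpoint execution.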
    checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
        let change_epoch_tx_digest = execution_digests
            .pop()
            .expect("Final checkpoint must have at least one transaction")
            .transaction;

        let change_epoch_tx = cache_reader
            .get_transaction_block(&change_epoch_tx_digest)
            .expect("read cannot fail")
            .unwrap_or_else(||
                panic!(
                    "state-sync should have ensured that transaction with digest {change_epoch_tx_digest:?} exists for checkpoint: {}",
                    checkpoint.sequence_number()
                )
            );
        assert!(change_epoch_tx.data().intent_message().value.is_end_of_epoch_tx());
    });

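    // Randomness rounds are carried in the checkpoint's version-specific data
    // when present. Otherwise the checkpoint must have been built without a
    // minimum checkpoint interval (asserted below), and only the first
    // transaction needs to be checked for a randomness state update.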
    let randomness_rounds = if let Some(version_specific_data) = checkpoint
        .version_specific_data(epoch_store.protocol_config())
        .expect("unable to get version_specific_data")
    {
        version_specific_data.into_v1().randomness_rounds
    } else {
        assert_eq!(
            0,
            epoch_store
                .protocol_config()
                .min_checkpoint_interval_ms_as_option()
                .unwrap_or_default(),
        );
        if let Some(first_digest) = execution_digests.first() {
            let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
                .expect("read cannot fail")
                .unwrap_or_else(||
                    panic!(
                        "state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
                        checkpoint.sequence_number()
                    )
                );
            if let TransactionKind::RandomnessStateUpdate(rsu) =
                maybe_randomness_tx.data().transaction_data().kind()
            {
                vec![rsu.randomness_round]
            } else {
                Vec::new()
            }
        } else {
            Vec::new()
        }
    };

    let all_tx_digests: Vec<TransactionDigest> =
        execution_digests.iter().map(|tx| tx.transaction).collect();

    let executed_effects_digests = cache_reader
        .multi_get_executed_effects_digests(&all_tx_digests)
        .expect("failed to read executed_effects from store");

    let (unexecuted_txns, expected_effects_digests): (Vec<_>, Vec<_>) =
        izip!(execution_digests.iter(), executed_effects_digests.iter())
            .filter_map(|(digests, effects_digest)| match effects_digest {
                None => Some((digests.transaction, digests.effects)),
                Some(actual_effects_digest) => {
                    let tx_digest = &digests.transaction;
                    let effects_digest = &digests.effects;
                    trace!(
                        "Transaction with digest {:?} has already been executed",
                        tx_digest
                    );
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        effects_digest,
                        actual_effects_digest,
                        cache_reader,
                    );
                    None
                }
            })
            .unzip();

    let executable_txns: Vec<_> = if let Some(full_contents_txns) = full_contents_txns {
        unexecuted_txns
            .into_iter()
            .zip(expected_effects_digests)
            .map(|(tx_digest, expected_effects_digest)| {
                let tx = &full_contents_txns.get(&tx_digest).unwrap().transaction;
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(tx.clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest,
                )
            })
            .collect()
    } else {
        cache_reader
            .multi_get_transaction_blocks(&unexecuted_txns)
            .expect("Failed to get checkpoint txes from store")
            .into_iter()
            .zip(expected_effects_digests)
            .enumerate()
            .map(|(i, (tx, expected_effects_digest))| {
                let tx = tx.unwrap_or_else(||
                    panic!(
                        "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
                        unexecuted_txns[i]
                    )
                );
                assert!(!tx.data().intent_message().value.is_end_of_epoch_tx());
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest,
                )
            })
            .collect()
    };

    (
        execution_digests,
        all_tx_digests,
        executable_txns,
        randomness_rounds,
    )
}

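/// Enqueues the checkpoint's unexecuted transactions with their expected
/// effects digests, acquiring shared-object version assignments from the
/// certified effects first, and then waits for execution to complete.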
#[instrument(level = "debug", skip_all)]
async fn execute_transactions(
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    executable_txns: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    state_accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    checkpoint: VerifiedCheckpoint,
    metrics: &Arc<CheckpointExecutorMetrics>,
    prepare_start: Instant,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<Option<Accumulator>> {
    let effects_digests: HashMap<_, _> = execution_digests
        .iter()
        .map(|digest| (digest.transaction, digest.effects))
        .collect();

    let shared_effects_digests = executable_txns
        .iter()
        .filter(|(tx, _)| tx.contains_shared_object())
        .map(|(tx, _)| {
            *effects_digests
                .get(tx.digest())
                .expect("Transaction digest not found in effects_digests")
        })
        .collect::<Vec<_>>();

    let digest_to_effects: HashMap<TransactionDigest, TransactionEffects> =
        transaction_cache_reader
            .multi_get_effects(&shared_effects_digests)?
            .into_iter()
            .zip(shared_effects_digests)
            .map(|(fx, fx_digest)| {
                if fx.is_none() {
                    panic!(
                        "Transaction effects for effects digest {:?} do not exist in effects table",
                        fx_digest
                    );
                }
                let fx = fx.unwrap();
                (*fx.transaction_digest(), fx)
            })
            .collect();

    for (tx, _) in &executable_txns {
        if tx.contains_shared_object() {
            epoch_store
                .acquire_shared_version_assignments_from_effects(
                    tx,
                    digest_to_effects.get(tx.digest()).unwrap(),
                    object_cache_reader,
                )
                .await?;
        }
    }

    let prepare_elapsed = prepare_start.elapsed();
    metrics
        .checkpoint_prepare_latency
        .observe(prepare_elapsed.as_secs_f64());
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!(
            "Checkpoint preparation for execution took {:?}",
            prepare_elapsed
        );
    }

    let exec_start = Instant::now();
    transaction_manager.enqueue_with_expected_effects_digest(executable_txns.clone(), &epoch_store);

    let checkpoint_acc = handle_execution_effects(
        state,
        execution_digests,
        all_tx_digests,
        checkpoint.clone(),
        checkpoint_store,
        object_cache_reader,
        transaction_cache_reader,
        epoch_store,
        transaction_manager,
        state_accumulator,
        local_execution_timeout_sec,
        data_ingestion_dir,
    )
    .await;

    let exec_elapsed = exec_start.elapsed();
    metrics
        .checkpoint_exec_latency
        .observe(exec_elapsed.as_secs_f64());
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!("Checkpoint execution took {:?}", exec_elapsed);
    }

    Ok(checkpoint_acc)
}

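/// Marks the checkpoint's transactions as finalized in the per-epoch and
/// perpetual stores, accumulates the checkpoint state, and, when configured,
/// indexes the checkpoint and/or writes its full data to the local data
/// ingestion directory.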
#[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn finalize_checkpoint(
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    tx_digests: &[TransactionDigest],
    epoch_store: &Arc<AuthorityPerEpochStore>,
    checkpoint: VerifiedCheckpoint,
    accumulator: Arc<StateAccumulator>,
    effects: Vec<TransactionEffects>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<Accumulator> {
    debug!("finalizing checkpoint");
    epoch_store.insert_finalized_transactions(tx_digests, checkpoint.sequence_number)?;

    state
        .get_checkpoint_cache()
        .insert_finalized_transactions_perpetual_checkpoints(
            tx_digests,
            epoch_store.epoch(),
            checkpoint.sequence_number,
        )?;

    let checkpoint_acc =
        accumulator.accumulate_checkpoint(effects, checkpoint.sequence_number, epoch_store)?;

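    // Load full checkpoint data only when it will actually be consumed, i.e.
    // by the REST index or by the local data ingestion writer.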
    if data_ingestion_dir.is_some() || state.rest_index.is_some() {
        let checkpoint_data = load_checkpoint_data(
            checkpoint,
            object_cache_reader,
            transaction_cache_reader,
            checkpoint_store,
            tx_digests,
        )?;

        if let Some(rest_index) = &state.rest_index {
            let mut layout_resolver = epoch_store.executor().type_layout_resolver(Box::new(
                PackageStoreWithFallback::new(state.get_backing_package_store(), &checkpoint_data),
            ));

            rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut())?;
        }

        if let Some(path) = data_ingestion_dir {
            store_checkpoint_locally(path, &checkpoint_data)?;
        }
    }
    Ok(checkpoint_acc)
}