use std::{
    collections::HashMap,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};

use either::Either;
use futures::stream::FuturesOrdered;
use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
use iota_macros::{fail_point, fail_point_async};
use iota_metrics::spawn_monitored_task;
use iota_types::{
    accumulator::Accumulator,
    base_types::{ExecutionDigests, TransactionDigest, TransactionEffectsDigest},
    crypto::RandomnessRound,
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::IotaResult,
    executable_transaction::VerifiedExecutableTransaction,
    full_checkpoint_content::CheckpointData,
    inner_temporary_store::PackageStoreWithFallback,
    message_envelope::Message,
    messages_checkpoint::{CheckpointSequenceNumber, VerifiedCheckpoint},
    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
};
use itertools::izip;
use tap::{TapFallible, TapOptional};
use tokio::{
    sync::broadcast::{self, error::RecvError},
    task::JoinHandle,
    time::timeout,
};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, instrument, trace, warn};

use self::metrics::CheckpointExecutorMetrics;
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    checkpoints::{
        CheckpointStore,
        checkpoint_executor::data_ingestion_handler::{
            load_checkpoint_data, store_checkpoint_locally,
        },
    },
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    state_accumulator::StateAccumulator,
    transaction_manager::TransactionManager,
};

mod data_ingestion_handler;
pub mod metrics;

#[cfg(test)]
pub(crate) mod tests;

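/// Ordered buffer of in-flight checkpoint-execution tasks. Each task resolves
/// to the executed checkpoint, its state accumulator (if computed here), the
/// full checkpoint data (if finalized here), and the digests of all
/// transactions in the checkpoint.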
type CheckpointExecutionBuffer = FuturesOrdered<
    JoinHandle<(
        VerifiedCheckpoint,
        Option<Accumulator>,
        Option<CheckpointData>,
        Vec<TransactionDigest>,
    )>,
>;

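// Emit an info-level progress log every time this many checkpoints have been
// executed (also used below to throttle the prepare/execute latency logs).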
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

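/// Timeouts used while waiting for new checkpoints to arrive from state sync:
/// after `warning_timeout` a warning is logged, and after `panic_timeout`
/// (if set) the node panics.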
#[derive(Debug, Clone, Copy)]
pub struct CheckpointTimeoutConfig {
    pub panic_timeout: Option<Duration>,
    pub warning_timeout: Duration,
}

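// The timeout config lives in a thread-local OnceCell so that simulator tests
// (see the msim-only `init_checkpoint_timeout_config` below) can override it
// per thread before the first call to `get_scheduling_timeout`.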
thread_local! {
    static SCHEDULING_TIMEOUT: once_cell::sync::OnceCell<CheckpointTimeoutConfig> =
        const { once_cell::sync::OnceCell::new() };
}

#[cfg(msim)]
pub fn init_checkpoint_timeout_config(config: CheckpointTimeoutConfig) {
    SCHEDULING_TIMEOUT.with(|s| {
        s.set(config).expect("SchedulingTimeoutConfig already set");
    });
}

fn get_scheduling_timeout() -> CheckpointTimeoutConfig {
    fn inner() -> CheckpointTimeoutConfig {
        let panic_timeout: Option<Duration> = if cfg!(msim) {
            Some(Duration::from_secs(45))
        } else {
            std::env::var("NEW_CHECKPOINT_PANIC_TIMEOUT_MS")
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
                .map(Duration::from_millis)
        };

        let warning_timeout: Duration = std::env::var("NEW_CHECKPOINT_WARNING_TIMEOUT_MS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .map(Duration::from_millis)
            .unwrap_or(Duration::from_secs(5));

        CheckpointTimeoutConfig {
            panic_timeout,
            warning_timeout,
        }
    }

    SCHEDULING_TIMEOUT.with(|s| *s.get_or_init(inner))
}

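/// Why `run_epoch` returned: either the epoch's final checkpoint was executed,
/// or the configured `RunWithRange` stopping condition was reached.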
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    EpochComplete,
    RunWithRangeCondition,
}

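/// Executes checkpoints received from state sync: schedules the transactions
/// of each synced checkpoint, waits for their effects, and advances the
/// highest-executed-checkpoint watermark, one epoch at a time.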
pub struct CheckpointExecutor {
    mailbox: broadcast::Receiver<VerifiedCheckpoint>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
}

impl CheckpointExecutor {
    pub fn new(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
    ) -> Self {
        Self {
            mailbox,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            config,
            metrics,
        }
    }

    pub fn new_for_tests(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            mailbox,
            checkpoint_store,
            state,
            accumulator,
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
        )
    }

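    /// Drives checkpoint execution for a single epoch. Returns once the last
    /// checkpoint of the epoch has been executed
    /// (`StopReason::EpochComplete`), or once the optional `run_with_range`
    /// condition is satisfied (`StopReason::RunWithRangeCondition`).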
    pub async fn run_epoch(
        &mut self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) -> StopReason {
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(epoch_store.epoch())) {
            info!(
                "RunWithRange condition satisfied at {:?}, not running epoch {:?}",
                run_with_range,
                epoch_store.epoch()
            );
            return StopReason::RunWithRangeCondition;
        }

        debug!(
            "Checkpoint executor running for epoch {}",
            epoch_store.epoch(),
        );
        self.metrics
            .checkpoint_exec_epoch
            .set(epoch_store.epoch() as i64);

        let mut highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
                return StopReason::EpochComplete;
            }
        }

        let mut next_to_schedule = highest_executed
            .as_ref()
            .map(|c| c.sequence_number() + 1)
            .unwrap_or_else(|| {
                assert_eq!(epoch_store.epoch(), 0);
                0
            });
        let mut pending: CheckpointExecutionBuffer = FuturesOrdered::new();

        let mut now_time = Instant::now();
        let mut now_transaction_num = highest_executed
            .as_ref()
            .map(|c| c.network_total_transactions)
            .unwrap_or(0);
        let scheduling_timeout_config = get_scheduling_timeout();

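        // Main loop: alternate between scheduling newly synced checkpoints for
        // execution and draining completed executions from `pending`, until
        // the last checkpoint of the epoch has been processed.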
        loop {
            let schedule_scope = iota_metrics::monitored_scope("ScheduleCheckpointExecution");

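            // If the last checkpoint of the current epoch has been reached,
            // check_epoch_last_checkpoint executes the change-epoch
            // transaction, finalizes that checkpoint, and returns true,
            // ending the epoch.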
            if self
                .check_epoch_last_checkpoint(epoch_store.clone(), &highest_executed)
                .await
            {
                self.checkpoint_store
                    .prune_local_summaries()
                    .tap_err(|e| error!("Failed to prune local summaries: {}", e))
                    .ok();

                assert!(
                    pending.is_empty(),
                    "Pending checkpoint execution buffer should be empty after processing last checkpoint of epoch",
                );
                fail_point_async!("crash");
                debug!(epoch = epoch_store.epoch(), "finished epoch");
                return StopReason::EpochComplete;
            }

            self.schedule_synced_checkpoints(
                &mut pending,
                &mut next_to_schedule,
                epoch_store.clone(),
                run_with_range,
            );

            self.metrics
                .checkpoint_exec_inflight
                .set(pending.len() as i64);

            let panic_timeout = scheduling_timeout_config.panic_timeout;
            let warning_timeout = scheduling_timeout_config.warning_timeout;

            drop(schedule_scope);
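            // Wait for the next event: a completed checkpoint execution, a new
            // checkpoint summary from state sync, or a scheduling timeout.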
            tokio::select! {
                Some(Ok((checkpoint, checkpoint_acc, checkpoint_data, tx_digests))) = pending.next() => {
                    let _process_scope = iota_metrics::monitored_scope("ProcessExecutedCheckpoint");

                    self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, checkpoint_data, &tx_digests).await;
                    highest_executed = Some(checkpoint.clone());

                    let elapsed = now_time.elapsed().as_millis();
                    let current_transaction_num = highest_executed.as_ref().map(|c| c.network_total_transactions).unwrap_or(0);
                    if current_transaction_num - now_transaction_num > 10_000 || elapsed > 30_000 {
                        let tps = (1000.0 * (current_transaction_num - now_transaction_num) as f64 / elapsed as f64) as i32;
                        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);
                        now_time = Instant::now();
                        now_transaction_num = current_transaction_num;
                    }
                    if run_with_range.is_some_and(|rwr| rwr.matches_checkpoint(checkpoint.sequence_number)) {
                        info!(
                            "RunWithRange condition satisfied after checkpoint sequence number {:?}",
                            checkpoint.sequence_number
                        );
                        return StopReason::RunWithRangeCondition;
                    }
                }

                received = self.mailbox.recv() => match received {
                    Ok(checkpoint) => {
                        debug!(
                            sequence_number = ?checkpoint.sequence_number,
                            "Received checkpoint summary from state sync"
                        );
                        checkpoint.report_checkpoint_age(&self.metrics.checkpoint_contents_age);
                    },
                    Err(RecvError::Lagged(num_skipped)) => {
                        debug!(
                            "Checkpoint execution recv channel overflowed, {:?} messages were skipped",
                            num_skipped,
                        );
                    }
                    Err(RecvError::Closed) => {
                        panic!("Checkpoint execution sender (state sync) closed channel unexpectedly");
                    },
                },

                _ = tokio::time::sleep(warning_timeout) => {
                    warn!(
                        "Received no new synced checkpoints for {warning_timeout:?}. Next checkpoint to be scheduled: {next_to_schedule}",
                    );
                }

                _ = panic_timeout
                    .map(|d| Either::Left(tokio::time::sleep(d)))
                    .unwrap_or_else(|| Either::Right(futures::future::pending())) => {
                    panic!("No new synced checkpoints received for {panic_timeout:?} on node {:?}", self.state.name);
                },
            }
        }
    }

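    /// Advances the highest-executed-checkpoint watermark to `checkpoint`,
    /// asserting that the watermark moves forward exactly one checkpoint at a
    /// time, and prunes full checkpoint contents that have aged out.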
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
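        // Only the most recent NUM_SAVED_FULL_CHECKPOINT_CONTENTS checkpoints
        // keep their full contents; once this checkpoint executes, the full
        // contents of the checkpoint falling out of that window are deleted,
        // along with its contents-digest -> sequence-number mapping.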
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age(&self.metrics.last_executed_checkpoint_age);
    }

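    /// Post-processing of an executed checkpoint: commits transaction outputs
    /// to persistent storage, notifies per-epoch machinery of the committed
    /// transactions, commits index updates, accumulates the running state
    /// root, and bumps the executed-checkpoint watermark. The last checkpoint
    /// of an epoch is handled separately in `check_epoch_last_checkpoint`.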
    #[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number()))]
    async fn process_executed_checkpoint(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_acc: Option<Accumulator>,
        checkpoint_data: Option<CheckpointData>,
        all_tx_digests: &[TransactionDigest],
    ) {
        let cache_commit = self.state.get_cache_commit();
        debug!("committing checkpoint transactions to disk");
        cache_commit
            .commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
            .await
            .expect("commit_transaction_outputs cannot fail");

        epoch_store
            .handle_committed_transactions(all_tx_digests)
            .expect("cannot fail");

        if let Some(checkpoint_data) = checkpoint_data {
            self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
                .await;
        }

        if !checkpoint.is_last_checkpoint_of_epoch() {
            self.accumulator
                .accumulate_running_root(epoch_store, checkpoint.sequence_number, checkpoint_acc)
                .await
                .expect("Failed to accumulate running root");
            self.bump_highest_executed_checkpoint(checkpoint);
        }
    }

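    /// Commits any pending REST index updates for the given checkpoint so
    /// they become visible to reads. (Despite the name, no subscription
    /// service is notified here.)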
    async fn commit_index_updates_and_enqueue_to_subscription_service(
        &self,
        checkpoint: CheckpointData,
    ) {
        if let Some(rest_index) = &self.state.rest_index {
            rest_index
                .commit_update_for_checkpoint(checkpoint.checkpoint_summary.sequence_number)
                .expect("failed to update rest_indexes");
        }
    }

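    /// Schedules synced, not-yet-scheduled checkpoints for execution, from
    /// `next_to_schedule` up to the highest synced checkpoint, bounded by
    /// `checkpoint_execution_max_concurrency`, the current epoch, and any
    /// `RunWithRange` checkpoint limit.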
    #[instrument(level = "debug", skip_all)]
    fn schedule_synced_checkpoints(
        &self,
        pending: &mut CheckpointExecutionBuffer,
        next_to_schedule: &mut CheckpointSequenceNumber,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) {
        let Some(latest_synced_checkpoint) = self
            .checkpoint_store
            .get_highest_synced_checkpoint()
            .expect("Failed to read highest synced checkpoint")
        else {
            debug!("No checkpoints to schedule, highest synced checkpoint is None");
            return;
        };

        while *next_to_schedule <= *latest_synced_checkpoint.sequence_number()
            && pending.len() < self.config.checkpoint_execution_max_concurrency
        {
            let checkpoint = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(*next_to_schedule)
                .unwrap()
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint sequence number {:?} does not exist in checkpoint store",
                        *next_to_schedule
                    )
                });
            if checkpoint.epoch() > epoch_store.epoch() {
                return;
            }
            match run_with_range {
                Some(RunWithRange::Checkpoint(seq)) if *next_to_schedule > seq => {
                    debug!(
                        "RunWithRange Checkpoint {} is set, not scheduling checkpoint {}",
                        seq, *next_to_schedule
                    );
                    return;
                }
                _ => {
                    self.schedule_checkpoint(checkpoint, pending, epoch_store.clone());
                    *next_to_schedule += 1;
                }
            }
        }
    }

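    /// Spawns a monitored task that executes the given checkpoint, retrying
    /// every second on error until it succeeds, and pushes the task onto the
    /// ordered `pending` buffer.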
    #[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
    fn schedule_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pending: &mut CheckpointExecutionBuffer,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) {
        debug!("Scheduling checkpoint for execution");

        let checkpoint_epoch = checkpoint.epoch();
        assert_eq!(
            checkpoint_epoch,
            epoch_store.epoch(),
            "Epoch mismatch after startup recovery. checkpoint epoch: {:?}, node epoch: {:?}",
            checkpoint_epoch,
            epoch_store.epoch(),
        );

        let metrics = self.metrics.clone();
        let local_execution_timeout_sec = self.config.local_execution_timeout_sec;
        let data_ingestion_dir = self.config.data_ingestion_dir.clone();
        let checkpoint_store = self.checkpoint_store.clone();
        let object_cache_reader = self.object_cache_reader.clone();
        let transaction_cache_reader = self.transaction_cache_reader.clone();
        let tx_manager = self.tx_manager.clone();
        let accumulator = self.accumulator.clone();
        let state = self.state.clone();

        epoch_store.notify_synced_checkpoint(*checkpoint.sequence_number());

        pending.push_back(spawn_monitored_task!(async move {
            let epoch_store = epoch_store.clone();
            let (tx_digests, checkpoint_acc, checkpoint_data) = loop {
                match execute_checkpoint(
                    checkpoint.clone(),
                    &state,
                    object_cache_reader.as_ref(),
                    transaction_cache_reader.as_ref(),
                    checkpoint_store.clone(),
                    epoch_store.clone(),
                    tx_manager.clone(),
                    accumulator.clone(),
                    local_execution_timeout_sec,
                    &metrics,
                    data_ingestion_dir.clone(),
                )
                .await
                {
                    Err(err) => {
                        error!(
                            "Error while executing checkpoint, will retry in 1s: {:?}",
                            err
                        );
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        metrics.checkpoint_exec_errors.inc();
                    }
                    Ok((tx_digests, checkpoint_acc, checkpoint_data)) => {
                        break (tx_digests, checkpoint_acc, checkpoint_data);
                    }
                }
            };
            (checkpoint, checkpoint_acc, checkpoint_data, tx_digests)
        }));
    }

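    /// Executes the change-epoch transaction of the epoch's final checkpoint
    /// against its known effects digest and waits for the effects to become
    /// available, assigning shared-object versions from the recorded effects
    /// first if the transaction touches shared objects.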
    #[instrument(level = "info", skip_all)]
    async fn execute_change_epoch_tx(
        &self,
        execution_digests: ExecutionDigests,
        change_epoch_tx_digest: TransactionDigest,
        change_epoch_tx: VerifiedExecutableTransaction,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: VerifiedCheckpoint,
    ) {
        let change_epoch_fx = self
            .transaction_cache_reader
            .get_effects(&execution_digests.effects)
            .expect("Fetching effects for change_epoch tx cannot fail")
            .expect("Change_epoch tx effects must exist");

        if change_epoch_tx.contains_shared_object() {
            epoch_store
                .acquire_shared_version_assignments_from_effects(
                    &change_epoch_tx,
                    &change_epoch_fx,
                    self.object_cache_reader.as_ref(),
                )
                .await
                .expect("Acquiring shared version assignments for change_epoch tx cannot fail");
        }

        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), execution_digests.effects)],
            &epoch_store,
        );
        handle_execution_effects(
            &self.state,
            vec![execution_digests],
            vec![change_epoch_tx_digest],
            checkpoint.clone(),
            self.checkpoint_store.clone(),
            self.object_cache_reader.as_ref(),
            self.transaction_cache_reader.as_ref(),
            epoch_store.clone(),
            self.tx_manager.clone(),
            self.accumulator.clone(),
            self.config.local_execution_timeout_sec,
            self.config.data_ingestion_dir.clone(),
        )
        .await;
    }

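    /// If `checkpoint` is the last checkpoint of the current epoch, executes
    /// the change-epoch transaction, finalizes the checkpoint, accumulates
    /// the epoch, and bumps the executed-checkpoint watermark. Returns true
    /// if and only if the epoch was completed this way.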
    pub async fn check_epoch_last_checkpoint(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: &Option<VerifiedCheckpoint>,
    ) -> bool {
        let cur_epoch = epoch_store.epoch();

        if let Some(checkpoint) = checkpoint {
            if checkpoint.epoch() == cur_epoch {
                if let Some((change_epoch_execution_digests, change_epoch_tx)) =
                    extract_end_of_epoch_tx(
                        checkpoint,
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        epoch_store.clone(),
                    )
                {
                    let change_epoch_tx_digest = change_epoch_execution_digests.transaction;

                    info!(
                        ended_epoch = cur_epoch,
                        last_checkpoint = checkpoint.sequence_number(),
                        "Reached end of epoch, executing change_epoch transaction",
                    );

                    self.execute_change_epoch_tx(
                        change_epoch_execution_digests,
                        change_epoch_tx_digest,
                        change_epoch_tx,
                        epoch_store.clone(),
                        checkpoint.clone(),
                    )
                    .await;

                    let cache_commit = self.state.get_cache_commit();
                    cache_commit
                        .commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
                        .await
                        .expect("commit_transaction_outputs cannot fail");
                    fail_point_async!("prune-and-compact");

                    let all_tx_digests: Vec<_> = self
                        .checkpoint_store
                        .get_checkpoint_contents(&checkpoint.content_digest)
                        .expect("read cannot fail")
                        .expect("Checkpoint contents should exist")
                        .iter()
                        .map(|digests| digests.transaction)
                        .collect();

                    let effects = self
                        .transaction_cache_reader
                        .notify_read_executed_effects(&all_tx_digests)
                        .await
                        .expect("Failed to get executed effects for finalizing checkpoint");

                    let (_acc, checkpoint_data) = finalize_checkpoint(
                        &self.state,
                        self.object_cache_reader.as_ref(),
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        &all_tx_digests,
                        &epoch_store,
                        checkpoint.clone(),
                        self.accumulator.clone(),
                        effects,
                        self.config.data_ingestion_dir.clone(),
                    )
                    .await
                    .expect("Finalizing checkpoint cannot fail");

                    self.commit_index_updates_and_enqueue_to_subscription_service(checkpoint_data)
                        .await;

                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(cur_epoch, checkpoint)
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_running_root(&epoch_store, checkpoint.sequence_number, None)
                        .await
                        .expect("Failed to accumulate running root");
                    self.accumulator
                        .accumulate_epoch(epoch_store.clone(), *checkpoint.sequence_number())
                        .await
                        .expect("Accumulating epoch cannot fail");

                    self.bump_highest_executed_checkpoint(checkpoint);

                    return true;
                }
            }
        }
        false
    }
}

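/// Executes all transactions of the given checkpoint and returns the digests
/// of its transactions together with the checkpoint accumulator and full
/// checkpoint data produced by finalization (both `None` for the last
/// checkpoint of an epoch, which is finalized separately).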
#[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn execute_checkpoint(
    checkpoint: VerifiedCheckpoint,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    metrics: &Arc<CheckpointExecutorMetrics>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<(
    Vec<TransactionDigest>,
    Option<Accumulator>,
    Option<CheckpointData>,
)> {
    debug!("Preparing checkpoint for execution");
    let prepare_start = Instant::now();

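    // Split the checkpoint into its digests and the not-yet-executed
    // transactions; transactions that already have effects are only
    // fork-checked, not re-executed.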
    let (execution_digests, all_tx_digests, executable_txns, randomness_rounds) =
        get_unexecuted_transactions(
            checkpoint.clone(),
            transaction_cache_reader,
            checkpoint_store.clone(),
            epoch_store.clone(),
        );

    let tx_count = execution_digests.len();
    debug!("Number of transactions in the checkpoint: {:?}", tx_count);
    metrics
        .checkpoint_transaction_count
        .observe(tx_count as f64);

    let (checkpoint_acc, checkpoint_data) = execute_transactions(
        execution_digests,
        all_tx_digests.clone(),
        executable_txns,
        state,
        object_cache_reader,
        transaction_cache_reader,
        checkpoint_store.clone(),
        epoch_store.clone(),
        transaction_manager,
        accumulator,
        local_execution_timeout_sec,
        checkpoint,
        metrics,
        prepare_start,
        data_ingestion_dir,
    )
    .await?;

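    // Once the whole checkpoint has executed, tell the randomness reporter
    // which randomness rounds were applied in it.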
    if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
        for round in randomness_rounds {
            debug!(
                ?round,
                "notifying RandomnessReporter that randomness update was executed in checkpoint"
            );
            randomness_reporter.notify_randomness_in_checkpoint(round)?;
        }
    }

    Ok((all_tx_digests, checkpoint_acc, checkpoint_data))
}

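/// Waits for the effects of all given transactions to become available,
/// logging diagnostics whenever the wait exceeds the local execution timeout,
/// and fork-checks every effect against the digest recorded in the
/// checkpoint. Finalizes the checkpoint unless it is the last one of the
/// epoch.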
#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn handle_execution_effects(
    state: &AuthorityState,
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    checkpoint: VerifiedCheckpoint,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    data_ingestion_dir: Option<PathBuf>,
) -> (Option<Accumulator>, Option<CheckpointData>) {
    let mut periods = 1;
    let log_timeout_sec = Duration::from_secs(local_execution_timeout_sec);
    let mut blocking_execution = false;
    loop {
        let effects_future = transaction_cache_reader.notify_read_executed_effects(&all_tx_digests);

        match timeout(log_timeout_sec, effects_future).await {
            Err(_elapsed) => {
                let highest_seq = checkpoint_store
                    .get_highest_executed_checkpoint_seq_number()
                    .unwrap()
                    .unwrap_or_default();
                if checkpoint.sequence_number <= highest_seq {
                    error!(
                        "Re-executing checkpoint {} after higher checkpoint {} has executed!",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if checkpoint.sequence_number > highest_seq + 1 {
                    trace!(
                        "Checkpoint {} is still executing. Highest executed = {}",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if !blocking_execution {
                    trace!(
                        "Checkpoint {} is next to execute.",
                        checkpoint.sequence_number
                    );
                    blocking_execution = true;
                    continue;
                }

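                // The checkpoint is next in line and still stuck: work out
                // which transactions are missing effects and log them.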
                let missing_digests: Vec<TransactionDigest> = transaction_cache_reader
                    .multi_get_executed_effects_digests(&all_tx_digests)
                    .expect("multi_get_executed_effects_digests cannot fail")
                    .iter()
                    .zip(all_tx_digests.clone())
                    .filter_map(
                        |(fx, digest)| {
                            if fx.is_none() { Some(digest) } else { None }
                        },
                    )
                    .collect();

                if missing_digests.is_empty() {
                    continue;
                }

                warn!(
                    "Transaction effects for checkpoint tx digests {:?} not present within {:?}.",
                    missing_digests,
                    log_timeout_sec * periods,
                );

                let pending_digest = missing_digests.first().unwrap();
                if let Some(missing_input) = transaction_manager.get_missing_input(pending_digest) {
                    warn!(
                        "Transaction {pending_digest:?} has missing input objects {missing_input:?}",
                    );
                }
                periods += 1;
            }
            Ok(Err(err)) => panic!("Failed to notify_read_executed_effects: {err:?}"),
            Ok(Ok(effects)) => {
                for (tx_digest, expected_digest, actual_effects) in
                    izip!(&all_tx_digests, &execution_digests, &effects)
                {
                    let expected_effects_digest = &expected_digest.effects;
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        expected_effects_digest,
                        &actual_effects.digest(),
                        transaction_cache_reader,
                    );
                }

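                // The last checkpoint of an epoch is finalized in
                // check_epoch_last_checkpoint after the change-epoch
                // transaction has executed, so it is skipped here.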
                if checkpoint.end_of_epoch_data.is_none() {
                    let (checkpoint_acc, checkpoint_data) = finalize_checkpoint(
                        state,
                        object_cache_reader,
                        transaction_cache_reader,
                        checkpoint_store.clone(),
                        &all_tx_digests,
                        &epoch_store,
                        checkpoint.clone(),
                        accumulator.clone(),
                        effects,
                        data_ingestion_dir,
                    )
                    .await
                    .expect("Finalizing checkpoint cannot fail");
                    return (Some(checkpoint_acc), Some(checkpoint_data));
                } else {
                    return (None, None);
                }
            }
        }
    }
}

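/// Panics if the locally computed effects digest of `tx_digest` differs from
/// the digest recorded in the checkpoint, i.e. if this node has forked.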
fn assert_not_forked(
    checkpoint: &VerifiedCheckpoint,
    tx_digest: &TransactionDigest,
    expected_digest: &TransactionEffectsDigest,
    actual_effects_digest: &TransactionEffectsDigest,
    cache_reader: &dyn TransactionCacheRead,
) {
    if *expected_digest != *actual_effects_digest {
        let actual_effects = cache_reader
            .get_executed_effects(tx_digest)
            .expect("get_executed_effects cannot fail")
            .expect("actual effects should exist");

        error!(
            ?checkpoint,
            ?tx_digest,
            ?expected_digest,
            ?actual_effects,
            "fork detected!"
        );
        panic!(
            "When executing checkpoint {}, transaction {} \
             is expected to have effects digest {}, but got {}!",
            checkpoint.sequence_number(),
            tx_digest,
            expected_digest,
            actual_effects_digest,
        );
    }
}

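/// Extracts the change-epoch transaction from the given checkpoint, returning
/// `None` if the checkpoint is not the last one of its epoch.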
fn extract_end_of_epoch_tx(
    checkpoint: &VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> Option<(ExecutionDigests, VerifiedExecutableTransaction)> {
    checkpoint.end_of_epoch_data.as_ref()?;

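    // The change-epoch transaction is the last transaction of the final
    // checkpoint; look it up via the checkpoint's content digest.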
    let checkpoint_sequence = checkpoint.sequence_number();
    let execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

    let digests = execution_digests
        .last()
        .expect("Final checkpoint must have at least one transaction");

    let change_epoch_tx = cache_reader
        .get_transaction_block(&digests.transaction)
        .expect("read cannot fail")
        .unwrap_or_else(|| {
            panic!(
                "state-sync should have ensured that transaction with digests {digests:?} exists for checkpoint: {checkpoint:?}"
            )
        });

    let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        (*change_epoch_tx).clone(),
        epoch_store.epoch(),
        *checkpoint_sequence,
    );

    assert!(
        change_epoch_tx
            .data()
            .intent_message()
            .value
            .is_end_of_epoch_tx()
    );

    Some((*digests, change_epoch_tx))
}

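/// Loads the checkpoint's contents and partitions them into transactions that
/// still need to be executed and those that already have effects (which are
/// only fork-checked). Returns the execution digests (excluding any
/// change-epoch transaction), all transaction digests, the executable
/// transactions paired with their expected effects digests, and the
/// randomness rounds contained in the checkpoint.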
#[expect(clippy::type_complexity)]
fn get_unexecuted_transactions(
    checkpoint: VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> (
    Vec<ExecutionDigests>,
    Vec<TransactionDigest>,
    Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    Vec<RandomnessRound>,
) {
    let checkpoint_sequence = checkpoint.sequence_number();
    let full_contents = checkpoint_store
        .get_full_checkpoint_contents_by_sequence_number(*checkpoint_sequence)
        .expect("Failed to get checkpoint contents from store")
        .tap_some(|_| {
            debug!("loaded full checkpoint contents in bulk for sequence {checkpoint_sequence}")
        });

    let mut execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

    let full_contents_txns = full_contents.map(|c| {
        c.into_iter()
            .zip(execution_digests.iter())
            .map(|(txn, digests)| (digests.transaction, txn))
            .collect::<HashMap<_, _>>()
    });

    checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
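        // Remove the change-epoch transaction so that it is not scheduled
        // here; it is executed separately in check_epoch_last_checkpoint,
        // after all other transactions of the checkpoint.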
        let digests = execution_digests
            .pop()
            .expect("Final checkpoint must have at least one transaction");

        let change_epoch_tx = cache_reader
            .get_transaction_block(&digests.transaction)
            .expect("read cannot fail")
            .unwrap_or_else(|| {
                panic!(
                    "state-sync should have ensured that transaction with digests {digests:?} exists for checkpoint: {}",
                    checkpoint.sequence_number()
                )
            });
        assert!(change_epoch_tx.data().intent_message().value.is_end_of_epoch_tx());
    });

    let randomness_rounds = if let Some(version_specific_data) = checkpoint
        .version_specific_data(epoch_store.protocol_config())
        .expect("unable to get version_specific_data")
    {
        version_specific_data.into_v1().randomness_rounds
    } else {
        assert_eq!(
            0,
            epoch_store
                .protocol_config()
                .min_checkpoint_interval_ms_as_option()
                .unwrap_or_default(),
        );
        if let Some(first_digest) = execution_digests.first() {
            let maybe_randomness_tx = cache_reader
                .get_transaction_block(&first_digest.transaction)
                .expect("read cannot fail")
                .unwrap_or_else(|| {
                    panic!(
                        "state-sync should have ensured that transaction with digests {first_digest:?} exists for checkpoint: {}",
                        checkpoint.sequence_number()
                    )
                });
            if let TransactionKind::RandomnessStateUpdate(rsu) =
                maybe_randomness_tx.data().transaction_data().kind()
            {
                vec![rsu.randomness_round]
            } else {
                Vec::new()
            }
        } else {
            Vec::new()
        }
    };

    let all_tx_digests: Vec<TransactionDigest> =
        execution_digests.iter().map(|tx| tx.transaction).collect();

    let executed_effects_digests = cache_reader
        .multi_get_executed_effects_digests(&all_tx_digests)
        .expect("failed to read executed_effects from store");

    let (unexecuted_txns, expected_effects_digests): (Vec<_>, Vec<_>) =
        izip!(execution_digests.iter(), executed_effects_digests.iter())
            .filter_map(|(digests, effects_digest)| match effects_digest {
                None => Some((digests.transaction, digests.effects)),
                Some(actual_effects_digest) => {
                    let tx_digest = &digests.transaction;
                    let effects_digest = &digests.effects;
                    trace!(
                        "Transaction with digest {:?} has already been executed",
                        tx_digest
                    );
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        effects_digest,
                        actual_effects_digest,
                        cache_reader,
                    );
                    None
                }
            })
            .unzip();

    let executable_txns: Vec<_> = if let Some(full_contents_txns) = full_contents_txns {
        unexecuted_txns
            .into_iter()
            .zip(expected_effects_digests)
            .map(|(tx_digest, expected_effects_digest)| {
                let tx = &full_contents_txns.get(&tx_digest).unwrap().transaction;
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(tx.clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest,
                )
            })
            .collect()
    } else {
        cache_reader
            .multi_get_transaction_blocks(&unexecuted_txns)
            .expect("Failed to get checkpoint txes from store")
            .into_iter()
            .zip(expected_effects_digests)
            .enumerate()
            .map(|(i, (tx, expected_effects_digest))| {
                let tx = tx.unwrap_or_else(|| {
                    panic!(
                        "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
                        unexecuted_txns[i]
                    )
                });
                assert!(!tx.data().intent_message().value.is_end_of_epoch_tx());
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest,
                )
            })
            .collect()
    };

    (
        execution_digests,
        all_tx_digests,
        executable_txns,
        randomness_rounds,
    )
}

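/// Enqueues the checkpoint's remaining transactions for execution with their
/// expected effects digests (assigning shared-object versions from the
/// previously recorded effects where needed) and waits for all effects via
/// `handle_execution_effects`, recording prepare and execution latency
/// metrics.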
#[instrument(level = "debug", skip_all)]
async fn execute_transactions(
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    executable_txns: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    state_accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    checkpoint: VerifiedCheckpoint,
    metrics: &Arc<CheckpointExecutorMetrics>,
    prepare_start: Instant,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<(Option<Accumulator>, Option<CheckpointData>)> {
    let effects_digests: HashMap<_, _> = execution_digests
        .iter()
        .map(|digest| (digest.transaction, digest.effects))
        .collect();

    let shared_effects_digests = executable_txns
        .iter()
        .filter(|(tx, _)| tx.contains_shared_object())
        .map(|(tx, _)| {
            *effects_digests
                .get(tx.digest())
                .expect("Transaction digest not found in effects_digests")
        })
        .collect::<Vec<_>>();

    let digest_to_effects: HashMap<TransactionDigest, TransactionEffects> =
        transaction_cache_reader
            .multi_get_effects(&shared_effects_digests)?
            .into_iter()
            .zip(shared_effects_digests)
            .map(|(fx, fx_digest)| {
                if fx.is_none() {
                    panic!(
                        "Transaction effects for effects digest {fx_digest:?} do not exist in effects table"
                    );
                }
                let fx = fx.unwrap();
                (*fx.transaction_digest(), fx)
            })
            .collect();

    for (tx, _) in &executable_txns {
        if tx.contains_shared_object() {
            epoch_store
                .acquire_shared_version_assignments_from_effects(
                    tx,
                    digest_to_effects.get(tx.digest()).unwrap(),
                    object_cache_reader,
                )
                .await?;
        }
    }

    let prepare_elapsed = prepare_start.elapsed();
    metrics
        .checkpoint_prepare_latency
        .observe(prepare_elapsed.as_secs_f64());
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!(
            "Checkpoint preparation for execution took {:?}",
            prepare_elapsed
        );
    }

    let exec_start = Instant::now();
    transaction_manager.enqueue_with_expected_effects_digest(executable_txns.clone(), &epoch_store);

    let (checkpoint_acc, checkpoint_data) = handle_execution_effects(
        state,
        execution_digests,
        all_tx_digests,
        checkpoint.clone(),
        checkpoint_store,
        object_cache_reader,
        transaction_cache_reader,
        epoch_store,
        transaction_manager,
        state_accumulator,
        local_execution_timeout_sec,
        data_ingestion_dir,
    )
    .await;

    let exec_elapsed = exec_start.elapsed();
    metrics
        .checkpoint_exec_latency
        .observe(exec_elapsed.as_secs_f64());
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!("Checkpoint execution took {:?}", exec_elapsed);
    }

    Ok((checkpoint_acc, checkpoint_data))
}

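/// Finalizes an executed checkpoint: records the finalized transactions for
/// the epoch and in perpetual storage, accumulates the checkpoint into the
/// state accumulator, loads the full checkpoint data, updates the REST index,
/// and optionally writes the checkpoint to the data ingestion directory.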
#[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn finalize_checkpoint(
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    tx_digests: &[TransactionDigest],
    epoch_store: &Arc<AuthorityPerEpochStore>,
    checkpoint: VerifiedCheckpoint,
    accumulator: Arc<StateAccumulator>,
    effects: Vec<TransactionEffects>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<(Accumulator, CheckpointData)> {
    debug!("finalizing checkpoint");
    epoch_store.insert_finalized_transactions(tx_digests, checkpoint.sequence_number)?;

    state
        .get_checkpoint_cache()
        .insert_finalized_transactions_perpetual_checkpoints(
            tx_digests,
            epoch_store.epoch(),
            checkpoint.sequence_number,
        )?;

    let checkpoint_acc =
        accumulator.accumulate_checkpoint(effects, checkpoint.sequence_number, epoch_store)?;

    let checkpoint_data = load_checkpoint_data(
        checkpoint,
        object_cache_reader,
        transaction_cache_reader,
        checkpoint_store,
        tx_digests,
    )?;

    if state.rest_index.is_some() || data_ingestion_dir.is_some() {
        if let Some(rest_index) = &state.rest_index {
            let mut layout_resolver = epoch_store.executor().type_layout_resolver(Box::new(
                PackageStoreWithFallback::new(state.get_backing_package_store(), &checkpoint_data),
            ));

            rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut())?;
        }

        if let Some(path) = data_ingestion_dir {
            store_checkpoint_locally(path, &checkpoint_data)?;
        }
    }

    Ok((checkpoint_acc, checkpoint_data))
}