use std::{
    collections::HashMap,
    path::PathBuf,
    sync::Arc,
    time::{Duration, Instant},
};

use either::Either;
use futures::stream::FuturesOrdered;
use iota_config::node::{CheckpointExecutorConfig, RunWithRange};
use iota_macros::{fail_point, fail_point_async};
use iota_metrics::spawn_monitored_task;
use iota_types::{
    accumulator::Accumulator,
    base_types::{ExecutionDigests, TransactionDigest, TransactionEffectsDigest},
    crypto::RandomnessRound,
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::IotaResult,
    executable_transaction::VerifiedExecutableTransaction,
    inner_temporary_store::PackageStoreWithFallback,
    message_envelope::Message,
    messages_checkpoint::{CheckpointSequenceNumber, VerifiedCheckpoint},
    transaction::{TransactionDataAPI, TransactionKind, VerifiedTransaction},
};
use itertools::izip;
use tap::{TapFallible, TapOptional};
use tokio::{
    sync::broadcast::{self, error::RecvError},
    task::JoinHandle,
    time::timeout,
};
use tokio_stream::StreamExt;
use tracing::{debug, error, info, instrument, trace, warn};

use self::metrics::CheckpointExecutorMetrics;
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    checkpoints::{
        CheckpointStore,
        checkpoint_executor::data_ingestion_handler::{
            load_checkpoint_data, store_checkpoint_locally,
        },
    },
    execution_cache::{ObjectCacheRead, TransactionCacheRead},
    state_accumulator::StateAccumulator,
    transaction_manager::TransactionManager,
};

mod data_ingestion_handler;
pub mod metrics;

#[cfg(test)]
pub(crate) mod tests;

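/// Ordered buffer of in-flight checkpoint execution tasks. Each task resolves
/// to the executed checkpoint, the optional accumulator for that checkpoint,
/// and the digests of all transactions it contains.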
type CheckpointExecutionBuffer = FuturesOrdered<
    JoinHandle<(
        VerifiedCheckpoint,
        Option<Accumulator>,
        Vec<TransactionDigest>,
    )>,
>;

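/// Emit a progress log line every this many executed checkpoints.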
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

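/// Timeouts used to detect a stalled checkpoint pipeline: warn after
/// `warning_timeout` without a newly synced checkpoint, and panic after
/// `panic_timeout` if one is configured.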
#[derive(Debug, Clone, Copy)]
pub struct CheckpointTimeoutConfig {
    pub panic_timeout: Option<Duration>,
    pub warning_timeout: Duration,
}

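// The scheduling timeout config is resolved once per thread and then cached;
// simulation tests can override it via `init_checkpoint_timeout_config`.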
thread_local! {
    static SCHEDULING_TIMEOUT: once_cell::sync::OnceCell<CheckpointTimeoutConfig> =
        const { once_cell::sync::OnceCell::new() };
}

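/// Overrides the scheduling timeout config for the current thread. Only
/// available in simulation tests, and may be set at most once per thread.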
#[cfg(msim)]
pub fn init_checkpoint_timeout_config(config: CheckpointTimeoutConfig) {
    SCHEDULING_TIMEOUT.with(|s| {
        s.set(config).expect("SchedulingTimeoutConfig already set");
    });
}

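/// Resolves the timeout config for this thread. Under `msim` the panic
/// timeout defaults to 45s; otherwise it is read from the
/// `NEW_CHECKPOINT_PANIC_TIMEOUT_MS` environment variable (unset means never
/// panic). The warning timeout comes from `NEW_CHECKPOINT_WARNING_TIMEOUT_MS`
/// and defaults to 5s.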
fn get_scheduling_timeout() -> CheckpointTimeoutConfig {
    fn inner() -> CheckpointTimeoutConfig {
        let panic_timeout: Option<Duration> = if cfg!(msim) {
            Some(Duration::from_secs(45))
        } else {
            std::env::var("NEW_CHECKPOINT_PANIC_TIMEOUT_MS")
                .ok()
                .and_then(|s| s.parse::<u64>().ok())
                .map(Duration::from_millis)
        };

        let warning_timeout: Duration = std::env::var("NEW_CHECKPOINT_WARNING_TIMEOUT_MS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            .map(Duration::from_millis)
            .unwrap_or(Duration::from_secs(5));

        CheckpointTimeoutConfig {
            panic_timeout,
            warning_timeout,
        }
    }

    SCHEDULING_TIMEOUT.with(|s| *s.get_or_init(inner))
}

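/// Why `run_epoch` returned: either the final checkpoint of the epoch was
/// executed, or the configured `RunWithRange` stopping condition was met.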
#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
    EpochComplete,
    RunWithRangeCondition,
}

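/// Drives checkpoint execution for a node. The executor receives verified
/// checkpoints from state sync through `mailbox`, schedules their
/// transactions for execution (up to a configurable number of checkpoints in
/// flight), checks the resulting effects against the certified effects
/// digests, and advances the highest-executed-checkpoint watermark.
///
/// A minimal sketch of how a node might drive the executor across an epoch
/// (construction of the surrounding components is elided, hence `ignore`):
///
/// ```ignore
/// let mut executor = CheckpointExecutor::new(
///     mailbox, checkpoint_store, state, accumulator, config, metrics,
/// );
/// match executor.run_epoch(epoch_store, None).await {
///     StopReason::EpochComplete => { /* reconfigure, then run the next epoch */ }
///     StopReason::RunWithRangeCondition => { /* debug run finished; stop */ }
/// }
/// ```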
pub struct CheckpointExecutor {
    mailbox: broadcast::Receiver<VerifiedCheckpoint>,
    state: Arc<AuthorityState>,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: Arc<dyn ObjectCacheRead>,
    transaction_cache_reader: Arc<dyn TransactionCacheRead>,
    tx_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    config: CheckpointExecutorConfig,
    metrics: Arc<CheckpointExecutorMetrics>,
}

impl CheckpointExecutor {
    pub fn new(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
        config: CheckpointExecutorConfig,
        metrics: Arc<CheckpointExecutorMetrics>,
    ) -> Self {
        Self {
            mailbox,
            state: state.clone(),
            checkpoint_store,
            object_cache_reader: state.get_object_cache_reader().clone(),
            transaction_cache_reader: state.get_transaction_cache_reader().clone(),
            tx_manager: state.transaction_manager().clone(),
            accumulator,
            config,
            metrics,
        }
    }

    pub fn new_for_tests(
        mailbox: broadcast::Receiver<VerifiedCheckpoint>,
        checkpoint_store: Arc<CheckpointStore>,
        state: Arc<AuthorityState>,
        accumulator: Arc<StateAccumulator>,
    ) -> Self {
        Self::new(
            mailbox,
            checkpoint_store,
            state,
            accumulator,
            Default::default(),
            CheckpointExecutorMetrics::new_for_tests(),
        )
    }

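    /// Executes checkpoints for the given epoch until the epoch's final
    /// checkpoint has been executed, returning `StopReason::EpochComplete`,
    /// or until the `run_with_range` condition is met, returning
    /// `StopReason::RunWithRangeCondition`. Takes `&mut self` so that only
    /// one instance of the loop can run at a time.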
    pub async fn run_epoch(
        &mut self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) -> StopReason {
        // Check whether the run-with-range condition was already satisfied
        // in an earlier epoch.
        if run_with_range.is_some_and(|rwr| rwr.is_epoch_gt(epoch_store.epoch())) {
            info!(
                "RunWithRange condition satisfied at {:?}, run_epoch={:?}",
                run_with_range,
                epoch_store.epoch()
            );
            return StopReason::RunWithRangeCondition;
        };

        debug!(
            "Checkpoint executor running for epoch {}",
            epoch_store.epoch(),
        );
        self.metrics
            .checkpoint_exec_epoch
            .set(epoch_store.epoch() as i64);

        let mut highest_executed = self
            .checkpoint_store
            .get_highest_executed_checkpoint()
            .unwrap();

        if let Some(highest_executed) = &highest_executed {
            if epoch_store.epoch() == highest_executed.epoch()
                && highest_executed.is_last_checkpoint_of_epoch()
            {
                info!(seq = ?highest_executed.sequence_number, "final checkpoint of epoch has already been executed");
                return StopReason::EpochComplete;
            }
        }

        let mut next_to_schedule = highest_executed
            .as_ref()
            .map(|c| c.sequence_number() + 1)
            .unwrap_or_else(|| {
                // If no checkpoint has ever been executed, we must be
                // starting from genesis in epoch 0.
                assert_eq!(epoch_store.epoch(), 0);
                0
            });
        let mut pending: CheckpointExecutionBuffer = FuturesOrdered::new();

        let mut now_time = Instant::now();
        let mut now_transaction_num = highest_executed
            .as_ref()
            .map(|c| c.network_total_transactions)
            .unwrap_or(0);
        let scheduling_timeout_config = get_scheduling_timeout();

        loop {
            // If we have executed the last checkpoint of the current epoch,
            // finalize the epoch and stop.
            if self
                .check_epoch_last_checkpoint(epoch_store.clone(), &highest_executed)
                .await
            {
                self.checkpoint_store
                    .prune_local_summaries()
                    .tap_err(|e| error!("Failed to prune local summaries: {}", e))
                    .ok();

                assert!(
                    pending.is_empty(),
                    "Pending checkpoint execution buffer should be empty after processing last checkpoint of epoch",
                );
                fail_point_async!("crash");
                debug!(epoch = epoch_store.epoch(), "finished epoch");
                return StopReason::EpochComplete;
            }

            self.schedule_synced_checkpoints(
                &mut pending,
                &mut next_to_schedule,
                epoch_store.clone(),
                run_with_range,
            );

            self.metrics
                .checkpoint_exec_inflight
                .set(pending.len() as i64);

            let panic_timeout = scheduling_timeout_config.panic_timeout;
            let warning_timeout = scheduling_timeout_config.warning_timeout;

            tokio::select! {
                // Drain finished checkpoint executions in sequence order and
                // advance the highest-executed watermark.
                Some(Ok((checkpoint, checkpoint_acc, tx_digests))) = pending.next() => {
                    self.process_executed_checkpoint(&epoch_store, &checkpoint, checkpoint_acc, &tx_digests).await;
                    highest_executed = Some(checkpoint.clone());

                    // Estimate TPS every 10,000 transactions or 30 seconds.
                    let elapsed = now_time.elapsed().as_millis();
                    let current_transaction_num = highest_executed.as_ref().map(|c| c.network_total_transactions).unwrap_or(0);
                    if current_transaction_num - now_transaction_num > 10_000 || elapsed > 30_000 {
                        let tps = (1000.0 * (current_transaction_num - now_transaction_num) as f64 / elapsed as f64) as i32;
                        self.metrics.checkpoint_exec_sync_tps.set(tps as i64);
                        now_time = Instant::now();
                        now_transaction_num = current_transaction_num;
                    }
                    // Stop once the RunWithRange checkpoint condition is met.
                    if run_with_range.is_some_and(|rwr| rwr.matches_checkpoint(checkpoint.sequence_number)) {
                        info!(
                            "RunWithRange condition satisfied after checkpoint sequence number {:?}",
                            checkpoint.sequence_number
                        );
                        return StopReason::RunWithRangeCondition;
                    }
                }

                received = self.mailbox.recv() => match received {
                    Ok(checkpoint) => {
                        debug!(
                            sequence_number = ?checkpoint.sequence_number,
                            "Received checkpoint summary from state sync"
                        );
                        checkpoint.report_checkpoint_age_ms(&self.metrics.checkpoint_contents_age_ms);
                    },
                    Err(RecvError::Lagged(num_skipped)) => {
                        debug!(
                            "Checkpoint Execution Recv channel overflowed with {:?} messages",
                            num_skipped,
                        );
                    }
                    Err(RecvError::Closed) => {
                        panic!("Checkpoint Execution Sender (StateSync) closed channel unexpectedly");
                    },
                },

                _ = tokio::time::sleep(warning_timeout) => {
                    warn!(
                        "Received no new synced checkpoints for {warning_timeout:?}. Next checkpoint to be scheduled: {next_to_schedule}",
                    );
                }

                _ = panic_timeout
                    .map(|d| Either::Left(tokio::time::sleep(d)))
                    .unwrap_or_else(|| Either::Right(futures::future::pending())) => {
                    panic!("No new synced checkpoints received for {panic_timeout:?} on node {:?}", self.state.name);
                },
            }
        }
    }

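    /// Advances the highest-executed-checkpoint watermark by exactly one
    /// checkpoint, pruning full checkpoint contents that are older than
    /// `NUM_SAVED_FULL_CHECKPOINT_CONTENTS` checkpoints along the way.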
    fn bump_highest_executed_checkpoint(&self, checkpoint: &VerifiedCheckpoint) {
        let seq = *checkpoint.sequence_number();
        debug!("Bumping highest_executed_checkpoint watermark to {seq:?}");
        if let Some(prev_highest) = self
            .checkpoint_store
            .get_highest_executed_checkpoint_seq_number()
            .unwrap()
        {
            assert_eq!(prev_highest + 1, seq);
        } else {
            assert_eq!(seq, 0);
        }
        if seq % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
            info!("Finished syncing and executing checkpoint {}", seq);
        }

        fail_point!("highest-executed-checkpoint");

        // Only keep a rolling window of full checkpoint contents; prune the
        // contents of the checkpoint that falls out of the window.
        const NUM_SAVED_FULL_CHECKPOINT_CONTENTS: u64 = 5_000;
        if seq >= NUM_SAVED_FULL_CHECKPOINT_CONTENTS {
            let prune_seq = seq - NUM_SAVED_FULL_CHECKPOINT_CONTENTS;
            if let Some(prune_checkpoint) = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(prune_seq)
                .expect("Failed to fetch checkpoint")
            {
                self.checkpoint_store
                    .delete_full_checkpoint_contents(prune_seq)
                    .expect("Failed to delete full checkpoint contents");
                self.checkpoint_store
                    .delete_contents_digest_sequence_number_mapping(
                        &prune_checkpoint.content_digest,
                    )
                    .expect("Failed to delete contents digest -> sequence number mapping");
            } else {
                // The checkpoint to prune may legitimately be absent, e.g. on
                // a freshly restored node, so this is not treated as an error.
                debug!(
                    "Failed to fetch checkpoint with sequence number {:?}",
                    prune_seq
                );
            }
        }

        self.checkpoint_store
            .update_highest_executed_checkpoint(checkpoint)
            .unwrap();
        self.metrics.last_executed_checkpoint.set(seq as i64);

        self.metrics
            .last_executed_checkpoint_timestamp_ms
            .set(checkpoint.timestamp_ms as i64);
        checkpoint.report_checkpoint_age_ms(&self.metrics.last_executed_checkpoint_age_ms);
    }

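    /// Post-processes a checkpoint whose transactions have all been executed:
    /// commits the transaction outputs to persistent storage, notifies the
    /// per-epoch store, and, for all but the final checkpoint of the epoch,
    /// accumulates the running root and bumps the executed watermark. The
    /// final checkpoint of an epoch is handled by
    /// `check_epoch_last_checkpoint` instead.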
    #[instrument(level = "debug", skip_all)]
    async fn process_executed_checkpoint(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        checkpoint: &VerifiedCheckpoint,
        checkpoint_acc: Option<Accumulator>,
        all_tx_digests: &[TransactionDigest],
    ) {
        // Commit all transaction effects to disk.
        let cache_commit = self.state.get_cache_commit();
        debug!(seq = ?checkpoint.sequence_number, "committing checkpoint transactions to disk");
        cache_commit
            .commit_transaction_outputs(epoch_store.epoch(), all_tx_digests)
            .await
            .expect("commit_transaction_outputs cannot fail");

        epoch_store
            .handle_committed_transactions(all_tx_digests)
            .expect("handle_committed_transactions cannot fail");

        if !checkpoint.is_last_checkpoint_of_epoch() {
            self.accumulator
                .accumulate_running_root(epoch_store, checkpoint.sequence_number, checkpoint_acc)
                .await
                .expect("Failed to accumulate running root");
            self.bump_highest_executed_checkpoint(checkpoint);
        }
    }

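    /// Schedules execution of every checkpoint that state sync has certified
    /// but that has not yet been scheduled, keeping at most
    /// `checkpoint_execution_max_concurrency` checkpoints in flight. Stops at
    /// an epoch boundary or at the `RunWithRange` checkpoint limit.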
    #[instrument(level = "debug", skip_all)]
    fn schedule_synced_checkpoints(
        &self,
        pending: &mut CheckpointExecutionBuffer,
        next_to_schedule: &mut CheckpointSequenceNumber,
        epoch_store: Arc<AuthorityPerEpochStore>,
        run_with_range: Option<RunWithRange>,
    ) {
        let Some(latest_synced_checkpoint) = self
            .checkpoint_store
            .get_highest_synced_checkpoint()
            .expect("Failed to read highest synced checkpoint")
        else {
            debug!("No checkpoints to schedule, highest synced checkpoint is None");
            return;
        };

        while *next_to_schedule <= *latest_synced_checkpoint.sequence_number()
            && pending.len() < self.config.checkpoint_execution_max_concurrency
        {
            let checkpoint = self
                .checkpoint_store
                .get_checkpoint_by_sequence_number(*next_to_schedule)
                .unwrap()
                .unwrap_or_else(|| {
                    panic!(
                        "Checkpoint sequence number {:?} does not exist in checkpoint store",
                        *next_to_schedule
                    )
                });
            if checkpoint.epoch() > epoch_store.epoch() {
                return;
            }
            match run_with_range {
                Some(RunWithRange::Checkpoint(seq)) if *next_to_schedule > seq => {
                    debug!(
                        "RunWithRange Checkpoint {} is set, not scheduling checkpoint {}",
                        seq, *next_to_schedule
                    );
                    return;
                }
                _ => {
                    self.schedule_checkpoint(checkpoint, pending, epoch_store.clone());
                    *next_to_schedule += 1;
                }
            }
        }
    }

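    /// Spawns a task that executes the given checkpoint and pushes it onto
    /// the pending buffer. The task retries execution once per second until
    /// it succeeds.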
    #[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
    fn schedule_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        pending: &mut CheckpointExecutionBuffer,
        epoch_store: Arc<AuthorityPerEpochStore>,
    ) {
        debug!("Scheduling checkpoint for execution");

        let checkpoint_epoch = checkpoint.epoch();
        assert_eq!(
            checkpoint_epoch,
            epoch_store.epoch(),
            "Epoch mismatch after startup recovery. checkpoint epoch: {:?}, node epoch: {:?}",
            checkpoint_epoch,
            epoch_store.epoch(),
        );

        let metrics = self.metrics.clone();
        let local_execution_timeout_sec = self.config.local_execution_timeout_sec;
        let data_ingestion_dir = self.config.data_ingestion_dir.clone();
        let checkpoint_store = self.checkpoint_store.clone();
        let object_cache_reader = self.object_cache_reader.clone();
        let transaction_cache_reader = self.transaction_cache_reader.clone();
        let tx_manager = self.tx_manager.clone();
        let accumulator = self.accumulator.clone();
        let state = self.state.clone();

        epoch_store.notify_synced_checkpoint(*checkpoint.sequence_number());

        pending.push_back(spawn_monitored_task!(async move {
            let epoch_store = epoch_store.clone();
            let (tx_digests, checkpoint_acc) = loop {
                match execute_checkpoint(
                    checkpoint.clone(),
                    &state,
                    object_cache_reader.as_ref(),
                    transaction_cache_reader.as_ref(),
                    checkpoint_store.clone(),
                    epoch_store.clone(),
                    tx_manager.clone(),
                    accumulator.clone(),
                    local_execution_timeout_sec,
                    &metrics,
                    data_ingestion_dir.clone(),
                )
                .await
                {
                    Err(err) => {
                        error!(
                            "Error while executing checkpoint, will retry in 1s: {:?}",
                            err
                        );
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        metrics.checkpoint_exec_errors.inc();
                    }
                    Ok((tx_digests, checkpoint_acc)) => break (tx_digests, checkpoint_acc),
                }
            };
            (checkpoint, checkpoint_acc, tx_digests)
        }));
    }

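    /// Executes the change-epoch transaction of the epoch's final checkpoint:
    /// acquires shared-object locks from its certified effects, enqueues it
    /// with the expected effects digest, and waits for its execution effects.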
    #[instrument(level = "info", skip_all)]
    async fn execute_change_epoch_tx(
        &self,
        execution_digests: ExecutionDigests,
        change_epoch_tx_digest: TransactionDigest,
        change_epoch_tx: VerifiedExecutableTransaction,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: VerifiedCheckpoint,
    ) {
        let change_epoch_fx = self
            .transaction_cache_reader
            .get_effects(&execution_digests.effects)
            .expect("Fetching effects for change_epoch tx cannot fail")
            .expect("Change_epoch tx effects must exist");

        if change_epoch_tx.contains_shared_object() {
            epoch_store
                .acquire_shared_locks_from_effects(
                    &change_epoch_tx,
                    &change_epoch_fx,
                    self.object_cache_reader.as_ref(),
                )
                .await
                .expect("Acquiring shared locks for change_epoch tx cannot fail");
        }

        self.tx_manager.enqueue_with_expected_effects_digest(
            vec![(change_epoch_tx.clone(), execution_digests.effects)],
            &epoch_store,
        );
        handle_execution_effects(
            &self.state,
            vec![execution_digests],
            vec![change_epoch_tx_digest],
            checkpoint.clone(),
            self.checkpoint_store.clone(),
            self.object_cache_reader.as_ref(),
            self.transaction_cache_reader.as_ref(),
            epoch_store.clone(),
            self.tx_manager.clone(),
            self.accumulator.clone(),
            self.config.local_execution_timeout_sec,
            self.config.data_ingestion_dir.clone(),
        )
        .await;
    }

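    /// Returns true if `checkpoint` (the highest executed checkpoint) is the
    /// last checkpoint of the current epoch. In that case this also executes
    /// the change-epoch transaction, finalizes the checkpoint, and computes
    /// the epoch accumulator, so that reconfiguration can proceed once it
    /// returns.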
    pub async fn check_epoch_last_checkpoint(
        &self,
        epoch_store: Arc<AuthorityPerEpochStore>,
        checkpoint: &Option<VerifiedCheckpoint>,
    ) -> bool {
        let cur_epoch = epoch_store.epoch();

        if let Some(checkpoint) = checkpoint {
            if checkpoint.epoch() == cur_epoch {
                if let Some((change_epoch_execution_digests, change_epoch_tx)) =
                    extract_end_of_epoch_tx(
                        checkpoint,
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        epoch_store.clone(),
                    )
                {
                    let change_epoch_tx_digest = change_epoch_execution_digests.transaction;

                    info!(
                        ended_epoch = cur_epoch,
                        last_checkpoint = checkpoint.sequence_number(),
                        "Reached end of epoch, executing change_epoch transaction",
                    );

                    self.execute_change_epoch_tx(
                        change_epoch_execution_digests,
                        change_epoch_tx_digest,
                        change_epoch_tx,
                        epoch_store.clone(),
                        checkpoint.clone(),
                    )
                    .await;

                    let cache_commit = self.state.get_cache_commit();
                    cache_commit
                        .commit_transaction_outputs(cur_epoch, &[change_epoch_tx_digest])
                        .await
                        .expect("commit_transaction_outputs cannot fail");
                    fail_point_async!("prune-and-compact");

                    // All transactions of the final checkpoint, including the
                    // change-epoch transaction, have now executed, so the
                    // checkpoint can be finalized as a whole.
                    let all_tx_digests: Vec<_> = self
                        .checkpoint_store
                        .get_checkpoint_contents(&checkpoint.content_digest)
                        .expect("read cannot fail")
                        .expect("Checkpoint contents should exist")
                        .iter()
                        .map(|digests| digests.transaction)
                        .collect();

                    let effects = self
                        .transaction_cache_reader
                        .notify_read_executed_effects(&all_tx_digests)
                        .await
                        .expect("Failed to get executed effects for finalizing checkpoint");

                    finalize_checkpoint(
                        &self.state,
                        self.object_cache_reader.as_ref(),
                        self.transaction_cache_reader.as_ref(),
                        self.checkpoint_store.clone(),
                        &all_tx_digests,
                        &epoch_store,
                        checkpoint.clone(),
                        self.accumulator.clone(),
                        effects,
                        self.config.data_ingestion_dir.clone(),
                    )
                    .await
                    .expect("Finalizing checkpoint cannot fail");

                    self.checkpoint_store
                        .insert_epoch_last_checkpoint(cur_epoch, checkpoint)
                        .expect("Failed to insert epoch last checkpoint");

                    self.accumulator
                        .accumulate_running_root(&epoch_store, checkpoint.sequence_number, None)
                        .await
                        .expect("Failed to accumulate running root");
                    self.accumulator
                        .accumulate_epoch(epoch_store.clone(), *checkpoint.sequence_number())
                        .await
                        .expect("Accumulating epoch cannot fail");

                    self.bump_highest_executed_checkpoint(checkpoint);

                    return true;
                }
            }
        }
        false
    }
}

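/// Executes a single checkpoint: loads the transactions that have not yet
/// been executed, runs them to completion, and reports any randomness rounds
/// contained in the checkpoint to the `RandomnessReporter`. Returns the
/// digests of all transactions in the checkpoint plus the checkpoint's
/// accumulator (None for the final checkpoint of an epoch).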
#[instrument(level = "debug", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn execute_checkpoint(
    checkpoint: VerifiedCheckpoint,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    metrics: &Arc<CheckpointExecutorMetrics>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<(Vec<TransactionDigest>, Option<Accumulator>)> {
    debug!("Preparing checkpoint for execution");
    let prepare_start = Instant::now();

    // Filter out any already-executed transactions, returning the executable
    // transactions with their expected effects digests plus the randomness
    // rounds contained in the checkpoint.
    let (execution_digests, all_tx_digests, executable_txns, randomness_rounds) =
        get_unexecuted_transactions(
            checkpoint.clone(),
            transaction_cache_reader,
            checkpoint_store.clone(),
            epoch_store.clone(),
        );

    let tx_count = execution_digests.len();
    debug!("Number of transactions in the checkpoint: {:?}", tx_count);
    metrics.checkpoint_transaction_count.report(tx_count as u64);

    let checkpoint_acc = execute_transactions(
        execution_digests,
        all_tx_digests.clone(),
        executable_txns,
        state,
        object_cache_reader,
        transaction_cache_reader,
        checkpoint_store.clone(),
        epoch_store.clone(),
        transaction_manager,
        accumulator,
        local_execution_timeout_sec,
        checkpoint,
        metrics,
        prepare_start,
        data_ingestion_dir,
    )
    .await?;

    // Once execution is complete, we know any randomness contained in this
    // checkpoint has been included in a certified checkpoint, so it can be
    // reported as executed.
    if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
        for round in randomness_rounds {
            debug!(
                ?round,
                "notifying RandomnessReporter that randomness update was executed in checkpoint"
            );
            randomness_reporter.notify_randomness_in_checkpoint(round)?;
        }
    }

    Ok((all_tx_digests, checkpoint_acc))
}

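/// Waits for the effects of all the given transactions, periodically logging
/// a warning (including the first stuck transaction's missing input objects)
/// if execution stalls. Once all effects are available, asserts that none of
/// them forked from the certified digests and finalizes the checkpoint,
/// returning its accumulator. Returns None for an end-of-epoch checkpoint,
/// which is finalized by `check_epoch_last_checkpoint` instead.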
#[instrument(level = "error", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn handle_execution_effects(
    state: &AuthorityState,
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    checkpoint: VerifiedCheckpoint,
    checkpoint_store: Arc<CheckpointStore>,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    data_ingestion_dir: Option<PathBuf>,
) -> Option<Accumulator> {
    let mut periods = 1;
    let log_timeout_sec = Duration::from_secs(local_execution_timeout_sec);
    // Whether this checkpoint is next to execute and is blocking further ones.
    let mut blocking_execution = false;
    loop {
        let effects_future = transaction_cache_reader.notify_read_executed_effects(&all_tx_digests);

        match timeout(log_timeout_sec, effects_future).await {
            Err(_elapsed) => {
                let highest_seq = checkpoint_store
                    .get_highest_executed_checkpoint_seq_number()
                    .unwrap()
                    .unwrap_or_default();
                if checkpoint.sequence_number <= highest_seq {
                    error!(
                        "Re-executing checkpoint {} after higher checkpoint {} has executed!",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if checkpoint.sequence_number > highest_seq + 1 {
                    trace!(
                        "Checkpoint {} is still executing. Highest executed = {}",
                        checkpoint.sequence_number, highest_seq
                    );
                    continue;
                }
                if !blocking_execution {
                    trace!(
                        "Checkpoint {} is next to execute.",
                        checkpoint.sequence_number
                    );
                    blocking_execution = true;
                    continue;
                }

                // Only log details when this checkpoint is next to execute
                // but has still not finished within a timeout period.
                let missing_digests: Vec<TransactionDigest> = transaction_cache_reader
                    .multi_get_executed_effects_digests(&all_tx_digests)
                    .expect("multi_get_executed_effects_digests cannot fail")
                    .iter()
                    .zip(all_tx_digests.clone())
                    .filter_map(
                        |(fx, digest)| {
                            if fx.is_none() { Some(digest) } else { None }
                        },
                    )
                    .collect();

                if missing_digests.is_empty() {
                    // All effects just became available; loop back to read them.
                    continue;
                }

                warn!(
                    "Transaction effects for checkpoint tx digests {:?} not present within {:?}.",
                    missing_digests,
                    log_timeout_sec * periods,
                );

                // Log the first missing transaction's input objects to help debugging.
                let pending_digest = missing_digests.first().unwrap();
                if let Some(missing_input) = transaction_manager.get_missing_input(pending_digest) {
                    warn!(
                        "Transaction {pending_digest:?} has missing input objects {missing_input:?}",
                    );
                }
                periods += 1;
            }
            Ok(Err(err)) => panic!("Failed to notify_read_executed_effects: {:?}", err),
            Ok(Ok(effects)) => {
                for (tx_digest, expected_digest, actual_effects) in
                    izip!(&all_tx_digests, &execution_digests, &effects)
                {
                    let expected_effects_digest = &expected_digest.effects;
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        expected_effects_digest,
                        &actual_effects.digest(),
                        transaction_cache_reader,
                    );
                }

                // The epoch's final checkpoint is finalized separately in
                // check_epoch_last_checkpoint.
                if checkpoint.end_of_epoch_data.is_none() {
                    return Some(
                        finalize_checkpoint(
                            state,
                            object_cache_reader,
                            transaction_cache_reader,
                            checkpoint_store.clone(),
                            &all_tx_digests,
                            &epoch_store,
                            checkpoint.clone(),
                            accumulator.clone(),
                            effects,
                            data_ingestion_dir,
                        )
                        .await
                        .expect("Finalizing checkpoint cannot fail"),
                    );
                } else {
                    return None;
                }
            }
        }
    }
}

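/// Panics if the effects digest produced by local execution differs from the
/// effects digest certified in the checkpoint, i.e. if this node has forked.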
fn assert_not_forked(
    checkpoint: &VerifiedCheckpoint,
    tx_digest: &TransactionDigest,
    expected_digest: &TransactionEffectsDigest,
    actual_effects_digest: &TransactionEffectsDigest,
    cache_reader: &dyn TransactionCacheRead,
) {
    if *expected_digest != *actual_effects_digest {
        let actual_effects = cache_reader
            .get_executed_effects(tx_digest)
            .expect("get_executed_effects cannot fail")
            .expect("actual effects should exist");

        error!(
            ?checkpoint,
            ?tx_digest,
            ?expected_digest,
            ?actual_effects,
            "fork detected!"
        );
        panic!(
            "When executing checkpoint {}, transaction {} \
             is expected to have effects digest {}, but got {}!",
            checkpoint.sequence_number(),
            tx_digest,
            expected_digest,
            actual_effects_digest,
        );
    }
}

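/// For an end-of-epoch checkpoint, returns the execution digests and the
/// verified executable change-epoch transaction, which is always the last
/// transaction of the checkpoint. Returns None for ordinary checkpoints.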
fn extract_end_of_epoch_tx(
    checkpoint: &VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> Option<(ExecutionDigests, VerifiedExecutableTransaction)> {
    checkpoint.end_of_epoch_data.as_ref()?;

    // The change-epoch transaction is the last transaction of the epoch's
    // final checkpoint.
    let checkpoint_sequence = checkpoint.sequence_number();
    let execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

    let digests = execution_digests
        .last()
        .expect("Final checkpoint must have at least one transaction");

    let change_epoch_tx = cache_reader
        .get_transaction_block(&digests.transaction)
        .expect("read cannot fail");

    let change_epoch_tx = VerifiedExecutableTransaction::new_from_checkpoint(
        (*change_epoch_tx.unwrap_or_else(|| {
            panic!(
                "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
                digests.transaction,
            )
        }))
        .clone(),
        epoch_store.epoch(),
        *checkpoint_sequence,
    );

    assert!(
        change_epoch_tx
            .data()
            .intent_message()
            .value
            .is_end_of_epoch_tx()
    );

    Some((*digests, change_epoch_tx))
}

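/// Loads the checkpoint's contents and splits its transactions into those
/// already executed (which are checked for forks) and those still to
/// execute. Returns the execution digests, all transaction digests, the
/// executable transactions paired with their expected effects digests, and
/// the randomness rounds contained in the checkpoint. The change-epoch
/// transaction, if present, is excluded from normal scheduling.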
#[expect(clippy::type_complexity)]
fn get_unexecuted_transactions(
    checkpoint: VerifiedCheckpoint,
    cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
) -> (
    Vec<ExecutionDigests>,
    Vec<TransactionDigest>,
    Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    Vec<RandomnessRound>,
) {
    let checkpoint_sequence = checkpoint.sequence_number();
    let full_contents = checkpoint_store
        .get_full_checkpoint_contents_by_sequence_number(*checkpoint_sequence)
        .expect("Failed to get checkpoint contents from store")
        .tap_some(|_| {
            debug!("loaded full checkpoint contents in bulk for sequence {checkpoint_sequence}")
        });

    let mut execution_digests = checkpoint_store
        .get_checkpoint_contents(&checkpoint.content_digest)
        .expect("Failed to get checkpoint contents from store")
        .unwrap_or_else(|| {
            panic!(
                "Checkpoint contents for digest {:?} does not exist",
                checkpoint.content_digest
            )
        })
        .into_inner();

    let full_contents_txns = full_contents.map(|c| {
        c.into_iter()
            .zip(execution_digests.iter())
            .map(|(txn, digests)| (digests.transaction, txn))
            .collect::<HashMap<_, _>>()
    });

    // Remove the change-epoch transaction so that we can special-case its
    // execution.
    checkpoint.end_of_epoch_data.as_ref().tap_some(|_| {
        let change_epoch_tx_digest = execution_digests
            .pop()
            .expect("Final checkpoint must have at least one transaction")
            .transaction;

        let change_epoch_tx = cache_reader
            .get_transaction_block(&change_epoch_tx_digest)
            .expect("read cannot fail")
            .unwrap_or_else(||
                panic!(
                    "state-sync should have ensured that transaction with digest {change_epoch_tx_digest:?} exists for checkpoint: {}",
                    checkpoint.sequence_number()
                )
            );
        assert!(change_epoch_tx.data().intent_message().value.is_end_of_epoch_tx());
    });

    // Randomness rounds are carried in the checkpoint summary's
    // version-specific data when available.
    let randomness_rounds = if let Some(version_specific_data) = checkpoint
        .version_specific_data(epoch_store.protocol_config())
        .expect("unable to get version_specific_data")
    {
        version_specific_data.into_v1().randomness_rounds
    } else {
        // No version-specific data: fall back to checking whether the first
        // transaction of the checkpoint is a randomness state update. The
        // assert confirms that checkpoints are not batched under this
        // protocol config.
        assert_eq!(
            0,
            epoch_store
                .protocol_config()
                .min_checkpoint_interval_ms_as_option()
                .unwrap_or_default(),
        );
        if let Some(first_digest) = execution_digests.first() {
            let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
                .expect("read cannot fail")
                .unwrap_or_else(||
                    panic!(
                        "state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
                        checkpoint.sequence_number()
                    )
                );
            if let TransactionKind::RandomnessStateUpdate(rsu) =
                maybe_randomness_tx.data().transaction_data().kind()
            {
                vec![rsu.randomness_round]
            } else {
                Vec::new()
            }
        } else {
            Vec::new()
        }
    };

    let all_tx_digests: Vec<TransactionDigest> =
        execution_digests.iter().map(|tx| tx.transaction).collect();

    let executed_effects_digests = cache_reader
        .multi_get_executed_effects_digests(&all_tx_digests)
        .expect("failed to read executed_effects from store");

    let (unexecuted_txns, expected_effects_digests): (Vec<_>, Vec<_>) =
        izip!(execution_digests.iter(), executed_effects_digests.iter())
            .filter_map(|(digests, effects_digest)| match effects_digest {
                None => Some((digests.transaction, digests.effects)),
                Some(actual_effects_digest) => {
                    let tx_digest = &digests.transaction;
                    let effects_digest = &digests.effects;
                    trace!(
                        "Transaction with digest {:?} has already been executed",
                        tx_digest
                    );
                    assert_not_forked(
                        &checkpoint,
                        tx_digest,
                        effects_digest,
                        actual_effects_digest,
                        cache_reader,
                    );
                    None
                }
            })
            .unzip();

    // Read the unexecuted transactions from the cache unless they were
    // already loaded in bulk via the full checkpoint contents.
    let executable_txns: Vec<_> = if let Some(full_contents_txns) = full_contents_txns {
        unexecuted_txns
            .into_iter()
            .zip(expected_effects_digests)
            .map(|(tx_digest, expected_effects_digest)| {
                let tx = &full_contents_txns.get(&tx_digest).unwrap().transaction;
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        VerifiedTransaction::new_unchecked(tx.clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest,
                )
            })
            .collect()
    } else {
        cache_reader
            .multi_get_transaction_blocks(&unexecuted_txns)
            .expect("Failed to get checkpoint txes from store")
            .into_iter()
            .zip(expected_effects_digests)
            .enumerate()
            .map(|(i, (tx, expected_effects_digest))| {
                let tx = tx.unwrap_or_else(||
                    panic!(
                        "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}",
                        unexecuted_txns[i]
                    )
                );
                // The change-epoch transaction was filtered out above.
                assert!(!tx.data().intent_message().value.is_end_of_epoch_tx());
                (
                    VerifiedExecutableTransaction::new_from_checkpoint(
                        Arc::try_unwrap(tx).unwrap_or_else(|tx| (*tx).clone()),
                        epoch_store.epoch(),
                        *checkpoint_sequence,
                    ),
                    expected_effects_digest,
                )
            })
            .collect()
    };

    (
        execution_digests,
        all_tx_digests,
        executable_txns,
        randomness_rounds,
    )
}

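/// Enqueues the given transactions with their expected effects digests,
/// acquiring shared-object locks from the certified effects first, and waits
/// for execution to complete, reporting prepare and execution latencies.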
#[instrument(level = "debug", skip_all)]
async fn execute_transactions(
    execution_digests: Vec<ExecutionDigests>,
    all_tx_digests: Vec<TransactionDigest>,
    executable_txns: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    transaction_manager: Arc<TransactionManager>,
    state_accumulator: Arc<StateAccumulator>,
    local_execution_timeout_sec: u64,
    checkpoint: VerifiedCheckpoint,
    metrics: &Arc<CheckpointExecutorMetrics>,
    prepare_start: Instant,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<Option<Accumulator>> {
    let effects_digests: HashMap<_, _> = execution_digests
        .iter()
        .map(|digest| (digest.transaction, digest.effects))
        .collect();

    let shared_effects_digests = executable_txns
        .iter()
        .filter(|(tx, _)| tx.contains_shared_object())
        .map(|(tx, _)| {
            *effects_digests
                .get(tx.digest())
                .expect("Transaction digest not found in effects_digests")
        })
        .collect::<Vec<_>>();

    let digest_to_effects: HashMap<TransactionDigest, TransactionEffects> =
        transaction_cache_reader
            .multi_get_effects(&shared_effects_digests)?
            .into_iter()
            .zip(shared_effects_digests)
            .map(|(fx, fx_digest)| {
                if fx.is_none() {
                    panic!(
                        "Transaction effects for effects digest {:?} do not exist in effects table",
                        fx_digest
                    );
                }
                let fx = fx.unwrap();
                (*fx.transaction_digest(), fx)
            })
            .collect();

    for (tx, _) in &executable_txns {
        if tx.contains_shared_object() {
            epoch_store
                .acquire_shared_locks_from_effects(
                    tx,
                    digest_to_effects.get(tx.digest()).unwrap(),
                    object_cache_reader,
                )
                .await?;
        }
    }

    let prepare_elapsed = prepare_start.elapsed();
    metrics
        .checkpoint_prepare_latency_us
        .report(prepare_elapsed.as_micros() as u64);
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!(
            "Checkpoint preparation for execution took {:?}",
            prepare_elapsed
        );
    }

    let exec_start = Instant::now();
    transaction_manager.enqueue_with_expected_effects_digest(executable_txns.clone(), &epoch_store);

    let checkpoint_acc = handle_execution_effects(
        state,
        execution_digests,
        all_tx_digests,
        checkpoint.clone(),
        checkpoint_store,
        object_cache_reader,
        transaction_cache_reader,
        epoch_store,
        transaction_manager,
        state_accumulator,
        local_execution_timeout_sec,
        data_ingestion_dir,
    )
    .await;

    let exec_elapsed = exec_start.elapsed();
    metrics
        .checkpoint_exec_latency_us
        .report(exec_elapsed.as_micros() as u64);
    if checkpoint.sequence_number % CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL == 0 {
        info!("Checkpoint execution took {:?}", exec_elapsed);
    }

    Ok(checkpoint_acc)
}

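/// Marks the checkpoint's transactions as finalized, accumulates the
/// checkpoint into the state accumulator, and, when configured, builds the
/// per-checkpoint data used by the REST index and/or local data ingestion.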
#[instrument(level = "info", skip_all, fields(seq = ?checkpoint.sequence_number(), epoch = ?epoch_store.epoch()))]
async fn finalize_checkpoint(
    state: &AuthorityState,
    object_cache_reader: &dyn ObjectCacheRead,
    transaction_cache_reader: &dyn TransactionCacheRead,
    checkpoint_store: Arc<CheckpointStore>,
    tx_digests: &[TransactionDigest],
    epoch_store: &Arc<AuthorityPerEpochStore>,
    checkpoint: VerifiedCheckpoint,
    accumulator: Arc<StateAccumulator>,
    effects: Vec<TransactionEffects>,
    data_ingestion_dir: Option<PathBuf>,
) -> IotaResult<Accumulator> {
    debug!("finalizing checkpoint");
    epoch_store.insert_finalized_transactions(tx_digests, checkpoint.sequence_number)?;

    state
        .get_checkpoint_cache()
        .insert_finalized_transactions_perpetual_checkpoints(
            tx_digests,
            epoch_store.epoch(),
            checkpoint.sequence_number,
        )?;

    let checkpoint_acc =
        accumulator.accumulate_checkpoint(effects, checkpoint.sequence_number, epoch_store)?;

    if data_ingestion_dir.is_some() || state.rest_index.is_some() {
        let checkpoint_data = load_checkpoint_data(
            checkpoint,
            object_cache_reader,
            transaction_cache_reader,
            checkpoint_store,
            tx_digests,
        )?;

        if let Some(rest_index) = &state.rest_index {
            let mut layout_resolver = epoch_store.executor().type_layout_resolver(Box::new(
                PackageStoreWithFallback::new(state.get_backing_package_store(), &checkpoint_data),
            ));

            rest_index.index_checkpoint(&checkpoint_data, layout_resolver.as_mut())?;
        }

        if let Some(path) = data_ingestion_dir {
            store_checkpoint_locally(path, &checkpoint_data)?;
        }
    }
    Ok(checkpoint_acc)
}