iota_core/checkpoints/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

mod causal_order;
pub mod checkpoint_executor;
mod checkpoint_output;
mod metrics;

use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    fs::File,
    io::Write,
    path::Path,
    sync::{Arc, Weak},
    time::Duration,
};

use chrono::Utc;
use diffy::create_patch;
use iota_common::{debug_fatal, fatal};
use iota_macros::fail_point;
use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
use iota_network::default_iota_network_config;
use iota_protocol_config::ProtocolVersion;
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
    committee::StakeUnit,
    crypto::AuthorityStrongQuorumSignInfo,
    digests::{CheckpointContentsDigest, CheckpointDigest},
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::{IotaError, IotaResult},
    event::SystemEpochInfoEvent,
    executable_transaction::VerifiedExecutableTransaction,
    gas::GasCostSummary,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::EpochStartSystemStateTrait,
    },
    message_envelope::Message,
    messages_checkpoint::{
        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
        VerifiedCheckpointContents,
    },
    messages_consensus::ConsensusTransactionKey,
    signature::GenericSignature,
    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
};
use itertools::Itertools;
use nonempty::NonEmpty;
use parking_lot::Mutex;
use rand::{rngs::OsRng, seq::SliceRandom};
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{Notify, watch},
    task::JoinSet,
    time::timeout,
};
use tracing::{debug, error, info, instrument, trace, warn};
use typed_store::{
    DBMapUtils, Map, TypedStoreError,
    rocks::{DBMap, MetricConf},
    traits::{TableSummary, TypedStoreDebug},
};

pub use crate::checkpoints::{
    checkpoint_output::{
        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
    },
    metrics::CheckpointMetrics,
};
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
    checkpoints::{
        causal_order::CausalOrder,
        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
    },
    consensus_handler::SequencedConsensusTransactionKey,
    execution_cache::TransactionCacheRead,
    stake_aggregator::{InsertResult, MultiStakeAggregator},
    state_accumulator::StateAccumulator,
};

pub type CheckpointHeight = u64;

pub struct EpochStats {
    pub checkpoint_count: u64,
    pub transaction_count: u64,
    pub total_gas_reward: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
    pub timestamp_ms: CheckpointTimestamp,
    pub last_of_epoch: bool,
    pub checkpoint_height: CheckpointHeight,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PendingCheckpoint {
    // This is an enum for future upgradability, though at the moment there is only one variant.
    V1(PendingCheckpointContentsV1),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointContentsV1 {
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}

impl PendingCheckpoint {
    pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
        match self {
            PendingCheckpoint::V1(contents) => contents,
        }
    }

    pub fn into_v1(self) -> PendingCheckpointContentsV1 {
        match self {
            PendingCheckpoint::V1(contents) => contents,
        }
    }

    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.as_v1().roots
    }

    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.as_v1().details
    }

    pub fn height(&self) -> CheckpointHeight {
        self.details().checkpoint_height
    }
}
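
// A hedged sketch (not part of the current code) of how the upgradability
// pattern above would be used: a hypothetical future `V2` variant would be
// added alongside `V1`, and callers would go through the version-agnostic
// accessors (`roots()`, `details()`) rather than `as_v1`/`into_v1`, e.g.:
//
//     pub enum PendingCheckpoint {
//         V1(PendingCheckpointContentsV1),
//         V2(PendingCheckpointContentsV2), // hypothetical future format
//     }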

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    pub position_in_commit: usize,
}

#[derive(DBMapUtils)]
pub struct CheckpointStore {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence
    /// number, for efficient reads of full checkpoints. Entries from this
    /// table are deleted after state accumulation has completed.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks
    /// and log useful information. Can be pruned as soon as we verify that
    /// we are in agreement with the latest certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in
    /// that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}

impl CheckpointStore {
    pub fn new(path: &Path) -> Arc<Self> {
        Arc::new(Self::open_tables_read_write(
            path.to_path_buf(),
            MetricConf::new("checkpoint"),
            None,
            None,
        ))
    }
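
    // A minimal usage sketch (illustrative only; the store path below is a
    // placeholder): open the store and read the execution watermark using the
    // methods defined in this impl block.
    //
    //     let store = CheckpointStore::new(Path::new("/tmp/checkpoint-db"));
    //     let highest_executed = store
    //         .get_highest_executed_checkpoint_seq_number()
    //         .expect("db read should not fail");
    //     // `None` means no checkpoint has been executed yet.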

    pub fn open_readonly(path: &Path) -> CheckpointStoreReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }

    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // Only insert the genesis checkpoint if the DB is empty and doesn't have it
        // already
        if self
            .get_checkpoint_by_digest(checkpoint.digest())
            .unwrap()
            .is_none()
        {
            if epoch_store.epoch() == checkpoint.epoch {
                epoch_store
                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                    .unwrap();
            } else {
                debug!(
                    validator_epoch =% epoch_store.epoch(),
                    genesis_epoch =% checkpoint.epoch(),
                    "Not inserting checkpoint builder data for genesis checkpoint",
                );
            }
            self.insert_checkpoint_contents(contents).unwrap();
            self.insert_verified_checkpoint(&checkpoint).unwrap();
            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
        }
    }

    pub fn get_checkpoint_by_digest(
        &self,
        digest: &CheckpointDigest,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.checkpoint_by_digest
            .get(digest)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.certified_checkpoints
            .get(&sequence_number)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.locally_computed_checkpoints.get(&sequence_number)
    }

    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.checkpoint_sequence_by_contents_digest.get(digest)
    }

    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.checkpoint_sequence_by_contents_digest.remove(digest)
    }

    pub fn get_latest_certified_checkpoint(&self) -> Option<VerifiedCheckpoint> {
        self.certified_checkpoints
            .unbounded_iter()
            .skip_to_last()
            .next()
            .map(|(_, v)| v.into())
    }

    pub fn get_latest_locally_computed_checkpoint(&self) -> Option<CheckpointSummary> {
        self.locally_computed_checkpoints
            .unbounded_iter()
            .skip_to_last()
            .next()
            .map(|(_, v)| v)
    }

    pub fn multi_get_checkpoint_by_sequence_number(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
        let checkpoints = self
            .certified_checkpoints
            .multi_get(sequence_numbers)?
            .into_iter()
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
            .collect();

        Ok(checkpoints)
    }

    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.checkpoint_content.multi_get(contents_digest)
    }

    pub fn get_highest_verified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_verified = if let Some(highest_verified) =
            self.watermarks.get(&CheckpointWatermark::HighestVerified)?
        {
            highest_verified
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_verified.1)
    }

    pub fn get_highest_synced_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_synced = if let Some(highest_synced) =
            self.watermarks.get(&CheckpointWatermark::HighestSynced)?
        {
            highest_synced
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_synced.1)
    }

    pub fn get_highest_executed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_executed) =
            self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
        {
            Ok(Some(highest_executed.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_executed = if let Some(highest_executed) =
            self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
        {
            highest_executed
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_executed.1)
    }

    pub fn get_highest_pruned_checkpoint_seq_number(
        &self,
    ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
        Ok(self
            .watermarks
            .get(&CheckpointWatermark::HighestPruned)?
            .unwrap_or_default()
            .0)
    }

    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.checkpoint_content.get(digest)
    }

    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
        self.full_checkpoint_content.get(&seq)
    }

    fn prune_local_summaries(&self) -> IotaResult {
        if let Some((last_local_summary, _)) = self
            .locally_computed_checkpoints
            .unbounded_iter()
            .skip_to_last()
            .next()
        {
            let mut batch = self.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }

    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // Checkpoint contents may be too large for the panic message.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );
            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }

    // Called by consensus (ConsensusAggregator).
    // Unlike `insert_verified_checkpoint`, this does not touch the
    // highest_verified_checkpoint watermark, so that state sync still has a
    // chance to process this checkpoint and perform state-sync-only work.
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        if let Some(local_checkpoint) = self
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }
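
    // Illustrative call-path summary (a sketch using only the methods defined
    // here, not additional API): the consensus/aggregation path stops at
    // `insert_certified_checkpoint`, while state sync uses
    // `insert_verified_checkpoint`, which performs the same insert and then
    // bumps the `HighestVerified` watermark:
    //
    //     store.insert_certified_checkpoint(&checkpoint)?; // consensus path
    //     store.insert_verified_checkpoint(&checkpoint)?;  // state-sync path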

    // Called by state sync, apart from inserting the checkpoint and updating
    // related tables, it also bumps the highest_verified_checkpoint watermark.
    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }

    pub fn update_highest_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if Some(*checkpoint.sequence_number())
            > self
                .get_highest_verified_checkpoint()?
                .map(|x| *x.sequence_number())
        {
            debug!(
                checkpoint_seq = checkpoint.sequence_number(),
                "Updating highest verified checkpoint",
            );
            self.watermarks.insert(
                &CheckpointWatermark::HighestVerified,
                &(*checkpoint.sequence_number(), *checkpoint.digest()),
            )?;
        }

        Ok(())
    }

    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Updating highest synced checkpoint",
        );
        self.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Updating highest executed checkpoint",
        );
        self.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    /// Sets highest executed checkpoint to any value.
    ///
    /// WARNING: This method is very subtle and can corrupt the database if used
    /// incorrectly. It should only be used in one-off cases or tests after
    /// fully understanding the risk.
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn insert_checkpoint_contents(
        &self,
        contents: CheckpointContents,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = ?contents.digest(),
            "Inserting checkpoint contents",
        );
        self.checkpoint_content.insert(contents.digest(), &contents)
    }

    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.full_checkpoint_content.batch();
        batch.insert_batch(
            &self.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.full_checkpoint_content,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(&self.checkpoint_content, [(contents.digest(), &contents)])?;

        batch.write()
    }

    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.full_checkpoint_content.remove(&seq)
    }

    pub fn get_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
    ) -> IotaResult<Option<VerifiedCheckpoint>> {
        let seq = self.epoch_last_checkpoint_map.get(&epoch_id)?;
        let checkpoint = match seq {
            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
            None => None,
        };
        Ok(checkpoint)
    }

    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> IotaResult {
        self.epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }

    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }

    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few
    /// statistics of the epoch.
    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }
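
    // Worked example of the arithmetic above (made-up numbers): if epoch
    // `e - 1` ended at sequence 100 with `network_total_transactions = 5_000`,
    // and the last checkpoint of epoch `e` is sequence 180 with
    // `network_total_transactions = 7_200`, then the epoch spans checkpoints
    // 101..=180, so `checkpoint_count = 180 - 101 + 1 = 80` and
    // `transaction_count = 7_200 - 5_000 = 2_200`.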

    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
        // This checkpoints the entire db and not one column family
        self.checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }

    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
        let mut wb = self.watermarks.batch();
        wb.delete_batch(
            &self.watermarks,
            std::iter::once(CheckpointWatermark::HighestExecuted),
        )?;
        wb.write()?;
        Ok(())
    }

    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
        self.delete_highest_executed_checkpoint_test_only()?;
        self.watermarks.rocksdb.flush()?;
        Ok(())
    }

    /// Re-executes all transactions from all local, uncertified checkpoints for
    /// crash recovery. All transactions thus re-executed are guaranteed to
    /// not have any missing dependencies, because we start from the highest
    /// executed checkpoint, and proceed through checkpoints in order.
    #[instrument(level = "debug", skip_all)]
    pub async fn reexecute_local_checkpoints(
        &self,
        state: &AuthorityState,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        info!("re-executing locally computed checkpoints for crash recovery");
        let epoch = epoch_store.epoch();
        let highest_executed = self
            .get_highest_executed_checkpoint_seq_number()
            .expect("get_highest_executed_checkpoint_seq_number should not fail")
            .unwrap_or(0);

        let Some(highest_built) = self.get_latest_locally_computed_checkpoint() else {
            info!("no locally built checkpoints to verify");
            return;
        };

        for seq in highest_executed + 1..=*highest_built.sequence_number() {
            info!(?seq, "Re-executing locally computed checkpoint");
            let Some(checkpoint) = self
                .get_locally_computed_checkpoint(seq)
                .expect("get_locally_computed_checkpoint should not fail")
            else {
                panic!("locally computed checkpoint {seq:?} not found");
            };

            let Some(contents) = self
                .get_checkpoint_contents(&checkpoint.content_digest)
                .expect("get_checkpoint_contents should not fail")
            else {
                panic!(
                    "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
                    seq, checkpoint.content_digest
                );
            };

            let cache = state.get_transaction_cache_reader();

            let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
            let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
            let txns = cache
                .multi_get_transaction_blocks(&tx_digests)
                .expect("multi_get_transaction_blocks should not fail");
            for (tx, digest) in txns.iter().zip(tx_digests.iter()) {
                if tx.is_none() {
                    panic!("transaction {digest:?} not found");
                }
            }

            let txns: Vec<_> = txns
                .into_iter()
                .map(|tx| tx.unwrap())
                .zip(fx_digests.into_iter())
                // end of epoch transaction can only be executed by CheckpointExecutor
                .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
                .map(|(tx, fx)| {
                    (
                        VerifiedExecutableTransaction::new_from_checkpoint(
                            (*tx).clone(),
                            epoch,
                            seq,
                        ),
                        fx,
                    )
                })
                .collect();

            let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();

            info!(
                ?seq,
                ?tx_digests,
                "Re-executing transactions for locally built checkpoint"
            );
            // this will panic if any re-execution diverges from the previously recorded
            // effects digest
            state.enqueue_with_expected_effects_digest(txns, epoch_store);

            // a task that logs every so often until it is cancelled
            // This should normally finish very quickly, so seeing this log more than once
            // or twice is likely a sign of a problem.
            let waiting_logger = tokio::task::spawn(async move {
                let mut interval = tokio::time::interval(Duration::from_secs(1));
                loop {
                    interval.tick().await;
                    warn!(?seq, "Still waiting for re-execution to complete");
                }
            });

            cache
                .notify_read_executed_effects_digests(&tx_digests)
                .await
                .expect("notify_read_executed_effects_digests should not fail");

            waiting_logger.abort();
            waiting_logger.await.ok();
            info!(?seq, "Re-execution completed for locally built checkpoint");
        }

        info!("Re-execution of locally built checkpoints completed");
    }
}

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
}

pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    accumulator: Weak<StateAccumulator>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}

pub struct CheckpointAggregator {
    tables: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

// This holds information to aggregate signatures for one checkpoint
pub struct CheckpointSignatureAggregator {
    next_index: u64,
    summary: CheckpointSummary,
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    tables: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

impl CheckpointBuilder {
    fn new(
        state: Arc<AuthorityState>,
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        accumulator: Weak<StateAccumulator>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            tables,
            epoch_store,
            notify,
            effects_store,
            accumulator,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }

    /// Runs the `CheckpointBuilder` in an asynchronous loop, managing the
    /// creation of checkpoints.
    async fn run(mut self) {
        info!("Starting CheckpointBuilder");
        loop {
            self.maybe_build_checkpoints().await;

            self.notify.notified().await;
        }
    }

    async fn maybe_build_checkpoints(&mut self) {
        let _scope = monitored_scope("BuildCheckpoints");

        // Collect info about the most recently built checkpoint.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            // Group PendingCheckpoints until:
            // - minimum interval has elapsed ...
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                    Some(last_timestamp) => {
                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                    }
                    None => true,
                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
                //   should be written separately) ...
                } || checkpoints_iter
                    .peek()
                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                // - or, we have reached end of epoch.
                    || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            // Min interval has elapsed, we can now coalesce and build a checkpoint.
            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height = height,
                "Making checkpoint at commit height"
            );

            match self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await
            {
                Ok(seq) => {
                    self.last_built.send_if_modified(|cur| {
                        if seq > *cur {
                            *cur = seq;
                            true
                        } else {
                            false
                        }
                    });
                }
                Err(e) => {
                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    return;
                }
            }
            // Ensure that the task can be cancelled at end of epoch, even if no other await
            // yields execution.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );
    }
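
    // Worked grouping example (made-up numbers): with
    // `min_checkpoint_interval_ms = 200` and a last built checkpoint at
    // t = 1_000, pending checkpoints at t = 1_050 and t = 1_150 are only
    // accumulated into `grouped_pending_checkpoints`; the first pending
    // checkpoint at t >= 1_200 (or one flagged `last_of_epoch`, or the one
    // directly preceding it) makes `can_build` true and triggers
    // `make_checkpoint` for the whole group.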

    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &self,
        pendings: Vec<PendingCheckpoint>,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        let last_details = pendings.last().unwrap().details().clone();

        // Keeps track of the effects that are already included in the current
        // checkpoint. This is used when multiple pending checkpoints are
        // combined into a single checkpoint, because in such scenarios the
        // dependencies of a transaction may be in earlier created checkpoints
        // or in earlier pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        // Stores the transactions that should be included in the checkpoint.
        // Transactions will be recorded in the checkpoint in this order.
        let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
        for pending_checkpoint in pendings.into_iter() {
            let pending = pending_checkpoint.into_v1();
            let txn_in_checkpoint = self
                .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
                .await?;
            sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
        }
        let new_checkpoints = self
            .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        Ok(highest_sequence)
    }

    // Given the root transactions of a pending checkpoint, resolve the
    // transactions that should be included in the checkpoint, and return them
    // in the order in which they should appear. `effects_in_current_checkpoint`
    // tracks the transactions that already exist in the current checkpoint.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        roots: Vec<TransactionKey>,
        effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        self.metrics
            .checkpoint_roots_count
            .inc_by(roots.len() as u64);

        let root_digests = self
            .epoch_store
            .notify_read_executed_digests(&roots)
            .in_monitored_scope("CheckpointNotifyDigests")
            .await?;
        let root_effects = self
            .effects_store
            .notify_read_executed_effects(&root_digests)
            .in_monitored_scope("CheckpointNotifyRead")
            .await?;

        let _scope = monitored_scope("CheckpointBuilder");

        let consensus_commit_prologue = {
            // If the roots contain a consensus commit prologue transaction, we want to
            // extract it and put it at the front of the checkpoint.

            let consensus_commit_prologue = self
                .extract_consensus_commit_prologue(&root_digests, &root_effects)
                .await?;

            // Get the un-included dependencies of the consensus commit prologue. We should
            // expect no other dependencies that haven't been included in any
            // previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been
                // included in any previous checkpoint.
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }
            consensus_commit_prologue
        };

        let unsorted =
            self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;

        let _scope = monitored_scope("CheckpointBuilder::causal_sort");
        let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
        if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
            #[cfg(debug_assertions)]
            {
                // When consensus_commit_prologue is extracted, it should not be included in the
                // `unsorted`.
                for tx in unsorted.iter() {
                    assert!(tx.transaction_digest() != &_ccp_digest);
                }
            }
            sorted.push(ccp_effects);
        }
        sorted.extend(CausalOrder::causal_sort(unsorted));

        #[cfg(msim)]
        {
            // Check consensus commit prologue invariants in sim test.
            self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
        }

        Ok(sorted)
    }

    // This function is used to extract the consensus commit prologue digest and
    // effects from the root transactions.
    // The consensus commit prologue is expected to be the first transaction in the
    // roots.
    async fn extract_consensus_commit_prologue(
        &self,
        root_digests: &[TransactionDigest],
        root_effects: &[TransactionEffects],
    ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
        if root_digests.is_empty() {
            return Ok(None);
        }

        // Reads the first transaction in the roots, and checks whether it is a
        // consensus commit prologue transaction.
        // The consensus commit prologue transaction should be the first transaction
        // in the roots written by the consensus handler.
        let first_tx = self
            .state
            .get_transaction_cache_reader()
            .get_transaction_block(&root_digests[0])?
            .expect("Transaction block must exist");

        Ok(match first_tx.transaction_data().kind() {
            TransactionKind::ConsensusCommitPrologueV1(_) => {
                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
                Some((*first_tx.digest(), root_effects[0].clone()))
            }
            _ => None,
        })
    }

    /// Writes the new checkpoints to the DB storage and processes them.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> IotaResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );
            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.tables)
                .await?;

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        // Durably commit transactions (but not their outputs) to the database.
        // Called before writing a locally built checkpoint to the CheckpointStore, so
        // that the inputs of the checkpoint cannot be lost.
        // These transactions are guaranteed to be final unless this validator
        // forks (i.e. constructs a checkpoint which will never be certified). In this
        // case some non-final transactions could be left in the database.
        //
        // This is an intermediate solution until we delay commits to the epoch db.
        // After we have done that, crash recovery will be done by re-processing
        // consensus commits and pending_consensus_transactions, and this method
        // can be removed.
        self.state
            .get_cache_commit()
            .persist_transactions(&all_tx_digests)
            .await?;

        batch.write()?;

        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.tables
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_pending_checkpoint(height, new_checkpoints)?;
        Ok(())
    }

    #[expect(clippy::type_complexity)]
    fn split_checkpoint_chunks(
        &self,
        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
        signatures: Vec<Vec<GenericSignature>>,
    ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
        let mut chunks = Vec::new();
        let mut chunk = Vec::new();
        let mut chunk_size: usize = 0;
        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
            .into_iter()
            .zip(signatures.into_iter())
        {
            // Roll over to a new chunk after either max count or max size is reached.
            // The size calculation here is intended to estimate the size of the
            // FullCheckpointContents struct. If this code is modified, that struct
            // should also be updated accordingly.
            let size = transaction_size
                + bcs::serialized_size(&effects)?
                + bcs::serialized_size(&signatures)?;
            if chunk.len() == self.max_transactions_per_checkpoint
                || (chunk_size + size) > self.max_checkpoint_size_bytes
            {
                if chunk.is_empty() {
                    // Always allow at least one tx in a checkpoint.
                    warn!(
                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
                        self.max_checkpoint_size_bytes
                    );
                } else {
                    chunks.push(chunk);
                    chunk = Vec::new();
                    chunk_size = 0;
                }
            }

            chunk.push((effects, signatures));
            chunk_size += size;
        }

        if !chunk.is_empty() || chunks.is_empty() {
            // We intentionally create an empty checkpoint if there is no content provided
            // to make a 'heartbeat' checkpoint.
            // Important: if some conditions are added here later, we need to make sure we
            // always have at least one chunk if last_pending_of_epoch is set
            chunks.push(chunk);
            // Note: empty checkpoints are ok - they shouldn't happen at all on
            // a network with even modest load. Even if they do
            // happen, it is still useful as it allows fullnodes to
            // distinguish between "no transactions have happened" and "I am not
            // receiving new checkpoints".
        }
        Ok(chunks)
    }
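
    // Worked chunking example (made-up numbers): with
    // `max_transactions_per_checkpoint = 2` and
    // `max_checkpoint_size_bytes = 1_000`, transactions of estimated sizes
    // [300, 400, 500, 2_000] split as [300, 400] (count limit reached), then
    // [500], then [2_000] on its own: it exceeds the size limit but is still
    // allowed through, so that every transaction lands in some checkpoint.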

    /// Creates checkpoints using the provided transaction effects and pending
    /// checkpoint information.
    #[instrument(level = "debug", skip_all)]
    async fn create_checkpoints(
        &self,
        all_effects: Vec<TransactionEffects>,
        details: &PendingCheckpointInfo,
    ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
        let total = all_effects.len();
        let mut last_checkpoint = self.epoch_store.last_built_checkpoint_summary()?;
        if last_checkpoint.is_none() {
            let epoch = self.epoch_store.epoch();
            if epoch > 0 {
                let previous_epoch = epoch - 1;
                let last_verified = self.tables.get_epoch_last_checkpoint(previous_epoch)?;
                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
                if let Some((ref seq, _)) = last_checkpoint {
                    debug!(
                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
                    );
                } else {
                    // This indicates a serious bug in when the CheckpointBuilder was
                    // started, so surface it via a panic.
                    panic!("Cannot find last checkpoint for previous epoch {previous_epoch}");
                }
            }
        }
        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
        debug!(
            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
            checkpoint_timestamp = details.timestamp_ms,
            "Creating checkpoint(s) for {} transactions",
            all_effects.len(),
        );

        let all_digests: Vec<_> = all_effects
            .iter()
            .map(|effect| *effect.transaction_digest())
            .collect();
        let transactions_and_sizes = self
            .state
            .get_transaction_cache_reader()
            .get_transactions_and_serialized_sizes(&all_digests)?;
        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
        let mut transactions = Vec::with_capacity(all_effects.len());
        let mut transaction_keys = Vec::with_capacity(all_effects.len());
        let mut randomness_rounds = BTreeMap::new();
        {
            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
            debug!(
                ?last_checkpoint_seq,
                "Waiting for {:?} certificates to appear in consensus",
                all_effects.len()
            );

            for (effects, transaction_and_size) in all_effects
                .into_iter()
                .zip(transactions_and_sizes.into_iter())
            {
                let (transaction, size) = transaction_and_size
                    .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
                match transaction.inner().transaction_data().kind() {
                    TransactionKind::ConsensusCommitPrologueV1(_)
                    | TransactionKind::AuthenticatorStateUpdateV1(_) => {
                        // ConsensusCommitPrologue and AuthenticatorStateUpdateV1
                        // are guaranteed to be processed before we reach here.
                    }
                    TransactionKind::RandomnessStateUpdate(rsu) => {
                        randomness_rounds
                            .insert(*effects.transaction_digest(), rsu.randomness_round);
                    }
                    _ => {
                        // All other tx should be included in the call to
                        // `consensus_messages_processed_notify`.
                        transaction_keys.push(SequencedConsensusTransactionKey::External(
                            ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
                        ));
                    }
                }
                transactions.push(transaction);
                all_effects_and_transaction_sizes.push((effects, size));
            }

            self.epoch_store
                .consensus_messages_processed_notify(transaction_keys)
                .await?;
        }

        let signatures = self
            .epoch_store
            .user_signatures_for_checkpoint(&transactions, &all_digests)?;
        debug!(
            ?last_checkpoint_seq,
            "Received {} checkpoint user signatures from consensus",
            signatures.len()
        );

        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
        let chunks_count = chunks.len();

        let mut checkpoints = Vec::with_capacity(chunks_count);
        debug!(
            ?last_checkpoint_seq,
            "Creating {} checkpoints with {} transactions", chunks_count, total,
        );

        let epoch = self.epoch_store.epoch();
        for (index, transactions) in chunks.into_iter().enumerate() {
            let first_checkpoint_of_epoch = index == 0
                && last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.epoch != epoch)
                    .unwrap_or(true);
            if first_checkpoint_of_epoch {
                self.epoch_store
                    .record_epoch_first_checkpoint_creation_time_metric();
            }
            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;

            let sequence_number = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.sequence_number + 1)
                .unwrap_or_default();
            let timestamp_ms = details.timestamp_ms;
            if let Some((_, last_checkpoint)) = &last_checkpoint {
                if last_checkpoint.timestamp_ms > timestamp_ms {
                    error!(
                        "Unexpected decrease of checkpoint timestamp, sequence: {}, previous: {}, current: {}",
                        sequence_number, last_checkpoint.timestamp_ms, timestamp_ms
                    );
                }
            }

            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
            let epoch_rolling_gas_cost_summary =
                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

            let end_of_epoch_data = if last_checkpoint_of_epoch {
                let (system_state_obj, system_epoch_info_event) = self
                    .augment_epoch_last_checkpoint(
                        &epoch_rolling_gas_cost_summary,
                        timestamp_ms,
                        &mut effects,
                        &mut signatures,
                        sequence_number,
                    )
                    .await?;

1457                // The system epoch info event can be `None` in case the `advance_epoch`
1458                // Move function call failed and was executed in safe mode.
1459                // In this case, the token supply should be unchanged.
1460                //
1461                // SAFETY: The numbers of minted and burnt tokens easily fit into an
1462                // i64, and because they are small, no overflows will occur during
1463                // conversion or subtraction.
1464                let epoch_supply_change =
1465                    system_epoch_info_event.map_or(0, |event| event.supply_change());
1466
1467                let committee = system_state_obj
1468                    .get_current_epoch_committee()
1469                    .committee()
1470                    .clone();
1471
1472                // This must happen after the call to augment_epoch_last_checkpoint,
1473                // otherwise we will not capture the change_epoch tx.
1474                let root_state_digest = {
1475                    let state_acc = self
1476                        .accumulator
1477                        .upgrade()
1478                        .expect("No checkpoints should be getting built after local reconfiguration");
1479                    let acc = state_acc.accumulate_checkpoint(
1480                        effects.clone(),
1481                        sequence_number,
1482                        &self.epoch_store,
1483                    )?;
1484                    state_acc
1485                        .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
1486                        .await?;
1487                    state_acc
1488                        .digest_epoch(self.epoch_store.clone(), sequence_number)
1489                        .await?
1490                };
1491                self.metrics.highest_accumulated_epoch.set(epoch as i64);
1492                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1493
1494                let epoch_commitments = vec![root_state_digest.into()];
1495
1496                Some(EndOfEpochData {
1497                    next_epoch_committee: committee.voting_rights,
1498                    next_epoch_protocol_version: ProtocolVersion::new(
1499                        system_state_obj.protocol_version(),
1500                    ),
1501                    epoch_commitments,
1502                    epoch_supply_change,
1503                })
1504            } else {
1505                None
1506            };
1507            let contents = CheckpointContents::new_with_digests_and_signatures(
1508                effects.iter().map(TransactionEffects::execution_digests),
1509                signatures,
1510            );
1511
1512            let num_txns = contents.size() as u64;
1513
1514            let network_total_transactions = last_checkpoint
1515                .as_ref()
1516                .map(|(_, c)| c.network_total_transactions + num_txns)
1517                .unwrap_or(num_txns);
1518
1519            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1520
1521            let matching_randomness_rounds: Vec<_> = effects
1522                .iter()
1523                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1524                .copied()
1525                .collect();
1526
1527            let summary = CheckpointSummary::new(
1528                self.epoch_store.protocol_config(),
1529                epoch,
1530                sequence_number,
1531                network_total_transactions,
1532                &contents,
1533                previous_digest,
1534                epoch_rolling_gas_cost_summary,
1535                end_of_epoch_data,
1536                timestamp_ms,
1537                matching_randomness_rounds,
1538            );
1539            summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1540            if last_checkpoint_of_epoch {
1541                info!(
1542                    checkpoint_seq = sequence_number,
1543                    "creating last checkpoint of epoch {}", epoch
1544                );
1545                if let Some(stats) = self.tables.get_epoch_stats(epoch, &summary) {
1546                    self.epoch_store
1547                        .report_epoch_metrics_at_last_checkpoint(stats);
1548                }
1549            }
1550            last_checkpoint = Some((sequence_number, summary.clone()));
1551            checkpoints.push((summary, contents));
1552        }
1553
1554        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1555    }
1556
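    /// Returns the epoch's rolling `GasCostSummary`: the previous checkpoint's
    /// rolling summary plus the costs of the current checkpoint's effects.
    /// When the previous checkpoint belongs to an earlier epoch, only the
    /// current costs are returned, resetting the rolling sum at the boundary.
    ///
    /// A minimal sketch of the boundary behavior (`builder`, `prev_summary`
    /// and `effects` are hypothetical values, not taken from this module):
    ///
    /// ```ignore
    /// // prev_summary.epoch == 4, but the epoch store is now in epoch 5, so
    /// // the previous rolling costs are discarded and only `effects` counts.
    /// let rolling = builder.get_epoch_total_gas_cost(Some(&prev_summary), &effects);
    /// assert_eq!(rolling, GasCostSummary::new_from_txn_effects(effects.iter()));
    /// ```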
1557    fn get_epoch_total_gas_cost(
1558        &self,
1559        last_checkpoint: Option<&CheckpointSummary>,
1560        cur_checkpoint_effects: &[TransactionEffects],
1561    ) -> GasCostSummary {
1562        let (previous_epoch, previous_gas_costs) = last_checkpoint
1563            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1564            .unwrap_or_default();
1565        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1566        if previous_epoch == self.epoch_store.epoch() {
1567            // sum only when we are within the same epoch
1568            GasCostSummary::new(
1569                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1570                previous_gas_costs.computation_cost_burned
1571                    + current_gas_costs.computation_cost_burned,
1572                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1573                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1574                previous_gas_costs.non_refundable_storage_fee
1575                    + current_gas_costs.non_refundable_storage_fee,
1576            )
1577        } else {
1578            current_gas_costs
1579        }
1580    }
1581
1582    /// Augments the last checkpoint of the epoch by creating and executing an
1583    /// advance epoch transaction.
1584    #[instrument(level = "error", skip_all)]
1585    async fn augment_epoch_last_checkpoint(
1586        &self,
1587        epoch_total_gas_cost: &GasCostSummary,
1588        epoch_start_timestamp_ms: CheckpointTimestamp,
1589        checkpoint_effects: &mut Vec<TransactionEffects>,
1590        signatures: &mut Vec<Vec<GenericSignature>>,
1591        checkpoint: CheckpointSequenceNumber,
1592    ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1593        let (system_state, system_epoch_info_event, effects) = self
1594            .state
1595            .create_and_execute_advance_epoch_tx(
1596                &self.epoch_store,
1597                epoch_total_gas_cost,
1598                checkpoint,
1599                epoch_start_timestamp_ms,
1600            )
1601            .await?;
1602        checkpoint_effects.push(effects);
1603        signatures.push(vec![]);
1604        Ok((system_state, system_epoch_info_event))
1605    }
1606
1607    /// For the given roots, returns the complete list of effects to include
1608    /// in the checkpoint. This list includes the roots and all of their
1609    /// dependencies that are not already part of a checkpoint. Note that this
1610    /// function may be called multiple times while constructing the checkpoint.
1611    /// `existing_tx_digests_in_checkpoint` tracks the transactions that are
1612    /// already included in the checkpoint; transactions in `roots` that need
1613    /// to be included in the checkpoint are added to it before this function
1614    /// returns.
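    ///
    /// A minimal sketch of the intended closure semantics (the variables here
    /// are hypothetical):
    ///
    /// ```ignore
    /// // Root C depends on B, which depends on A; all three were executed in
    /// // the current epoch and none is in a checkpoint yet. Passing C's
    /// // effects returns the effects of A, B and C, and records all three
    /// // digests in `included`.
    /// let effects = builder.complete_checkpoint_effects(vec![effects_c], &mut included)?;
    /// assert_eq!(effects.len(), 3);
    /// ```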
1615    #[instrument(level = "debug", skip_all)]
1616    fn complete_checkpoint_effects(
1617        &self,
1618        mut roots: Vec<TransactionEffects>,
1619        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
1620    ) -> IotaResult<Vec<TransactionEffects>> {
1621        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
1622        let mut results = vec![];
1623        let mut seen = HashSet::new();
1624        loop {
1625            let mut pending = HashSet::new();
1626
1627            let transactions_included = self
1628                .epoch_store
1629                .builder_included_transactions_in_checkpoint(
1630                    roots.iter().map(|e| e.transaction_digest()),
1631                )?;
1632
1633            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
1634                let digest = effect.transaction_digest();
1635                // Mark the root as seen: there is no need to read the effects of a
1636                // dependency that has already been processed.
1637                seen.insert(*digest);
1638
1639                // Skip roots that are already included in the checkpoint.
1640                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
1641                    continue;
1642                }
1643
1644                // Skip roots already included in checkpoints or roots from previous epochs
1645                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
1646                    continue;
1647                }
1648
1649                let existing_effects = self
1650                    .epoch_store
1651                    .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;
1652
1653                for (dependency, effects_signature_exists) in
1654                    effect.dependencies().iter().zip(existing_effects.iter())
1655                {
1656                    // Skip if the dependency was not executed in the current epoch.
1657                    // Note that the existence of an effects signature in the
1658                    // epoch store for the given digest indicates that the transaction
1659                    // was locally executed in the current epoch.
1660                    if !effects_signature_exists {
1661                        continue;
1662                    }
1663                    if seen.insert(*dependency) {
1664                        pending.insert(*dependency);
1665                    }
1666                }
1667                results.push(effect);
1668            }
1669            if pending.is_empty() {
1670                break;
1671            }
1672            let pending = pending.into_iter().collect::<Vec<_>>();
1673            let effects = self.effects_store.multi_get_executed_effects(&pending)?;
1674            let effects = effects
1675                .into_iter()
1676                .zip(pending)
1677                .map(|(opt, digest)| match opt {
1678                    Some(x) => x,
1679                    None => panic!(
1680                        "Cannot find effects for transaction {digest:?}, although a transaction that depends on it was already executed"
1681                    ),
1682                })
1683                .collect::<Vec<_>>();
1684            roots = effects;
1685        }
1686
1687        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
1688        Ok(results)
1689    }
1690
1691    // This function checks the invariants of the consensus commit prologue
1692    // transactions in the checkpoint. It is only used in simtests.
1693    #[cfg(msim)]
1694    fn expensive_consensus_commit_prologue_invariants_check(
1695        &self,
1696        root_digests: &[TransactionDigest],
1697        sorted: &[TransactionEffects],
1698    ) {
1699        // Gets all the consensus commit prologue transactions from the roots.
1700        let root_txs = self
1701            .state
1702            .get_transaction_cache_reader()
1703            .multi_get_transaction_blocks(root_digests)
1704            .unwrap();
1705        let ccps = root_txs
1706            .iter()
1707            .filter_map(|tx| {
1708                tx.as_ref().filter(|tx| {
1709                    matches!(
1710                        tx.transaction_data().kind(),
1711                        TransactionKind::ConsensusCommitPrologueV1(_)
1712                    )
1713                })
1714            })
1715            .collect::<Vec<_>>();
1716
1717        // There should be at most one consensus commit prologue transaction in the
1718        // roots.
1719        assert!(ccps.len() <= 1);
1720
1721        // Get all the transactions in the checkpoint.
1722        let txs = self
1723            .state
1724            .get_transaction_cache_reader()
1725            .multi_get_transaction_blocks(
1726                &sorted
1727                    .iter()
1728                    .map(|tx| *tx.transaction_digest())
1729                    .collect::<Vec<_>>(),
1730            )
1731            .unwrap();
1732
1733        if ccps.is_empty() {
1734            // If there is no consensus commit prologue transaction in the roots, then there
1735            // should be no consensus commit prologue transaction in the
1736            // checkpoint.
1737            for tx in txs.iter().flatten() {
1738                assert!(!matches!(
1739                    tx.transaction_data().kind(),
1740                    TransactionKind::ConsensusCommitPrologueV1(_)
1741                ));
1742            }
1743        } else {
1744            // If there is one consensus commit prologue, it must be the first one in the
1745            // checkpoint.
1746            assert!(matches!(
1747                txs[0].as_ref().unwrap().transaction_data().kind(),
1748                TransactionKind::ConsensusCommitPrologueV1(_)
1749            ));
1750
1751            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1752
1753            for tx in txs.iter().skip(1).flatten() {
1754                assert!(!matches!(
1755                    tx.transaction_data().kind(),
1756                    TransactionKind::ConsensusCommitPrologueV1(_)
1757                ));
1758            }
1759        }
1760    }
1761}
1762
1763impl CheckpointAggregator {
1764    fn new(
1765        tables: Arc<CheckpointStore>,
1766        epoch_store: Arc<AuthorityPerEpochStore>,
1767        notify: Arc<Notify>,
1768        output: Box<dyn CertifiedCheckpointOutput>,
1769        state: Arc<AuthorityState>,
1770        metrics: Arc<CheckpointMetrics>,
1771    ) -> Self {
1772        let current = None;
1773        Self {
1774            tables,
1775            epoch_store,
1776            notify,
1777            current,
1778            output,
1779            state,
1780            metrics,
1781        }
1782    }
1783
1784    /// Runs the `CheckpointAggregator` in an asynchronous loop, continuously
1785    /// aggregating checkpoint signatures into certified checkpoints.
1786    /// Errors are logged, counted in the metrics, and retried after a
1787    /// one-second delay rather than terminating the loop; the task stops only
1788    /// when its future is dropped or aborted.
1789    async fn run(mut self) {
1790        info!("Starting CheckpointAggregator");
1791        loop {
1792            if let Err(e) = self.run_and_notify().await {
1793                error!(
1794                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
1795                    e
1796                );
1797                self.metrics.checkpoint_errors.inc();
1798                tokio::time::sleep(Duration::from_secs(1)).await;
1799                continue;
1800            }
1801
1802            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
1803        }
1804    }
1805
1806    async fn run_and_notify(&mut self) -> IotaResult {
1807        let summaries = self.run_inner()?;
1808        for summary in summaries {
1809            self.output.certified_checkpoint_created(&summary).await?;
1810        }
1811        Ok(())
1812    }
1813
1814    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
1815        let _scope = monitored_scope("CheckpointAggregator");
1816        let mut result = vec![];
1817        'outer: loop {
1818            let next_to_certify = self.next_checkpoint_to_certify();
1819            let current = if let Some(current) = &mut self.current {
1820                // It's possible that the checkpoint was already certified by
1821                // the rest of the network and we've already received the
1822                // certified checkpoint via StateSync. In this case, we reset
1823                // the current signature aggregator to the next checkpoint to
1824                // be certified
1825                if current.summary.sequence_number < next_to_certify {
1826                    self.current = None;
1827                    continue;
1828                }
1829                current
1830            } else {
1831                let Some(summary) = self
1832                    .epoch_store
1833                    .get_built_checkpoint_summary(next_to_certify)?
1834                else {
1835                    return Ok(result);
1836                };
1837                self.current = Some(CheckpointSignatureAggregator {
1838                    next_index: 0,
1839                    digest: summary.digest(),
1840                    summary,
1841                    signatures_by_digest: MultiStakeAggregator::new(
1842                        self.epoch_store.committee().clone(),
1843                    ),
1844                    tables: self.tables.clone(),
1845                    state: self.state.clone(),
1846                    metrics: self.metrics.clone(),
1847                });
1848                self.current.as_mut().unwrap()
1849            };
1850
1851            let epoch_tables = self
1852                .epoch_store
1853                .tables()
1854                .expect("should not run past end of epoch");
1855            let iter = epoch_tables.get_pending_checkpoint_signatures_iter(
1856                current.summary.sequence_number,
1857                current.next_index,
1858            )?;
1859            for ((seq, index), data) in iter {
1860                if seq != current.summary.sequence_number {
1861                    trace!(
1862                        checkpoint_seq =? current.summary.sequence_number,
1863                        "Not enough checkpoint signatures",
1864                    );
1865                    // No more signatures (yet) for this checkpoint
1866                    return Ok(result);
1867                }
1868                trace!(
1869                    checkpoint_seq = current.summary.sequence_number,
1870                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
1871                    current.summary.digest(),
1872                    data.summary.auth_sig().authority.concise()
1873                );
1874                self.metrics
1875                    .checkpoint_participation
1876                    .with_label_values(&[&format!(
1877                        "{:?}",
1878                        data.summary.auth_sig().authority.concise()
1879                    )])
1880                    .inc();
1881                if let Ok(auth_signature) = current.try_aggregate(data) {
1882                    debug!(
1883                        checkpoint_seq = current.summary.sequence_number,
1884                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
1885                        current.summary.digest(),
1886                    );
1887                    let summary = VerifiedCheckpoint::new_unchecked(
1888                        CertifiedCheckpointSummary::new_from_data_and_sig(
1889                            current.summary.clone(),
1890                            auth_signature,
1891                        ),
1892                    );
1893
1894                    self.tables.insert_certified_checkpoint(&summary)?;
1895                    self.metrics
1896                        .last_certified_checkpoint
1897                        .set(current.summary.sequence_number as i64);
1898                    current
1899                        .summary
1900                        .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
1901                    result.push(summary.into_inner());
1902                    self.current = None;
1903                    continue 'outer;
1904                } else {
1905                    current.next_index = index + 1;
1906                }
1907            }
1908            break;
1909        }
1910        Ok(result)
1911    }
1912
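    /// Returns the sequence number following the highest locally certified
    /// checkpoint, or 0 (the default) if no checkpoint has been certified yet.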
1913    fn next_checkpoint_to_certify(&self) -> CheckpointSequenceNumber {
1914        self.tables
1915            .certified_checkpoints
1916            .unbounded_iter()
1917            .skip_to_last()
1918            .next()
1919            .map(|(seq, _)| seq + 1)
1920            .unwrap_or_default()
1921    }
1922}
1923
1924impl CheckpointSignatureAggregator {
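    /// Attempts to add one validator's signature for this checkpoint to the
    /// stake aggregator. Returns the quorum signature once enough stake has
    /// signed the locally built digest; returns `Err(())` for repeated or
    /// invalid signatures, mismatching digests, or when quorum has not been
    /// reached yet.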
1925    #[expect(clippy::result_unit_err)]
1926    pub fn try_aggregate(
1927        &mut self,
1928        data: CheckpointSignatureMessage,
1929    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
1930        let their_digest = *data.summary.digest();
1931        let (_, signature) = data.summary.into_data_and_sig();
1932        let author = signature.authority;
1933        let envelope =
1934            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
1935        match self.signatures_by_digest.insert(their_digest, envelope) {
1936            // ignore repeated signatures
1937            InsertResult::Failed {
1938                error:
1939                    IotaError::StakeAggregatorRepeatedSigner {
1940                        conflicting_sig: false,
1941                        ..
1942                    },
1943            } => Err(()),
1944            InsertResult::Failed { error } => {
1945                warn!(
1946                    checkpoint_seq = self.summary.sequence_number,
1947                    "Failed to aggregate new signature from validator {:?}: {:?}",
1948                    author.concise(),
1949                    error
1950                );
1951                self.check_for_split_brain();
1952                Err(())
1953            }
1954            InsertResult::QuorumReached(cert) => {
1955                // It is not guaranteed that signature.authority == consensus_cert.author, but
1956                // we do verify the signature so we know that the author signed
1957                // the message at some point.
1958                if their_digest != self.digest {
1959                    self.metrics.remote_checkpoint_forks.inc();
1960                    warn!(
1961                        checkpoint_seq = self.summary.sequence_number,
1962                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
1963                        author.concise(),
1964                        their_digest,
1965                        self.digest
1966                    );
1967                    return Err(());
1968                }
1969                Ok(cert)
1970            }
1971            InsertResult::NotEnoughVotes {
1972                bad_votes: _,
1973                bad_authorities: _,
1974            } => {
1975                self.check_for_split_brain();
1976                Err(())
1977            }
1978        }
1979    }
1980
1981    /// Check if there is a split brain condition in checkpoint signature
1982    /// aggregation, defined as any state wherein it is no longer possible
1983    /// to achieve quorum on a checkpoint proposal, irrespective of the
1984    /// outcome of any outstanding votes.
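    ///
    /// A small numeric sketch (hypothetical stakes): with total stake 100 and
    /// a quorum threshold of 67, if 40 stake signed digest A, 40 signed
    /// digest B, and only 20 stake is still outstanding, neither digest can
    /// reach 67 (40 + 20 = 60 < 67), so quorum is unreachable and the split
    /// brain diagnostics below are triggered.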
1985    fn check_for_split_brain(&self) {
1986        debug!(
1987            checkpoint_seq = self.summary.sequence_number,
1988            "Checking for split brain condition"
1989        );
1990        if self.signatures_by_digest.quorum_unreachable() {
1991            // TODO: at this point we should immediately halt processing
1992            // of new transaction certificates to avoid building on top of
1993            // forked output
1994            // self.halt_all_execution();
1995
1996            let digests_by_stake_messages = self
1997                .signatures_by_digest
1998                .get_all_unique_values()
1999                .into_iter()
2000                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2001                .map(|(digest, (_authorities, total_stake))| {
2002                    format!("{digest:?} (total stake: {total_stake})")
2003                })
2004                .collect::<Vec<String>>();
2005            error!(
2006                checkpoint_seq = self.summary.sequence_number,
2007                "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
2008                self.signatures_by_digest.uncommitted_stake(),
2009                digests_by_stake_messages,
2010            );
2011            self.metrics.split_brain_checkpoint_forks.inc();
2012
2013            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2014            let local_summary = self.summary.clone();
2015            let state = self.state.clone();
2016            let tables = self.tables.clone();
2017
2018            tokio::spawn(async move {
2019                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2020            });
2021        }
2022    }
2023}
2024
2025/// Creates a data dump containing the relevant data for diagnosing the cause
2026/// of a split brain by querying one disagreeing validator for full checkpoint
2027/// contents. To minimize peer chatter, we only query one validator at random
2028/// from each disagreeing faction, as all honest validators that participated
2029/// in this round will likely run the same process.
2030async fn diagnose_split_brain(
2031    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2032    local_summary: CheckpointSummary,
2033    state: Arc<AuthorityState>,
2034    tables: Arc<CheckpointStore>,
2035) {
2036    debug!(
2037        checkpoint_seq = local_summary.sequence_number,
2038        "Running split brain diagnostics..."
2039    );
2040    let time = Utc::now();
2041    // collect one random disagreeing validator per differing digest
2042    let digest_to_validator = all_unique_values
2043        .iter()
2044        .filter_map(|(digest, (validators, _))| {
2045            if *digest != local_summary.digest() {
2046                let random_validator = validators.choose(&mut OsRng).unwrap();
2047                Some((*digest, *random_validator))
2048            } else {
2049                None
2050            }
2051        })
2052        .collect::<HashMap<_, _>>();
2053    if digest_to_validator.is_empty() {
2054        panic!(
2055            "Given split brain condition, there should be at \
2056                least one validator that disagrees with local signature"
2057        );
2058    }
2059
2060    let epoch_store = state.load_epoch_store_one_call_per_task();
2061    let committee = epoch_store
2062        .epoch_start_state()
2063        .get_iota_committee_with_network_metadata();
2064    let network_config = default_iota_network_config();
2065    let network_clients =
2066        make_network_authority_clients_with_network_config(&committee, &network_config);
2067
2068    // Query all disagreeing validators
2069    let response_futures = digest_to_validator
2070        .values()
2071        .cloned()
2072        .map(|validator| {
2073            let client = network_clients
2074                .get(&validator)
2075                .expect("Failed to get network client");
2076            let request = CheckpointRequest {
2077                sequence_number: Some(local_summary.sequence_number),
2078                request_content: true,
2079                certified: false,
2080            };
2081            client.handle_checkpoint(request)
2082        })
2083        .collect::<Vec<_>>();
2084
2085    let digest_name_pair = digest_to_validator.iter();
2086    let response_data = futures::future::join_all(response_futures)
2087        .await
2088        .into_iter()
2089        .zip(digest_name_pair)
2090        .filter_map(|(response, (digest, name))| match response {
2091            Ok(response) => match response {
2092                CheckpointResponse {
2093                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2094                    contents: Some(contents),
2095                } => Some((*name, *digest, summary, contents)),
2096                CheckpointResponse {
2097                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2098                    contents: _,
2099                } => {
2100                    panic!("Expected pending checkpoint, but got certified checkpoint");
2101                }
2102                CheckpointResponse {
2103                    checkpoint: None,
2104                    contents: _,
2105                } => {
2106                    error!(
2107                        "Summary for checkpoint {:?} not found on validator {:?}",
2108                        local_summary.sequence_number, name
2109                    );
2110                    None
2111                }
2112                CheckpointResponse {
2113                    checkpoint: _,
2114                    contents: None,
2115                } => {
2116                    error!(
2117                        "Contents for checkpoint {:?} not found on validator {:?}",
2118                        local_summary.sequence_number, name
2119                    );
2120                    None
2121                }
2122            },
2123            Err(e) => {
2124                error!(
2125                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2126                    e
2127                );
2128                None
2129            }
2130        })
2131        .collect::<Vec<_>>();
2132
2133    let local_checkpoint_contents = tables
2134        .get_checkpoint_contents(&local_summary.content_digest)
2135        .unwrap_or_else(|_| {
2136            panic!(
2137                "Could not find checkpoint contents for digest {:?}",
2138                local_summary.digest()
2139            )
2140        })
2141        .unwrap_or_else(|| {
2142            panic!(
2143                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2144                local_summary.sequence_number,
2145                local_summary.digest()
2146            )
2147        });
2148    let local_contents_text = format!("{local_checkpoint_contents:?}");
2149
2150    let local_summary_text = format!("{local_summary:?}");
2151    let local_validator = state.name.concise();
2152    let diff_patches = response_data
2153        .iter()
2154        .map(|(name, other_digest, other_summary, contents)| {
2155            let other_contents_text = format!("{contents:?}");
2156            let other_summary_text = format!("{other_summary:?}");
2157            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2158                .enumerate_transactions(&local_summary)
2159                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2160                .unzip();
2161            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2162                .enumerate_transactions(other_summary)
2163                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2164                .unzip();
2165            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2166            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2167            let local_transactions_text = format!("{local_transactions:#?}");
2168            let other_transactions_text = format!("{other_transactions:#?}");
2169            let transactions_patch =
2170                create_patch(&local_transactions_text, &other_transactions_text);
2171            let local_effects_text = format!("{local_effects:#?}");
2172            let other_effects_text = format!("{other_effects:#?}");
2173            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2174            let seq_number = local_summary.sequence_number;
2175            let local_digest = local_summary.digest();
2176            let other_validator = name.concise();
2177            format!(
2178                "Checkpoint: {seq_number:?}\n\
2179                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2180                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2181                Summary Diff: \n{summary_patch}\n\n\
2182                Contents Diff: \n{contents_patch}\n\n\
2183                Transactions Diff: \n{transactions_patch}\n\n\
2184                Effects Diff: \n{effects_patch}",
2185            )
2186        })
2187        .collect::<Vec<_>>()
2188        .join("\n\n\n");
2189
2190    let header = format!(
2191        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2192        Datetime: {time}",
2193    );
2194    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2195    let path = tempfile::tempdir()
2196        .expect("Failed to create tempdir")
2197        .into_path()
2198        .join(Path::new("checkpoint_fork_dump.txt"));
2199    let mut file = File::create(path).unwrap();
2200    write!(file, "{fork_logs_text}").unwrap();
2201    debug!("{}", fork_logs_text);
2202
2203    fail_point!("split_brain_reached");
2204}
2205
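/// Notification interface used to inform the checkpoint service about newly
/// received checkpoint signatures and newly available pending checkpoints.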
2206pub trait CheckpointServiceNotify {
2207    fn notify_checkpoint_signature(
2208        &self,
2209        epoch_store: &AuthorityPerEpochStore,
2210        info: &CheckpointSignatureMessage,
2211    ) -> IotaResult;
2212
2213    fn notify_checkpoint(&self) -> IotaResult;
2214}
2215
2216enum CheckpointServiceState {
2217    Unstarted(Box<(CheckpointBuilder, CheckpointAggregator)>),
2218    Started,
2219}
2220
2221impl CheckpointServiceState {
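    /// Swaps the state to `Started` and returns the builder/aggregator pair.
    /// Panics if the service has already been started.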
2222    fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
2223        let mut state = CheckpointServiceState::Started;
2224        std::mem::swap(self, &mut state);
2225
2226        match state {
2227            CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1),
2228            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2229        }
2230    }
2231}
2232
2233pub struct CheckpointService {
2234    tables: Arc<CheckpointStore>,
2235    notify_builder: Arc<Notify>,
2236    notify_aggregator: Arc<Notify>,
2237    last_signature_index: Mutex<u64>,
2238    // A notification for the current highest built sequence number.
2239    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
2240    // The highest sequence number that had already been built at the time CheckpointService
2241    // was constructed.
2242    highest_previously_built_seq: CheckpointSequenceNumber,
2243    metrics: Arc<CheckpointMetrics>,
2244    state: Mutex<CheckpointServiceState>,
2245}
2246
2247impl CheckpointService {
2248    /// Constructs a new CheckpointService in an un-started state, wiring up
2249    /// the checkpoint builder and aggregator. The builder and aggregator tasks
2250    /// are only started later via `spawn`.
2251    pub fn build(
2252        state: Arc<AuthorityState>,
2253        checkpoint_store: Arc<CheckpointStore>,
2254        epoch_store: Arc<AuthorityPerEpochStore>,
2255        effects_store: Arc<dyn TransactionCacheRead>,
2256        accumulator: Weak<StateAccumulator>,
2257        checkpoint_output: Box<dyn CheckpointOutput>,
2258        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2259        metrics: Arc<CheckpointMetrics>,
2260        max_transactions_per_checkpoint: usize,
2261        max_checkpoint_size_bytes: usize,
2262    ) -> Arc<Self> {
2263        info!(
2264            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2265        );
2266        let notify_builder = Arc::new(Notify::new());
2267        let notify_aggregator = Arc::new(Notify::new());
2268
2269        let highest_previously_built_seq = epoch_store
2270            .last_built_checkpoint_builder_summary()
2271            .expect("epoch should not have ended")
2272            .map(|s| s.summary.sequence_number)
2273            .unwrap_or(0);
2274
2275        let (highest_currently_built_seq_tx, _) = watch::channel(highest_previously_built_seq);
2276
2277        let aggregator = CheckpointAggregator::new(
2278            checkpoint_store.clone(),
2279            epoch_store.clone(),
2280            notify_aggregator.clone(),
2281            certified_checkpoint_output,
2282            state.clone(),
2283            metrics.clone(),
2284        );
2285
2286        let builder = CheckpointBuilder::new(
2287            state.clone(),
2288            checkpoint_store.clone(),
2289            epoch_store.clone(),
2290            notify_builder.clone(),
2291            effects_store,
2292            accumulator,
2293            checkpoint_output,
2294            notify_aggregator.clone(),
2295            highest_currently_built_seq_tx.clone(),
2296            metrics.clone(),
2297            max_transactions_per_checkpoint,
2298            max_checkpoint_size_bytes,
2299        );
2300
2301        let last_signature_index = epoch_store
2302            .get_last_checkpoint_signature_index()
2303            .expect("should not cross end of epoch");
2304        let last_signature_index = Mutex::new(last_signature_index);
2305
2306        Arc::new(Self {
2307            tables: checkpoint_store,
2308            notify_builder,
2309            notify_aggregator,
2310            last_signature_index,
2311            highest_currently_built_seq_tx,
2312            highest_previously_built_seq,
2313            metrics,
2314            state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
2315                builder, aggregator,
2316            )))),
2317        })
2318    }
2319
2320    /// Starts the CheckpointService.
2321    ///
2322    /// This function blocks until the CheckpointBuilder re-builds all
2323    /// checkpoints that had been built before the most recent restart. You
2324    /// can think of this as a WAL replay operation. Upon startup, we may
2325    /// have a number of consensus commits and resulting checkpoints that
2326    /// were built but not committed to disk. We want to reprocess the
2327    /// commits and rebuild the checkpoints before starting normal operation.
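    ///
    /// A minimal usage sketch (the `build` arguments are elided here):
    ///
    /// ```ignore
    /// let service = CheckpointService::build(/* ... */);
    /// // Blocks (up to an internal timeout) until previously built
    /// // checkpoints have been rebuilt, then returns the running tasks.
    /// let tasks = service.spawn().await;
    /// ```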
2328    pub async fn spawn(&self) -> JoinSet<()> {
2329        let mut tasks = JoinSet::new();
2330
2331        let (builder, aggregator) = self.state.lock().take_unstarted();
2332        tasks.spawn(monitored_future!(builder.run()));
2333        tasks.spawn(monitored_future!(aggregator.run()));
2334
2335        // If this times out, the validator may still start up. The worst that can
2336        // happen is that we will crash later on instead of immediately. The eventual
2337        // crash would occur because we may be missing transactions that are below the
2338        // highest_synced_checkpoint watermark, which can cause a crash in
2339        // `CheckpointExecutor::extract_randomness_rounds`.
2340        if tokio::time::timeout(
2341            Duration::from_secs(120),
2342            self.wait_for_rebuilt_checkpoints(),
2343        )
2344        .await
2345        .is_err()
2346        {
2347            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
2348        }
2349
2350        tasks
2351    }
2352}
2353
2354impl CheckpointService {
2355    /// Waits until all checkpoints that had been built before the node
2356    /// restarted are rebuilt. This is required to preserve the invariant that all
2357    /// checkpoints (and their transactions) below the
2358    /// highest_synced_checkpoint watermark are available. Once the
2359    /// checkpoints are constructed, we can be sure that the transactions
2360    /// have also been executed.
2361    pub async fn wait_for_rebuilt_checkpoints(&self) {
2362        let highest_previously_built_seq = self.highest_previously_built_seq;
2363        let mut rx = self.highest_currently_built_seq_tx.subscribe();
2364        loop {
2365            let highest_currently_built_seq = *rx.borrow_and_update();
2366            if highest_currently_built_seq >= highest_previously_built_seq {
2367                break;
2368            }
2369            rx.changed().await.unwrap();
2370        }
2371    }
2372
2373    #[cfg(test)]
2374    fn write_and_notify_checkpoint_for_testing(
2375        &self,
2376        epoch_store: &AuthorityPerEpochStore,
2377        checkpoint: PendingCheckpoint,
2378    ) -> IotaResult {
2379        use crate::authority::authority_per_epoch_store::ConsensusCommitOutput;
2380
2381        let mut output = ConsensusCommitOutput::new();
2382        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2383        let mut batch = epoch_store.db_batch_for_test();
2384        output.write_to_batch(epoch_store, &mut batch)?;
2385        batch.write()?;
2386        self.notify_checkpoint()?;
2387        Ok(())
2388    }
2389}
2390
2391impl CheckpointServiceNotify for CheckpointService {
2392    fn notify_checkpoint_signature(
2393        &self,
2394        epoch_store: &AuthorityPerEpochStore,
2395        info: &CheckpointSignatureMessage,
2396    ) -> IotaResult {
2397        let sequence = info.summary.sequence_number;
2398        let signer = info.summary.auth_sig().authority.concise();
2399
2400        if let Some(highest_verified_checkpoint) = self
2401            .tables
2402            .get_highest_verified_checkpoint()?
2403            .map(|x| *x.sequence_number())
2404        {
2405            if sequence <= highest_verified_checkpoint {
2406                trace!(
2407                    checkpoint_seq = sequence,
2408                    "Ignore checkpoint signature from {} - already certified", signer,
2409                );
2410                self.metrics
2411                    .last_ignored_checkpoint_signature_received
2412                    .set(sequence as i64);
2413                return Ok(());
2414            }
2415        }
2416        trace!(
2417            checkpoint_seq = sequence,
2418            "Received checkpoint signature, digest {} from {}",
2419            info.summary.digest(),
2420            signer,
2421        );
2422        self.metrics
2423            .last_received_checkpoint_signatures
2424            .with_label_values(&[&signer.to_string()])
2425            .set(sequence as i64);
2426        // While it can be tempting to make last_signature_index an AtomicU64, this
2427        // won't work: we need to make sure we write to `pending_signatures` and
2428        // trigger `notify_aggregator` without race conditions.
2429        let mut index = self.last_signature_index.lock();
2430        *index += 1;
2431        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2432        self.notify_aggregator.notify_one();
2433        Ok(())
2434    }
2435
2436    fn notify_checkpoint(&self) -> IotaResult {
2437        self.notify_builder.notify_one();
2438        Ok(())
2439    }
2440}
2441
2442// test helper
2443pub struct CheckpointServiceNoop {}
2444impl CheckpointServiceNotify for CheckpointServiceNoop {
2445    fn notify_checkpoint_signature(
2446        &self,
2447        _: &AuthorityPerEpochStore,
2448        _: &CheckpointSignatureMessage,
2449    ) -> IotaResult {
2450        Ok(())
2451    }
2452
2453    fn notify_checkpoint(&self) -> IotaResult {
2454        Ok(())
2455    }
2456}
2457
2458#[cfg(test)]
2459mod tests {
2460    use std::{
2461        collections::{BTreeMap, HashMap},
2462        ops::Deref,
2463    };
2464
2465    use futures::{FutureExt as _, future::BoxFuture};
2466    use iota_macros::sim_test;
2467    use iota_protocol_config::{Chain, ProtocolConfig};
2468    use iota_types::{
2469        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
2470        crypto::Signature,
2471        digests::TransactionEventsDigest,
2472        effects::{TransactionEffects, TransactionEvents},
2473        messages_checkpoint::SignedCheckpointSummary,
2474        move_package::MovePackage,
2475        object,
2476        transaction::{GenesisObject, VerifiedTransaction},
2477    };
2478    use tokio::sync::mpsc;
2479
2480    use super::*;
2481    use crate::authority::test_authority_builder::TestAuthorityBuilder;
2482
2483    #[sim_test]
2484    pub async fn checkpoint_builder_test() {
2485        telemetry_subscribers::init_for_testing();
2486
2487        let mut protocol_config =
2488            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
2489        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
2490        let state = TestAuthorityBuilder::new()
2491            .with_protocol_config(protocol_config)
2492            .build()
2493            .await;
2494
2495        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
2496        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
2497            vec![GenesisObject::RawObject {
2498                data: object::Data::Package(
2499                    MovePackage::new(
2500                        ObjectID::random(),
2501                        SequenceNumber::new(),
2502                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
2503                        100_000,
2504                        // no modules so empty type_origin_table as no types are defined in this
2505                        // package
2506                        Vec::new(),
2507                        // no modules so empty linkage_table as no dependencies of this package
2508                        // exist
2509                        BTreeMap::new(),
2510                    )
2511                    .unwrap(),
2512                ),
2513                owner: object::Owner::Immutable,
2514            }],
2515            vec![],
2516        );
2517        for i in 0..15 {
2518            state
2519                .database_for_testing()
2520                .perpetual_tables
2521                .transactions
2522                .insert(&d(i), dummy_tx.serializable_ref())
2523                .unwrap();
2524        }
2525        for i in 15..20 {
2526            state
2527                .database_for_testing()
2528                .perpetual_tables
2529                .transactions
2530                .insert(&d(i), dummy_tx_with_data.serializable_ref())
2531                .unwrap();
2532        }
2533
2534        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
2535        commit_cert_for_test(
2536            &mut store,
2537            state.clone(),
2538            d(1),
2539            vec![d(2), d(3)],
2540            GasCostSummary::new(11, 11, 12, 11, 1),
2541        );
2542        commit_cert_for_test(
2543            &mut store,
2544            state.clone(),
2545            d(2),
2546            vec![d(3), d(4)],
2547            GasCostSummary::new(21, 21, 22, 21, 1),
2548        );
2549        commit_cert_for_test(
2550            &mut store,
2551            state.clone(),
2552            d(3),
2553            vec![],
2554            GasCostSummary::new(31, 31, 32, 31, 1),
2555        );
2556        commit_cert_for_test(
2557            &mut store,
2558            state.clone(),
2559            d(4),
2560            vec![],
2561            GasCostSummary::new(41, 41, 42, 41, 1),
2562        );
2563        for i in [5, 6, 7, 10, 11, 12, 13] {
2564            commit_cert_for_test(
2565                &mut store,
2566                state.clone(),
2567                d(i),
2568                vec![],
2569                GasCostSummary::new(41, 41, 42, 41, 1),
2570            );
2571        }
2572        for i in [15, 16, 17] {
2573            commit_cert_for_test(
2574                &mut store,
2575                state.clone(),
2576                d(i),
2577                vec![],
2578                GasCostSummary::new(51, 51, 52, 51, 1),
2579            );
2580        }
2581        let all_digests: Vec<_> = store.keys().copied().collect();
2582        for digest in all_digests {
2583            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
2584            state
2585                .epoch_store_for_testing()
2586                .test_insert_user_signature(digest, vec![signature]);
2587        }
2588
2589        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
2590        let (certified_output, mut certified_result) =
2591            mpsc::channel::<CertifiedCheckpointSummary>(10);
2592        let store = Arc::new(store);
2593
2594        let ckpt_dir = tempfile::tempdir().unwrap();
2595        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
2596        let epoch_store = state.epoch_store_for_testing();
2597
2598        let accumulator = Arc::new(StateAccumulator::new_for_tests(
2599            state.get_accumulator_store().clone(),
2600        ));
2601
2602        let checkpoint_service = CheckpointService::build(
2603            state.clone(),
2604            checkpoint_store,
2605            epoch_store.clone(),
2606            store,
2607            Arc::downgrade(&accumulator),
2608            Box::new(output),
2609            Box::new(certified_output),
2610            CheckpointMetrics::new_for_tests(),
2611            3,
2612            100_000,
2613        );
2614        let _tasks = checkpoint_service.spawn().await;
2615
2616        checkpoint_service
2617            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
2618            .unwrap();
2619        checkpoint_service
2620            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
2621            .unwrap();
2622        checkpoint_service
2623            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
2624            .unwrap();
2625        checkpoint_service
2626            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
2627            .unwrap();
2628        checkpoint_service
2629            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
2630            .unwrap();
2631        checkpoint_service
2632            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
2633            .unwrap();
2634
2635        let (c1c, c1s) = result.recv().await.unwrap();
2636        let (c2c, c2s) = result.recv().await.unwrap();
2637
2638        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2639        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2640        assert_eq!(c1t, vec![d(4)]);
2641        assert_eq!(c1s.previous_digest, None);
2642        assert_eq!(c1s.sequence_number, 0);
2643        assert_eq!(
2644            c1s.epoch_rolling_gas_cost_summary,
2645            GasCostSummary::new(41, 41, 42, 41, 1)
2646        );
2647
2648        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
2649        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
2650        assert_eq!(c2s.sequence_number, 1);
2651        assert_eq!(
2652            c2s.epoch_rolling_gas_cost_summary,
2653            GasCostSummary::new(104, 104, 108, 104, 4)
2654        );
2655
2656        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
2657        // Verify that we split into 2 checkpoints.
2658        let (c3c, c3s) = result.recv().await.unwrap();
2659        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2660        let (c4c, c4s) = result.recv().await.unwrap();
2661        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2662        assert_eq!(c3s.sequence_number, 2);
2663        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
2664        assert_eq!(c4s.sequence_number, 3);
2665        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
2666        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
2667        assert_eq!(c4t, vec![d(13)]);
2668
2669        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K
2670        // max. Verify that we split into 2 checkpoints.
2671        let (c5c, c5s) = result.recv().await.unwrap();
2672        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2673        let (c6c, c6s) = result.recv().await.unwrap();
2674        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2675        assert_eq!(c5s.sequence_number, 4);
2676        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
2677        assert_eq!(c6s.sequence_number, 5);
2678        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
2679        assert_eq!(c5t, vec![d(15), d(16)]);
2680        assert_eq!(c6t, vec![d(17)]);
2681
2682        // Pending at index 4 was too soon after the prior one and should be coalesced
2683        // into the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

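        // Sign the first two checkpoint summaries, then deliver the signatures out
        // of order (c2's before c1's); certification must still complete in
        // sequence-number order.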
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

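    // Test-only in-memory effects store: lets the checkpoint builder read executed
    // effects straight out of a HashMap instead of going through a real cache.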
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| {
                    self.get(d)
                        .map(|fx| fx.digest())
                        .expect("effects not found")
                })
                .collect()))
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
        }

        // Unimplemented methods - it's unfortunate to have this big blob of useless
        // code, but it wasn't worth it to keep EffectsNotifyRead around just
        // for these tests, as it caused a ton of complication in non-test code
        // (e.g. we had to implement EffectsNotifyRead for all ExecutionCacheRead
        // implementors).

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            unimplemented!()
        }

        fn multi_get_events(
            &self,
            _: &[TransactionEventsDigest],
        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
            unimplemented!()
        }
    }

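    // Test-only `CheckpointOutput` that forwards each locally built (contents,
    // summary) pair to the test's channel.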
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> IotaResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

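    // Test-only `CertifiedCheckpointOutput` that forwards each certified summary
    // to the test's channel.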
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> IotaResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

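    /// Builds a pending checkpoint at height `i` whose roots are the digests
    /// derived from `t` (see `d`), with the given timestamp.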
    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
            roots: t
                .into_iter()
                .map(|t| TransactionKey::Digest(d(t)))
                .collect(),
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
            },
        })
    }

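    /// Builds a deterministic test digest whose first byte is `i` and the rest
    /// zeroes.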
    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

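    /// Builds test transaction effects with the given digest, dependencies, and
    /// gas cost summary.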
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

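    /// Inserts effects for `digest` into the in-memory store and records the
    /// key-to-digest mapping in the epoch store, mimicking the bookkeeping done
    /// when a certificate is executed.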
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store
            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
            .expect("Inserting cert fx and sigs should not fail");
    }
}