// iota_core/checkpoints/mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5mod causal_order;
6pub mod checkpoint_executor;
7mod checkpoint_output;
8mod metrics;
9
10use std::{
11    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
12    fs::File,
13    io::Write,
14    path::Path,
15    sync::{Arc, Weak},
16    time::Duration,
17};
18
19use chrono::Utc;
20use diffy::create_patch;
21use iota_common::{debug_fatal, fatal};
22use iota_macros::fail_point;
23use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
24use iota_network::default_iota_network_config;
25use iota_protocol_config::ProtocolVersion;
26use iota_types::{
27    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
28    committee::StakeUnit,
29    crypto::AuthorityStrongQuorumSignInfo,
30    digests::{CheckpointContentsDigest, CheckpointDigest},
31    effects::{TransactionEffects, TransactionEffectsAPI},
32    error::{IotaError, IotaResult},
33    event::SystemEpochInfoEvent,
34    executable_transaction::VerifiedExecutableTransaction,
35    gas::GasCostSummary,
36    iota_system_state::{
37        IotaSystemState, IotaSystemStateTrait,
38        epoch_start_iota_system_state::EpochStartSystemStateTrait,
39    },
40    message_envelope::Message,
41    messages_checkpoint::{
42        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
43        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
44        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
45        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
46        VerifiedCheckpointContents,
47    },
48    messages_consensus::ConsensusTransactionKey,
49    signature::GenericSignature,
50    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
51};
52use itertools::Itertools;
53use nonempty::NonEmpty;
54use parking_lot::Mutex;
55use rand::{rngs::OsRng, seq::SliceRandom};
56use serde::{Deserialize, Serialize};
57use tokio::{
58    sync::{Notify, watch},
59    task::JoinSet,
60    time::timeout,
61};
62use tracing::{debug, error, info, instrument, warn};
63use typed_store::{
64    DBMapUtils, Map, TypedStoreError,
65    rocks::{DBMap, MetricConf},
66    traits::{TableSummary, TypedStoreDebug},
67};
68
69pub use crate::checkpoints::{
70    checkpoint_output::{
71        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
72    },
73    metrics::CheckpointMetrics,
74};
75use crate::{
76    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
77    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
78    checkpoints::{
79        causal_order::CausalOrder,
80        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
81    },
82    consensus_handler::SequencedConsensusTransactionKey,
83    execution_cache::TransactionCacheRead,
84    stake_aggregator::{InsertResult, MultiStakeAggregator},
85    state_accumulator::StateAccumulator,
86};
87
/// Height at which a checkpoint is built.
///
/// Carried by [`PendingCheckpointInfo::checkpoint_height`] and, optionally,
/// by [`BuilderCheckpointSummary::checkpoint_height`].
pub type CheckpointHeight = u64;
89
/// Summary statistics for one epoch, as derived by
/// `CheckpointStore::get_epoch_stats`.
///
/// All fields are plain integers, so the type is cheap to copy, compare and
/// print.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct EpochStats {
    /// Number of checkpoints in the epoch (first through last, inclusive).
    pub checkpoint_count: u64,
    /// Number of transactions in the epoch, computed from the difference of
    /// the network-wide running transaction totals.
    pub transaction_count: u64,
    /// Computation cost taken from the last checkpoint's epoch rolling gas
    /// cost summary.
    pub total_gas_reward: u64,
}
95
96#[derive(Clone, Debug, Serialize, Deserialize)]
97pub struct PendingCheckpointInfo {
98    pub timestamp_ms: CheckpointTimestamp,
99    pub last_of_epoch: bool,
100    pub checkpoint_height: CheckpointHeight,
101}
102
103#[derive(Clone, Debug, Serialize, Deserialize)]
104pub enum PendingCheckpoint {
105    // This is an enum for future upgradability, though at the moment there is only one variant.
106    V1(PendingCheckpointContentsV1),
107}
108
109#[derive(Clone, Debug, Serialize, Deserialize)]
110pub struct PendingCheckpointContentsV1 {
111    pub roots: Vec<TransactionKey>,
112    pub details: PendingCheckpointInfo,
113}
114
115impl PendingCheckpoint {
116    pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
117        match self {
118            PendingCheckpoint::V1(contents) => contents,
119        }
120    }
121
122    pub fn into_v1(self) -> PendingCheckpointContentsV1 {
123        match self {
124            PendingCheckpoint::V1(contents) => contents,
125        }
126    }
127
128    pub fn roots(&self) -> &Vec<TransactionKey> {
129        &self.as_v1().roots
130    }
131
132    pub fn details(&self) -> &PendingCheckpointInfo {
133        &self.as_v1().details
134    }
135
136    pub fn height(&self) -> CheckpointHeight {
137        self.details().checkpoint_height
138    }
139}
140
141#[derive(Clone, Debug, Serialize, Deserialize)]
142pub struct BuilderCheckpointSummary {
143    pub summary: CheckpointSummary,
144    // Height at which this checkpoint summary was built. None for genesis checkpoint
145    pub checkpoint_height: Option<CheckpointHeight>,
146    pub position_in_commit: usize,
147}
148
149#[derive(DBMapUtils)]
150pub struct CheckpointStore {
151    /// Maps checkpoint contents digest to checkpoint contents
152    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,
153
154    /// Maps checkpoint contents digest to checkpoint sequence number
155    pub(crate) checkpoint_sequence_by_contents_digest:
156        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,
157
158    /// Stores entire checkpoint contents from state sync, indexed by sequence
159    /// number, for efficient reads of full checkpoints. Entries from this
160    /// table are deleted after state accumulation has completed.
161    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,
162
163    /// Stores certified checkpoints
164    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
165    /// Map from checkpoint digest to certified checkpoint
166    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,
167
168    /// Store locally computed checkpoint summaries so that we can detect forks
169    /// and log useful information. Can be pruned as soon as we verify that
170    /// we are in agreement with the latest certified checkpoint.
171    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,
172
173    /// A map from epoch ID to the sequence number of the last checkpoint in
174    /// that epoch.
175    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,
176
177    /// Watermarks used to determine the highest verified, fully synced, and
178    /// fully executed checkpoints
179    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
180}
181
182impl CheckpointStore {
183    pub fn new(path: &Path) -> Arc<Self> {
184        Arc::new(Self::open_tables_read_write(
185            path.to_path_buf(),
186            MetricConf::new("checkpoint"),
187            None,
188            None,
189        ))
190    }
191
192    pub fn open_readonly(path: &Path) -> CheckpointStoreReadOnly {
193        Self::get_read_only_handle(
194            path.to_path_buf(),
195            None,
196            None,
197            MetricConf::new("checkpoint_readonly"),
198        )
199    }
200
201    #[instrument(level = "info", skip_all)]
202    pub fn insert_genesis_checkpoint(
203        &self,
204        checkpoint: VerifiedCheckpoint,
205        contents: CheckpointContents,
206        epoch_store: &AuthorityPerEpochStore,
207    ) {
208        assert_eq!(
209            checkpoint.epoch(),
210            0,
211            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
212        );
213        assert_eq!(
214            *checkpoint.sequence_number(),
215            0,
216            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
217        );
218
219        // Only insert the genesis checkpoint if the DB is empty and doesn't have it
220        // already
221        if self
222            .get_checkpoint_by_digest(checkpoint.digest())
223            .unwrap()
224            .is_none()
225        {
226            if epoch_store.epoch() == checkpoint.epoch {
227                epoch_store
228                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
229                    .unwrap();
230            } else {
231                debug!(
232                    validator_epoch =% epoch_store.epoch(),
233                    genesis_epoch =% checkpoint.epoch(),
234                    "Not inserting checkpoint builder data for genesis checkpoint",
235                );
236            }
237            self.insert_checkpoint_contents(contents).unwrap();
238            self.insert_verified_checkpoint(&checkpoint).unwrap();
239            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
240        }
241    }
242
243    pub fn get_checkpoint_by_digest(
244        &self,
245        digest: &CheckpointDigest,
246    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
247        self.checkpoint_by_digest
248            .get(digest)
249            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
250    }
251
252    pub fn get_checkpoint_by_sequence_number(
253        &self,
254        sequence_number: CheckpointSequenceNumber,
255    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
256        self.certified_checkpoints
257            .get(&sequence_number)
258            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
259    }
260
261    pub fn get_locally_computed_checkpoint(
262        &self,
263        sequence_number: CheckpointSequenceNumber,
264    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
265        self.locally_computed_checkpoints.get(&sequence_number)
266    }
267
268    pub fn get_sequence_number_by_contents_digest(
269        &self,
270        digest: &CheckpointContentsDigest,
271    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
272        self.checkpoint_sequence_by_contents_digest.get(digest)
273    }
274
275    pub fn delete_contents_digest_sequence_number_mapping(
276        &self,
277        digest: &CheckpointContentsDigest,
278    ) -> Result<(), TypedStoreError> {
279        self.checkpoint_sequence_by_contents_digest.remove(digest)
280    }
281
282    pub fn get_latest_certified_checkpoint(&self) -> Option<VerifiedCheckpoint> {
283        self.certified_checkpoints
284            .unbounded_iter()
285            .skip_to_last()
286            .next()
287            .map(|(_, v)| v.into())
288    }
289
290    pub fn get_latest_locally_computed_checkpoint(&self) -> Option<CheckpointSummary> {
291        self.locally_computed_checkpoints
292            .unbounded_iter()
293            .skip_to_last()
294            .next()
295            .map(|(_, v)| v)
296    }
297
298    pub fn multi_get_checkpoint_by_sequence_number(
299        &self,
300        sequence_numbers: &[CheckpointSequenceNumber],
301    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
302        let checkpoints = self
303            .certified_checkpoints
304            .multi_get(sequence_numbers)?
305            .into_iter()
306            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
307            .collect();
308
309        Ok(checkpoints)
310    }
311
312    pub fn multi_get_checkpoint_content(
313        &self,
314        contents_digest: &[CheckpointContentsDigest],
315    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
316        self.checkpoint_content.multi_get(contents_digest)
317    }
318
319    pub fn get_highest_verified_checkpoint(
320        &self,
321    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
322        let highest_verified = if let Some(highest_verified) =
323            self.watermarks.get(&CheckpointWatermark::HighestVerified)?
324        {
325            highest_verified
326        } else {
327            return Ok(None);
328        };
329        self.get_checkpoint_by_digest(&highest_verified.1)
330    }
331
332    pub fn get_highest_synced_checkpoint(
333        &self,
334    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
335        let highest_synced = if let Some(highest_synced) =
336            self.watermarks.get(&CheckpointWatermark::HighestSynced)?
337        {
338            highest_synced
339        } else {
340            return Ok(None);
341        };
342        self.get_checkpoint_by_digest(&highest_synced.1)
343    }
344
345    pub fn get_highest_executed_checkpoint_seq_number(
346        &self,
347    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
348        if let Some(highest_executed) =
349            self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
350        {
351            Ok(Some(highest_executed.0))
352        } else {
353            Ok(None)
354        }
355    }
356
357    pub fn get_highest_executed_checkpoint(
358        &self,
359    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
360        let highest_executed = if let Some(highest_executed) =
361            self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
362        {
363            highest_executed
364        } else {
365            return Ok(None);
366        };
367        self.get_checkpoint_by_digest(&highest_executed.1)
368    }
369
370    pub fn get_highest_pruned_checkpoint_seq_number(
371        &self,
372    ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
373        Ok(self
374            .watermarks
375            .get(&CheckpointWatermark::HighestPruned)?
376            .unwrap_or_default()
377            .0)
378    }
379
380    pub fn get_checkpoint_contents(
381        &self,
382        digest: &CheckpointContentsDigest,
383    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
384        self.checkpoint_content.get(digest)
385    }
386
387    pub fn get_full_checkpoint_contents_by_sequence_number(
388        &self,
389        seq: CheckpointSequenceNumber,
390    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
391        self.full_checkpoint_content.get(&seq)
392    }
393
394    fn prune_local_summaries(&self) -> IotaResult {
395        if let Some((last_local_summary, _)) = self
396            .locally_computed_checkpoints
397            .unbounded_iter()
398            .skip_to_last()
399            .next()
400        {
401            let mut batch = self.locally_computed_checkpoints.batch();
402            batch.schedule_delete_range(
403                &self.locally_computed_checkpoints,
404                &0,
405                &last_local_summary,
406            )?;
407            batch.write()?;
408            info!("Pruned local summaries up to {:?}", last_local_summary);
409        }
410        Ok(())
411    }
412
413    fn check_for_checkpoint_fork(
414        &self,
415        local_checkpoint: &CheckpointSummary,
416        verified_checkpoint: &VerifiedCheckpoint,
417    ) {
418        if local_checkpoint != verified_checkpoint.data() {
419            let verified_contents = self
420                .get_checkpoint_contents(&verified_checkpoint.content_digest)
421                .map(|opt_contents| {
422                    opt_contents
423                        .map(|contents| format!("{:?}", contents))
424                        .unwrap_or_else(|| {
425                            format!(
426                                "Verified checkpoint contents not found, digest: {:?}",
427                                verified_checkpoint.content_digest,
428                            )
429                        })
430                })
431                .map_err(|e| {
432                    format!(
433                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
434                        verified_checkpoint.content_digest, e
435                    )
436                })
437                .unwrap_or_else(|err_msg| err_msg);
438
439            let local_contents = self
440                .get_checkpoint_contents(&local_checkpoint.content_digest)
441                .map(|opt_contents| {
442                    opt_contents
443                        .map(|contents| format!("{:?}", contents))
444                        .unwrap_or_else(|| {
445                            format!(
446                                "Local checkpoint contents not found, digest: {:?}",
447                                local_checkpoint.content_digest
448                            )
449                        })
450                })
451                .map_err(|e| {
452                    format!(
453                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
454                        local_checkpoint.content_digest, e
455                    )
456                })
457                .unwrap_or_else(|err_msg| err_msg);
458
459            // checkpoint contents may be too large for panic message.
460            error!(
461                verified_checkpoint = ?verified_checkpoint.data(),
462                ?verified_contents,
463                ?local_checkpoint,
464                ?local_contents,
465                "Local checkpoint fork detected!",
466            );
467            fatal!(
468                "Local checkpoint fork detected for sequence number: {}",
469                local_checkpoint.sequence_number()
470            );
471        }
472    }
473
474    // Called by consensus (ConsensusAggregator).
475    // Different from `insert_verified_checkpoint`, it does not touch
476    // the highest_verified_checkpoint watermark such that state sync
477    // will have a chance to process this checkpoint and perform some
478    // state-sync only things.
479    pub fn insert_certified_checkpoint(
480        &self,
481        checkpoint: &VerifiedCheckpoint,
482    ) -> Result<(), TypedStoreError> {
483        debug!(
484            checkpoint_seq = checkpoint.sequence_number(),
485            "Inserting certified checkpoint",
486        );
487        let mut batch = self.certified_checkpoints.batch();
488        batch
489            .insert_batch(
490                &self.certified_checkpoints,
491                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
492            )?
493            .insert_batch(
494                &self.checkpoint_by_digest,
495                [(checkpoint.digest(), checkpoint.serializable_ref())],
496            )?;
497        if checkpoint.next_epoch_committee().is_some() {
498            batch.insert_batch(
499                &self.epoch_last_checkpoint_map,
500                [(&checkpoint.epoch(), checkpoint.sequence_number())],
501            )?;
502        }
503        batch.write()?;
504
505        if let Some(local_checkpoint) = self
506            .locally_computed_checkpoints
507            .get(checkpoint.sequence_number())?
508        {
509            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
510        }
511
512        Ok(())
513    }
514
515    // Called by state sync, apart from inserting the checkpoint and updating
516    // related tables, it also bumps the highest_verified_checkpoint watermark.
517    #[instrument(level = "debug", skip_all)]
518    pub fn insert_verified_checkpoint(
519        &self,
520        checkpoint: &VerifiedCheckpoint,
521    ) -> Result<(), TypedStoreError> {
522        self.insert_certified_checkpoint(checkpoint)?;
523        self.update_highest_verified_checkpoint(checkpoint)
524    }
525
526    pub fn update_highest_verified_checkpoint(
527        &self,
528        checkpoint: &VerifiedCheckpoint,
529    ) -> Result<(), TypedStoreError> {
530        if Some(*checkpoint.sequence_number())
531            > self
532                .get_highest_verified_checkpoint()?
533                .map(|x| *x.sequence_number())
534        {
535            debug!(
536                checkpoint_seq = checkpoint.sequence_number(),
537                "Updating highest verified checkpoint",
538            );
539            self.watermarks.insert(
540                &CheckpointWatermark::HighestVerified,
541                &(*checkpoint.sequence_number(), *checkpoint.digest()),
542            )?;
543        }
544
545        Ok(())
546    }
547
548    pub fn update_highest_synced_checkpoint(
549        &self,
550        checkpoint: &VerifiedCheckpoint,
551    ) -> Result<(), TypedStoreError> {
552        debug!(
553            checkpoint_seq = checkpoint.sequence_number(),
554            "Updating highest synced checkpoint",
555        );
556        self.watermarks.insert(
557            &CheckpointWatermark::HighestSynced,
558            &(*checkpoint.sequence_number(), *checkpoint.digest()),
559        )
560    }
561
562    pub fn update_highest_executed_checkpoint(
563        &self,
564        checkpoint: &VerifiedCheckpoint,
565    ) -> Result<(), TypedStoreError> {
566        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
567            if seq_number >= *checkpoint.sequence_number() {
568                return Ok(());
569            }
570            assert_eq!(
571                seq_number + 1,
572                *checkpoint.sequence_number(),
573                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
574                checkpoint.sequence_number(),
575                seq_number
576            );
577        }
578        debug!(
579            checkpoint_seq = checkpoint.sequence_number(),
580            "Updating highest executed checkpoint",
581        );
582        self.watermarks.insert(
583            &CheckpointWatermark::HighestExecuted,
584            &(*checkpoint.sequence_number(), *checkpoint.digest()),
585        )
586    }
587
588    pub fn update_highest_pruned_checkpoint(
589        &self,
590        checkpoint: &VerifiedCheckpoint,
591    ) -> Result<(), TypedStoreError> {
592        self.watermarks.insert(
593            &CheckpointWatermark::HighestPruned,
594            &(*checkpoint.sequence_number(), *checkpoint.digest()),
595        )
596    }
597
598    /// Sets highest executed checkpoint to any value.
599    ///
600    /// WARNING: This method is very subtle and can corrupt the database if used
601    /// incorrectly. It should only be used in one-off cases or tests after
602    /// fully understanding the risk.
603    pub fn set_highest_executed_checkpoint_subtle(
604        &self,
605        checkpoint: &VerifiedCheckpoint,
606    ) -> Result<(), TypedStoreError> {
607        self.watermarks.insert(
608            &CheckpointWatermark::HighestExecuted,
609            &(*checkpoint.sequence_number(), *checkpoint.digest()),
610        )
611    }
612
613    pub fn insert_checkpoint_contents(
614        &self,
615        contents: CheckpointContents,
616    ) -> Result<(), TypedStoreError> {
617        debug!(
618            checkpoint_seq = ?contents.digest(),
619            "Inserting checkpoint contents",
620        );
621        self.checkpoint_content.insert(contents.digest(), &contents)
622    }
623
624    pub fn insert_verified_checkpoint_contents(
625        &self,
626        checkpoint: &VerifiedCheckpoint,
627        full_contents: VerifiedCheckpointContents,
628    ) -> Result<(), TypedStoreError> {
629        let mut batch = self.full_checkpoint_content.batch();
630        batch.insert_batch(
631            &self.checkpoint_sequence_by_contents_digest,
632            [(&checkpoint.content_digest, checkpoint.sequence_number())],
633        )?;
634        let full_contents = full_contents.into_inner();
635        batch.insert_batch(
636            &self.full_checkpoint_content,
637            [(checkpoint.sequence_number(), &full_contents)],
638        )?;
639
640        let contents = full_contents.into_checkpoint_contents();
641        assert_eq!(&checkpoint.content_digest, contents.digest());
642
643        batch.insert_batch(&self.checkpoint_content, [(contents.digest(), &contents)])?;
644
645        batch.write()
646    }
647
648    pub fn delete_full_checkpoint_contents(
649        &self,
650        seq: CheckpointSequenceNumber,
651    ) -> Result<(), TypedStoreError> {
652        self.full_checkpoint_content.remove(&seq)
653    }
654
655    pub fn get_epoch_last_checkpoint(
656        &self,
657        epoch_id: EpochId,
658    ) -> IotaResult<Option<VerifiedCheckpoint>> {
659        let seq = self.epoch_last_checkpoint_map.get(&epoch_id)?;
660        let checkpoint = match seq {
661            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
662            None => None,
663        };
664        Ok(checkpoint)
665    }
666
667    pub fn insert_epoch_last_checkpoint(
668        &self,
669        epoch_id: EpochId,
670        checkpoint: &VerifiedCheckpoint,
671    ) -> IotaResult {
672        self.epoch_last_checkpoint_map
673            .insert(&epoch_id, checkpoint.sequence_number())?;
674        Ok(())
675    }
676
677    pub fn get_epoch_state_commitments(
678        &self,
679        epoch: EpochId,
680    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
681        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
682            checkpoint
683                .end_of_epoch_data
684                .as_ref()
685                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
686                .epoch_commitments
687                .clone()
688        });
689        Ok(commitments)
690    }
691
692    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few
693    /// statistics of the epoch.
694    pub fn get_epoch_stats(
695        &self,
696        epoch: EpochId,
697        last_checkpoint: &CheckpointSummary,
698    ) -> Option<EpochStats> {
699        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
700            (0, 0)
701        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
702            (
703                checkpoint.sequence_number + 1,
704                checkpoint.network_total_transactions,
705            )
706        } else {
707            return None;
708        };
709        Some(EpochStats {
710            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
711            transaction_count: last_checkpoint.network_total_transactions
712                - prev_epoch_network_transactions,
713            total_gas_reward: last_checkpoint
714                .epoch_rolling_gas_cost_summary
715                .computation_cost,
716        })
717    }
718
719    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
720        // This checkpoints the entire db and not one column family
721        self.checkpoint_content
722            .checkpoint_db(path)
723            .map_err(Into::into)
724    }
725
726    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
727        let mut wb = self.watermarks.batch();
728        wb.delete_batch(
729            &self.watermarks,
730            std::iter::once(CheckpointWatermark::HighestExecuted),
731        )?;
732        wb.write()?;
733        Ok(())
734    }
735
736    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
737        self.delete_highest_executed_checkpoint_test_only()?;
738        self.watermarks.rocksdb.flush()?;
739        Ok(())
740    }
741
742    /// Re-executes all transactions from all local, uncertified checkpoints for
743    /// crash recovery. All transactions thus re-executed are guaranteed to
744    /// not have any missing dependencies, because we start from the highest
745    /// executed checkpoint, and proceed through checkpoints in order.
746    #[instrument(level = "debug", skip_all)]
747    pub async fn reexecute_local_checkpoints(
748        &self,
749        state: &AuthorityState,
750        epoch_store: &AuthorityPerEpochStore,
751    ) {
752        info!("rexecuting locally computed checkpoints for crash recovery");
753        let epoch = epoch_store.epoch();
754        let highest_executed = self
755            .get_highest_executed_checkpoint_seq_number()
756            .expect("get_highest_executed_checkpoint_seq_number should not fail")
757            .unwrap_or(0);
758
759        let Some(highest_built) = self.get_latest_locally_computed_checkpoint() else {
760            info!("no locally built checkpoints to verify");
761            return;
762        };
763
764        for seq in highest_executed + 1..=*highest_built.sequence_number() {
765            info!(?seq, "Re-executing locally computed checkpoint");
766            let Some(checkpoint) = self
767                .get_locally_computed_checkpoint(seq)
768                .expect("get_locally_computed_checkpoint should not fail")
769            else {
770                panic!("locally computed checkpoint {:?} not found", seq);
771            };
772
773            let Some(contents) = self
774                .get_checkpoint_contents(&checkpoint.content_digest)
775                .expect("get_checkpoint_contents should not fail")
776            else {
777                panic!(
778                    "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
779                    seq, checkpoint.content_digest
780                );
781            };
782
783            let cache = state.get_transaction_cache_reader();
784
785            let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
786            let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
787            let txns = cache
788                .multi_get_transaction_blocks(&tx_digests)
789                .expect("multi_get_transaction_blocks should not fail");
790            for (tx, digest) in txns.iter().zip(tx_digests.iter()) {
791                if tx.is_none() {
792                    panic!("transaction {:?} not found", digest);
793                }
794            }
795
796            let txns: Vec<_> = txns
797                .into_iter()
798                .map(|tx| tx.unwrap())
799                .zip(fx_digests.into_iter())
800                // end of epoch transaction can only be executed by CheckpointExecutor
801                .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
802                .map(|(tx, fx)| {
803                    (
804                        VerifiedExecutableTransaction::new_from_checkpoint(
805                            (*tx).clone(),
806                            epoch,
807                            seq,
808                        ),
809                        fx,
810                    )
811                })
812                .collect();
813
814            let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();
815
816            info!(
817                ?seq,
818                ?tx_digests,
819                "Re-executing transactions for locally built checkpoint"
820            );
821            // this will panic if any re-execution diverges from the previously recorded
822            // effects digest
823            state.enqueue_with_expected_effects_digest(txns, epoch_store);
824
825            // a task that logs every so often until it is cancelled
826            // This should normally finish very quickly, so seeing this log more than once
827            // or twice is likely a sign of a problem.
828            let waiting_logger = tokio::task::spawn(async move {
829                let mut interval = tokio::time::interval(Duration::from_secs(1));
830                loop {
831                    interval.tick().await;
832                    warn!(?seq, "Still waiting for re-execution to complete");
833                }
834            });
835
836            cache
837                .notify_read_executed_effects_digests(&tx_digests)
838                .await
839                .expect("notify_read_executed_effects_digests should not fail");
840
841            waiting_logger.abort();
842            waiting_logger.await.ok();
843            info!(?seq, "Re-execution completed for locally built checkpoint");
844        }
845
846        info!("Re-execution of locally built checkpoints completed");
847    }
848}
849
/// Names a checkpoint "watermark", i.e. a tracked highest-checkpoint position.
///
/// Going by the variant names, these track the highest verified, synced,
/// executed and pruned checkpoints (TODO confirm against their readers and
/// writers). The type derives `Serialize`/`Deserialize`; if it is used as a
/// persisted key, the variant order is part of the encoding — do not reorder
/// or insert variants without checking the storage format.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
}
857
/// Builds checkpoints from the pending checkpoints recorded in the epoch store.
///
/// `run` loops forever: it builds every checkpoint that can currently be
/// built, then waits on `notify`. Each built checkpoint is handed to `output`
/// and stored in `tables`, after which the aggregator is woken through
/// `notify_aggregator`.
pub struct CheckpointBuilder {
    /// Authority state; used to read transactions from the transaction cache
    /// and to persist transactions before checkpoints are written.
    state: Arc<AuthorityState>,
    /// Checkpoint store: receives checkpoint contents and locally computed
    /// summaries, and is consulted for certified checkpoints (fork checks),
    /// the previous epoch's last checkpoint and epoch stats.
    tables: Arc<CheckpointStore>,
    /// Per-epoch store: source of pending checkpoints and of the last built
    /// checkpoint summary, among other epoch-scoped data.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Awaited by `run` between build passes.
    notify: Arc<Notify>,
    /// Signalled after new checkpoints have been written.
    notify_aggregator: Arc<Notify>,
    /// Publishes the highest checkpoint sequence number built so far; it is
    /// only ever moved forward.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    /// Used to wait for the executed effects of root transactions.
    effects_store: Arc<dyn TransactionCacheRead>,
    /// Weak handle to the state accumulator; upgraded when building the last
    /// checkpoint of an epoch, where it is expected to still be alive.
    accumulator: Weak<StateAccumulator>,
    /// Receives each created checkpoint before its batch is written to
    /// `tables`.
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    /// Maximum number of transactions placed in a single checkpoint.
    max_transactions_per_checkpoint: usize,
    /// Upper bound on the estimated size of a checkpoint in bytes; a single
    /// transaction that exceeds it is still placed in a checkpoint.
    max_checkpoint_size_bytes: usize,
}
872
/// Aggregates validator signatures for checkpoints, one checkpoint at a time.
///
/// NOTE(review): the impl is not visible here. Presumably it wakes on
/// `notify` (signalled by the builder's `notify_aggregator` after new
/// checkpoints are written) and emits certified checkpoints through
/// `output` — TODO confirm at the construction site.
pub struct CheckpointAggregator {
    tables: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    /// Signature aggregation state for the checkpoint currently being
    /// certified, if any.
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
882
/// Holds the information needed to aggregate signatures for one checkpoint.
pub struct CheckpointSignatureAggregator {
    // NOTE(review): presumably the index of the next signature message to
    // process for this checkpoint — TODO confirm against the aggregator impl.
    next_index: u64,
    /// The checkpoint summary this aggregator collects signatures for.
    summary: CheckpointSummary,
    /// Digest identifying the checkpoint (presumably `summary.digest()` —
    /// TODO confirm at the construction site).
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    tables: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
894
895impl CheckpointBuilder {
896    fn new(
897        state: Arc<AuthorityState>,
898        tables: Arc<CheckpointStore>,
899        epoch_store: Arc<AuthorityPerEpochStore>,
900        notify: Arc<Notify>,
901        effects_store: Arc<dyn TransactionCacheRead>,
902        accumulator: Weak<StateAccumulator>,
903        output: Box<dyn CheckpointOutput>,
904        notify_aggregator: Arc<Notify>,
905        last_built: watch::Sender<CheckpointSequenceNumber>,
906        metrics: Arc<CheckpointMetrics>,
907        max_transactions_per_checkpoint: usize,
908        max_checkpoint_size_bytes: usize,
909    ) -> Self {
910        Self {
911            state,
912            tables,
913            epoch_store,
914            notify,
915            effects_store,
916            accumulator,
917            output,
918            notify_aggregator,
919            last_built,
920            metrics,
921            max_transactions_per_checkpoint,
922            max_checkpoint_size_bytes,
923        }
924    }
925
926    /// Runs the `CheckpointBuilder` in an asynchronous loop, managing the
927    /// creation of checkpoints.
928    async fn run(mut self) {
929        info!("Starting CheckpointBuilder");
930        loop {
931            self.maybe_build_checkpoints().await;
932
933            self.notify.notified().await;
934        }
935    }
936
937    async fn maybe_build_checkpoints(&mut self) {
938        let _scope = monitored_scope("BuildCheckpoints");
939
940        // Collect info about the most recently built checkpoint.
941        let summary = self
942            .epoch_store
943            .last_built_checkpoint_builder_summary()
944            .expect("epoch should not have ended");
945        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
946        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
947
948        let min_checkpoint_interval_ms = self
949            .epoch_store
950            .protocol_config()
951            .min_checkpoint_interval_ms_as_option()
952            .unwrap_or_default();
953        let mut grouped_pending_checkpoints = Vec::new();
954        let mut checkpoints_iter = self
955            .epoch_store
956            .get_pending_checkpoints(last_height)
957            .expect("unexpected epoch store error")
958            .into_iter()
959            .peekable();
960        while let Some((height, pending)) = checkpoints_iter.next() {
961            // Group PendingCheckpoints until:
962            // - minimum interval has elapsed ...
963            let current_timestamp = pending.details().timestamp_ms;
964            let can_build = match last_timestamp {
965                    Some(last_timestamp) => {
966                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
967                    }
968                    None => true,
969                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
970                //   should be written separately) ...
971                } || checkpoints_iter
972                    .peek()
973                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
974                // - or, we have reached end of epoch.
975                    || pending.details().last_of_epoch;
976            grouped_pending_checkpoints.push(pending);
977            if !can_build {
978                debug!(
979                    checkpoint_commit_height = height,
980                    ?last_timestamp,
981                    ?current_timestamp,
982                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
983                );
984                continue;
985            }
986
987            // Min interval has elapsed, we can now coalesce and build a checkpoint.
988            last_height = Some(height);
989            last_timestamp = Some(current_timestamp);
990            debug!(
991                checkpoint_commit_height = height,
992                "Making checkpoint at commit height"
993            );
994
995            match self
996                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
997                .await
998            {
999                Ok(seq) => {
1000                    self.last_built.send_if_modified(|cur| {
1001                        if seq > *cur {
1002                            *cur = seq;
1003                            true
1004                        } else {
1005                            false
1006                        }
1007                    });
1008                }
1009                Err(e) => {
1010                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
1011                    tokio::time::sleep(Duration::from_secs(1)).await;
1012                    self.metrics.checkpoint_errors.inc();
1013                    return;
1014                }
1015            }
1016            // Ensure that the task can be cancelled at end of epoch, even if no other await
1017            // yields execution.
1018            tokio::task::yield_now().await;
1019        }
1020        debug!(
1021            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1022            grouped_pending_checkpoints.len(),
1023        );
1024    }
1025
1026    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1027    async fn make_checkpoint(
1028        &self,
1029        pendings: Vec<PendingCheckpoint>,
1030    ) -> anyhow::Result<CheckpointSequenceNumber> {
1031        let last_details = pendings.last().unwrap().details().clone();
1032
1033        // Keeps track of the effects that are already included in the current
1034        // checkpoint. This is used when there are multiple pending checkpoints
1035        // to create a single checkpoint because in such scenarios, dependencies
1036        // of a transaction may in earlier created checkpoints, or in earlier
1037        // pending checkpoints.
1038        let mut effects_in_current_checkpoint = BTreeSet::new();
1039
1040        // Stores the transactions that should be included in the checkpoint.
1041        // Transactions will be recorded in the checkpoint in this order.
1042        let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
1043        for pending_checkpoint in pendings.into_iter() {
1044            let pending = pending_checkpoint.into_v1();
1045            let txn_in_checkpoint = self
1046                .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
1047                .await?;
1048            sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
1049        }
1050        let new_checkpoints = self
1051            .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
1052            .await?;
1053        let highest_sequence = *new_checkpoints.last().0.sequence_number();
1054        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1055            .await?;
1056        Ok(highest_sequence)
1057    }
1058
    /// Given the root transactions of a pending checkpoint, resolves the
    /// transactions that should be included in the checkpoint and returns
    /// their effects in the order they should be recorded.
    ///
    /// Waits until every root has been executed, then completes the roots
    /// with their un-included dependencies (via `complete_checkpoint_effects`)
    /// and sorts the result causally. If the first root is a consensus commit
    /// prologue, it is placed at the front of the result.
    ///
    /// `effects_in_current_checkpoint` tracks the transactions that already
    /// exist in the current checkpoint. It is passed mutably to
    /// `complete_checkpoint_effects`, which presumably records newly included
    /// digests in it — TODO confirm.
    ///
    /// # Panics
    ///
    /// Panics if the consensus commit prologue has any un-included
    /// dependency other than itself.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        roots: Vec<TransactionKey>,
        effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        self.metrics
            .checkpoint_roots_count
            .inc_by(roots.len() as u64);

        // Wait for every root to be executed, then load the root effects.
        let root_digests = self
            .epoch_store
            .notify_read_executed_digests(&roots)
            .in_monitored_scope("CheckpointNotifyDigests")
            .await?;
        let root_effects = self
            .effects_store
            .notify_read_executed_effects(&root_digests)
            .in_monitored_scope("CheckpointNotifyRead")
            .await?;

        // Note: this guard is shadowed below but, like any shadowed binding,
        // stays alive until the end of the function.
        let _scope = monitored_scope("CheckpointBuilder");

        let consensus_commit_prologue = {
            // If the roots contain a consensus commit prologue transaction, we want to
            // extract it, and put it at the front of the checkpoint.

            let consensus_commit_prologue = self
                .extract_consensus_commit_prologue(&root_digests, &root_effects)
                .await?;

            // Get the un-included dependencies of the consensus commit prologue. We should
            // expect no other dependencies that haven't been included in any
            // previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been
                // included in any previous checkpoint.
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }
            consensus_commit_prologue
        };

        let unsorted =
            self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;

        let _scope = monitored_scope("CheckpointBuilder::causal_sort");
        // +1 leaves room for the consensus commit prologue, if any.
        let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
        if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
            #[cfg(debug_assertions)]
            {
                // When consensus_commit_prologue is extracted, it should not be included in the
                // `unsorted`.
                for tx in unsorted.iter() {
                    assert!(tx.transaction_digest() != &_ccp_digest);
                }
            }
            sorted.push(ccp_effects);
        }
        sorted.extend(CausalOrder::causal_sort(unsorted));

        #[cfg(msim)]
        {
            // Check consensus commit prologue invariants in sim test.
            self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
        }

        Ok(sorted)
    }
1137
1138    // This function is used to extract the consensus commit prologue digest and
1139    // effects from the root transactions.
1140    // The consensus commit prologue is expected to be the first transaction in the
1141    // roots.
1142    async fn extract_consensus_commit_prologue(
1143        &self,
1144        root_digests: &[TransactionDigest],
1145        root_effects: &[TransactionEffects],
1146    ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1147        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1148        if root_digests.is_empty() {
1149            return Ok(None);
1150        }
1151
1152        // Reads the first transaction in the roots, and checks whether it is a
1153        // consensus commit prologue transaction.
1154        // The consensus commit prologue transaction should be the first transaction
1155        // in the roots written by the consensus handler.
1156        let first_tx = self
1157            .state
1158            .get_transaction_cache_reader()
1159            .get_transaction_block(&root_digests[0])?
1160            .expect("Transaction block must exist");
1161
1162        Ok(match first_tx.transaction_data().kind() {
1163            TransactionKind::ConsensusCommitPrologueV1(_) => {
1164                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1165                Some((*first_tx.digest(), root_effects[0].clone()))
1166            }
1167            _ => None,
1168        })
1169    }
1170
    /// Writes the new checkpoints to the DB storage and processes them.
    ///
    /// For each checkpoint, in order, this logs it, hands it to `output`,
    /// updates metrics, and stages its contents (keyed by contents digest) and
    /// its summary (keyed by sequence number) into one write batch. The
    /// transactions referenced by all checkpoints are persisted before that
    /// batch is written. After the write:
    /// - each checkpoint is compared with any certified checkpoint that has
    ///   the same sequence number (fork check);
    /// - the aggregator is notified;
    /// - the epoch store is told that the pending checkpoint at `height` has
    ///   been processed.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> IotaResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );
            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.tables)
                .await?;

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            // Staged only; nothing reaches the DB until `batch.write()` below.
            batch.insert_batch(
                &self.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        // Durably commit transactions (but not their outputs) to the database.
        // Called before writing a locally built checkpoint to the CheckpointStore, so
        // that the inputs of the checkpoint cannot be lost.
        // These transactions are guaranteed to be final unless this validator
        // forks (i.e. constructs a checkpoint which will never be certified). In this
        // case some non-final transactions could be left in the database.
        //
        // This is an intermediate solution until we delay commits to the epoch db.
        // After we have done that, crash recovery will be done by re-processing
        // consensus commits and pending_consensus_transactions, and this method
        // can be removed.
        self.state
            .get_cache_commit()
            .persist_transactions(&all_tx_digests)
            .await?;

        batch.write()?;

        // If a certified checkpoint with the same sequence number already
        // exists, compare it against what we just built locally.
        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.tables
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_pending_checkpoint(height, new_checkpoints)?;
        Ok(())
    }
1249
1250    #[expect(clippy::type_complexity)]
1251    fn split_checkpoint_chunks(
1252        &self,
1253        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1254        signatures: Vec<Vec<GenericSignature>>,
1255    ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1256        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1257        let mut chunks = Vec::new();
1258        let mut chunk = Vec::new();
1259        let mut chunk_size: usize = 0;
1260        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1261            .into_iter()
1262            .zip(signatures.into_iter())
1263        {
1264            // Roll over to a new chunk after either max count or max size is reached.
1265            // The size calculation here is intended to estimate the size of the
1266            // FullCheckpointContents struct. If this code is modified, that struct
1267            // should also be updated accordingly.
1268            let size = transaction_size
1269                + bcs::serialized_size(&effects)?
1270                + bcs::serialized_size(&signatures)?;
1271            if chunk.len() == self.max_transactions_per_checkpoint
1272                || (chunk_size + size) > self.max_checkpoint_size_bytes
1273            {
1274                if chunk.is_empty() {
1275                    // Always allow at least one tx in a checkpoint.
1276                    warn!(
1277                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1278                        self.max_checkpoint_size_bytes
1279                    );
1280                } else {
1281                    chunks.push(chunk);
1282                    chunk = Vec::new();
1283                    chunk_size = 0;
1284                }
1285            }
1286
1287            chunk.push((effects, signatures));
1288            chunk_size += size;
1289        }
1290
1291        if !chunk.is_empty() || chunks.is_empty() {
1292            // We intentionally create an empty checkpoint if there is no content provided
1293            // to make a 'heartbeat' checkpoint.
1294            // Important: if some conditions are added here later, we need to make sure we
1295            // always have at least one chunk if last_pending_of_epoch is set
1296            chunks.push(chunk);
1297            // Note: empty checkpoints are ok - they shouldn't happen at all on
1298            // a network with even modest load. Even if they do
1299            // happen, it is still useful as it allows fullnodes to
1300            // distinguish between "no transactions have happened" and "i am not
1301            // receiving new checkpoints".
1302        }
1303        Ok(chunks)
1304    }
1305
1306    /// Creates checkpoints using the provided transaction effects and pending
1307    /// checkpoint information.
1308    #[instrument(level = "debug", skip_all)]
1309    async fn create_checkpoints(
1310        &self,
1311        all_effects: Vec<TransactionEffects>,
1312        details: &PendingCheckpointInfo,
1313    ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1314        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1315        let total = all_effects.len();
1316        let mut last_checkpoint = self.epoch_store.last_built_checkpoint_summary()?;
1317        if last_checkpoint.is_none() {
1318            let epoch = self.epoch_store.epoch();
1319            if epoch > 0 {
1320                let previous_epoch = epoch - 1;
1321                let last_verified = self.tables.get_epoch_last_checkpoint(previous_epoch)?;
1322                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1323                if let Some((ref seq, _)) = last_checkpoint {
1324                    debug!(
1325                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1326                    );
1327                } else {
1328                    // This is some serious bug with when CheckpointBuilder started so surfacing it
1329                    // via panic
1330                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1331                }
1332            }
1333        }
1334        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1335        debug!(
1336            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1337            checkpoint_timestamp = details.timestamp_ms,
1338            "Creating checkpoint(s) for {} transactions",
1339            all_effects.len(),
1340        );
1341
1342        let all_digests: Vec<_> = all_effects
1343            .iter()
1344            .map(|effect| *effect.transaction_digest())
1345            .collect();
1346        let transactions_and_sizes = self
1347            .state
1348            .get_transaction_cache_reader()
1349            .get_transactions_and_serialized_sizes(&all_digests)?;
1350        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1351        let mut transactions = Vec::with_capacity(all_effects.len());
1352        let mut transaction_keys = Vec::with_capacity(all_effects.len());
1353        let mut randomness_rounds = BTreeMap::new();
1354        {
1355            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1356            debug!(
1357                ?last_checkpoint_seq,
1358                "Waiting for {:?} certificates to appear in consensus",
1359                all_effects.len()
1360            );
1361
1362            for (effects, transaction_and_size) in all_effects
1363                .into_iter()
1364                .zip(transactions_and_sizes.into_iter())
1365            {
1366                let (transaction, size) = transaction_and_size
1367                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
1368                match transaction.inner().transaction_data().kind() {
1369                    TransactionKind::ConsensusCommitPrologueV1(_)
1370                    | TransactionKind::AuthenticatorStateUpdateV1(_) => {
1371                        // ConsensusCommitPrologue and
1372                        // AuthenticatorStateUpdateV1
1373                        // are guaranteed to be
1374                        // processed before we reach here.
1375                    }
1376                    TransactionKind::RandomnessStateUpdate(rsu) => {
1377                        randomness_rounds
1378                            .insert(*effects.transaction_digest(), rsu.randomness_round);
1379                    }
1380                    _ => {
1381                        // All other tx should be included in the call to
1382                        // `consensus_messages_processed_notify`.
1383                        transaction_keys.push(SequencedConsensusTransactionKey::External(
1384                            ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
1385                        ));
1386                    }
1387                }
1388                transactions.push(transaction);
1389                all_effects_and_transaction_sizes.push((effects, size));
1390            }
1391
1392            self.epoch_store
1393                .consensus_messages_processed_notify(transaction_keys)
1394                .await?;
1395        }
1396
1397        let signatures = self
1398            .epoch_store
1399            .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1400        debug!(
1401            ?last_checkpoint_seq,
1402            "Received {} checkpoint user signatures from consensus",
1403            signatures.len()
1404        );
1405
1406        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1407        let chunks_count = chunks.len();
1408
1409        let mut checkpoints = Vec::with_capacity(chunks_count);
1410        debug!(
1411            ?last_checkpoint_seq,
1412            "Creating {} checkpoints with {} transactions", chunks_count, total,
1413        );
1414
1415        let epoch = self.epoch_store.epoch();
1416        for (index, transactions) in chunks.into_iter().enumerate() {
1417            let first_checkpoint_of_epoch = index == 0
1418                && last_checkpoint
1419                    .as_ref()
1420                    .map(|(_, c)| c.epoch != epoch)
1421                    .unwrap_or(true);
1422            if first_checkpoint_of_epoch {
1423                self.epoch_store
1424                    .record_epoch_first_checkpoint_creation_time_metric();
1425            }
1426            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1427
1428            let sequence_number = last_checkpoint
1429                .as_ref()
1430                .map(|(_, c)| c.sequence_number + 1)
1431                .unwrap_or_default();
1432            let timestamp_ms = details.timestamp_ms;
1433            if let Some((_, last_checkpoint)) = &last_checkpoint {
1434                if last_checkpoint.timestamp_ms > timestamp_ms {
1435                    error!(
1436                        "Unexpected decrease of checkpoint timestamp, sequence: {}, previous: {}, current: {}",
1437                        sequence_number, last_checkpoint.timestamp_ms, timestamp_ms
1438                    );
1439                }
1440            }
1441
1442            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1443            let epoch_rolling_gas_cost_summary =
1444                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1445
1446            let end_of_epoch_data = if last_checkpoint_of_epoch {
1447                let (system_state_obj, system_epoch_info_event) = self
1448                    .augment_epoch_last_checkpoint(
1449                        &epoch_rolling_gas_cost_summary,
1450                        timestamp_ms,
1451                        &mut effects,
1452                        &mut signatures,
1453                        sequence_number,
1454                    )
1455                    .await?;
1456
1457                // The system epoch info event can be `None` in case if the `advance_epoch`
1458                // Move function call failed and was executed in the safe mode.
1459                // In this case, the tokens supply should be unchanged.
1460                //
1461                // SAFETY: The number of minted and burnt tokens easily fit into an i64 and due
1462                // to those small numbers, no overflows will occur during conversion or
1463                // subtraction.
1464                let epoch_supply_change =
1465                    system_epoch_info_event.map_or(0, |event| event.supply_change());
1466
1467                let committee = system_state_obj
1468                    .get_current_epoch_committee()
1469                    .committee()
1470                    .clone();
1471
1472                // This must happen after the call to augment_epoch_last_checkpoint,
1473                // otherwise we will not capture the change_epoch tx.
1474                let root_state_digest = {
1475                    let state_acc = self
1476                        .accumulator
1477                        .upgrade()
1478                        .expect("No checkpoints should be getting built after local configuration");
1479                    let acc = state_acc.accumulate_checkpoint(
1480                        effects.clone(),
1481                        sequence_number,
1482                        &self.epoch_store,
1483                    )?;
1484                    state_acc
1485                        .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
1486                        .await?;
1487                    state_acc
1488                        .digest_epoch(self.epoch_store.clone(), sequence_number)
1489                        .await?
1490                };
1491                self.metrics.highest_accumulated_epoch.set(epoch as i64);
1492                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1493
1494                let epoch_commitments = vec![root_state_digest.into()];
1495
1496                Some(EndOfEpochData {
1497                    next_epoch_committee: committee.voting_rights,
1498                    next_epoch_protocol_version: ProtocolVersion::new(
1499                        system_state_obj.protocol_version(),
1500                    ),
1501                    epoch_commitments,
1502                    epoch_supply_change,
1503                })
1504            } else {
1505                None
1506            };
1507            let contents = CheckpointContents::new_with_digests_and_signatures(
1508                effects.iter().map(TransactionEffects::execution_digests),
1509                signatures,
1510            );
1511
1512            let num_txns = contents.size() as u64;
1513
1514            let network_total_transactions = last_checkpoint
1515                .as_ref()
1516                .map(|(_, c)| c.network_total_transactions + num_txns)
1517                .unwrap_or(num_txns);
1518
1519            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1520
1521            let matching_randomness_rounds: Vec<_> = effects
1522                .iter()
1523                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1524                .copied()
1525                .collect();
1526
1527            let summary = CheckpointSummary::new(
1528                self.epoch_store.protocol_config(),
1529                epoch,
1530                sequence_number,
1531                network_total_transactions,
1532                &contents,
1533                previous_digest,
1534                epoch_rolling_gas_cost_summary,
1535                end_of_epoch_data,
1536                timestamp_ms,
1537                matching_randomness_rounds,
1538            );
1539            summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1540            if last_checkpoint_of_epoch {
1541                info!(
1542                    checkpoint_seq = sequence_number,
1543                    "creating last checkpoint of epoch {}", epoch
1544                );
1545                if let Some(stats) = self.tables.get_epoch_stats(epoch, &summary) {
1546                    self.epoch_store
1547                        .report_epoch_metrics_at_last_checkpoint(stats);
1548                }
1549            }
1550            last_checkpoint = Some((sequence_number, summary.clone()));
1551            checkpoints.push((summary, contents));
1552        }
1553
1554        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1555    }
1556
1557    fn get_epoch_total_gas_cost(
1558        &self,
1559        last_checkpoint: Option<&CheckpointSummary>,
1560        cur_checkpoint_effects: &[TransactionEffects],
1561    ) -> GasCostSummary {
1562        let (previous_epoch, previous_gas_costs) = last_checkpoint
1563            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1564            .unwrap_or_default();
1565        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1566        if previous_epoch == self.epoch_store.epoch() {
1567            // sum only when we are within the same epoch
1568            GasCostSummary::new(
1569                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1570                previous_gas_costs.computation_cost_burned
1571                    + current_gas_costs.computation_cost_burned,
1572                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1573                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1574                previous_gas_costs.non_refundable_storage_fee
1575                    + current_gas_costs.non_refundable_storage_fee,
1576            )
1577        } else {
1578            current_gas_costs
1579        }
1580    }
1581
1582    /// Augments the last checkpoint of the epoch by creating and executing an
1583    /// advance epoch transaction.
1584    #[instrument(level = "error", skip_all)]
1585    async fn augment_epoch_last_checkpoint(
1586        &self,
1587        epoch_total_gas_cost: &GasCostSummary,
1588        epoch_start_timestamp_ms: CheckpointTimestamp,
1589        checkpoint_effects: &mut Vec<TransactionEffects>,
1590        signatures: &mut Vec<Vec<GenericSignature>>,
1591        checkpoint: CheckpointSequenceNumber,
1592    ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1593        let (system_state, system_epoch_info_event, effects) = self
1594            .state
1595            .create_and_execute_advance_epoch_tx(
1596                &self.epoch_store,
1597                epoch_total_gas_cost,
1598                checkpoint,
1599                epoch_start_timestamp_ms,
1600            )
1601            .await?;
1602        checkpoint_effects.push(effects);
1603        signatures.push(vec![]);
1604        Ok((system_state, system_epoch_info_event))
1605    }
1606
    /// Returns the effects that must be added to the checkpoint for the given
    /// `roots`. These are the roots themselves plus, transitively, those of
    /// their dependencies that were executed in the current epoch and are not
    /// already part of a checkpoint.
    ///
    /// This function may be called multiple times while constructing a single
    /// checkpoint. `existing_tx_digests_in_checkpoint` tracks the transactions
    /// already included in the checkpoint under construction. Roots found in it
    /// are skipped, and the digests of all returned effects are added to it
    /// just before returning.
    ///
    /// A root or discovered dependency is also skipped, and its dependencies
    /// are not followed, in either of these cases:
    /// - the epoch store reports it as already included in a checkpoint;
    /// - it was executed in an earlier epoch.
    ///
    /// The result lists the (non-skipped) input roots first, followed by each
    /// successive wave of newly discovered dependencies. Order within a
    /// dependency wave is unspecified because pending digests are collected in
    /// a `HashSet`.
    ///
    /// # Errors
    /// Propagates errors from the epoch store and from `effects_store`.
    ///
    /// # Panics
    /// Panics if the effects of a dependency reported as executed in the
    /// current epoch cannot be found in `effects_store`.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        let mut results = vec![];
        // Every digest ever processed or queued, so that no transaction is
        // visited twice across waves.
        let mut seen = HashSet::new();
        // Each iteration processes one wave: first the caller's roots, then the
        // dependencies discovered during the previous wave.
        loop {
            let mut pending = HashSet::new();

            // One flag per root, matched positionally with `roots` by the zip
            // below.
            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                // Unnecessary to read effects of a dependency if the effect is already
                // processed.
                seen.insert(*digest);

                // Skip roots that are already included in the checkpoint.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip roots already included in checkpoints or roots from previous epochs
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                // One flag per dependency, again matched positionally below.
                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Skip here if dependency not executed in the current epoch.
                    // Note that the existence of an effects signature in the
                    // epoch store for the given digest indicates that the transaction
                    // was locally executed in the current epoch
                    if !effects_signature_exists {
                        continue;
                    }
                    // Queue each dependency only the first time it is seen.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            // No new dependencies discovered: the closure is complete.
            if pending.is_empty() {
                break;
            }
            // Load the effects of the newly discovered dependencies. They become
            // the roots of the next wave.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending)?;
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        // Record everything returned so later calls for the same checkpoint
        // skip these transactions.
        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
1691
1692    // This function is used to check the invariants of the consensus commit
1693    // prologue transactions in the checkpoint in simtest.
1694    #[cfg(msim)]
1695    fn expensive_consensus_commit_prologue_invariants_check(
1696        &self,
1697        root_digests: &[TransactionDigest],
1698        sorted: &[TransactionEffects],
1699    ) {
1700        // Gets all the consensus commit prologue transactions from the roots.
1701        let root_txs = self
1702            .state
1703            .get_transaction_cache_reader()
1704            .multi_get_transaction_blocks(root_digests)
1705            .unwrap();
1706        let ccps = root_txs
1707            .iter()
1708            .filter_map(|tx| {
1709                tx.as_ref().filter(|tx| {
1710                    matches!(
1711                        tx.transaction_data().kind(),
1712                        TransactionKind::ConsensusCommitPrologueV1(_)
1713                    )
1714                })
1715            })
1716            .collect::<Vec<_>>();
1717
1718        // There should be at most one consensus commit prologue transaction in the
1719        // roots.
1720        assert!(ccps.len() <= 1);
1721
1722        // Get all the transactions in the checkpoint.
1723        let txs = self
1724            .state
1725            .get_transaction_cache_reader()
1726            .multi_get_transaction_blocks(
1727                &sorted
1728                    .iter()
1729                    .map(|tx| *tx.transaction_digest())
1730                    .collect::<Vec<_>>(),
1731            )
1732            .unwrap();
1733
1734        if ccps.is_empty() {
1735            // If there is no consensus commit prologue transaction in the roots, then there
1736            // should be no consensus commit prologue transaction in the
1737            // checkpoint.
1738            for tx in txs.iter().flatten() {
1739                assert!(!matches!(
1740                    tx.transaction_data().kind(),
1741                    TransactionKind::ConsensusCommitPrologueV1(_)
1742                ));
1743            }
1744        } else {
1745            // If there is one consensus commit prologue, it must be the first one in the
1746            // checkpoint.
1747            assert!(matches!(
1748                txs[0].as_ref().unwrap().transaction_data().kind(),
1749                TransactionKind::ConsensusCommitPrologueV1(_)
1750            ));
1751
1752            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1753
1754            for tx in txs.iter().skip(1).flatten() {
1755                assert!(!matches!(
1756                    tx.transaction_data().kind(),
1757                    TransactionKind::ConsensusCommitPrologueV1(_)
1758                ));
1759            }
1760        }
1761    }
1762}
1763
1764impl CheckpointAggregator {
1765    fn new(
1766        tables: Arc<CheckpointStore>,
1767        epoch_store: Arc<AuthorityPerEpochStore>,
1768        notify: Arc<Notify>,
1769        output: Box<dyn CertifiedCheckpointOutput>,
1770        state: Arc<AuthorityState>,
1771        metrics: Arc<CheckpointMetrics>,
1772    ) -> Self {
1773        let current = None;
1774        Self {
1775            tables,
1776            epoch_store,
1777            notify,
1778            current,
1779            output,
1780            state,
1781            metrics,
1782        }
1783    }
1784
1785    /// Runs the `CheckpointAggregator` in an asynchronous loop, managing the
1786    /// aggregation of checkpoints.
1787    /// The function ensures continuous aggregation of checkpoints, handling
1788    /// errors and retries gracefully, and allowing for proper shutdown on
1789    /// receiving an exit signal.
1790    async fn run(mut self) {
1791        info!("Starting CheckpointAggregator");
1792        loop {
1793            if let Err(e) = self.run_and_notify().await {
1794                error!(
1795                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
1796                    e
1797                );
1798                self.metrics.checkpoint_errors.inc();
1799                tokio::time::sleep(Duration::from_secs(1)).await;
1800                continue;
1801            }
1802
1803            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
1804        }
1805    }
1806
1807    async fn run_and_notify(&mut self) -> IotaResult {
1808        let summaries = self.run_inner()?;
1809        for summary in summaries {
1810            self.output.certified_checkpoint_created(&summary).await?;
1811        }
1812        Ok(())
1813    }
1814
1815    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
1816        let _scope = monitored_scope("CheckpointAggregator");
1817        let mut result = vec![];
1818        'outer: loop {
1819            let next_to_certify = self.next_checkpoint_to_certify();
1820            let current = if let Some(current) = &mut self.current {
1821                // It's possible that the checkpoint was already certified by
1822                // the rest of the network and we've already received the
1823                // certified checkpoint via StateSync. In this case, we reset
1824                // the current signature aggregator to the next checkpoint to
1825                // be certified
1826                if current.summary.sequence_number < next_to_certify {
1827                    self.current = None;
1828                    continue;
1829                }
1830                current
1831            } else {
1832                let Some(summary) = self
1833                    .epoch_store
1834                    .get_built_checkpoint_summary(next_to_certify)?
1835                else {
1836                    return Ok(result);
1837                };
1838                self.current = Some(CheckpointSignatureAggregator {
1839                    next_index: 0,
1840                    digest: summary.digest(),
1841                    summary,
1842                    signatures_by_digest: MultiStakeAggregator::new(
1843                        self.epoch_store.committee().clone(),
1844                    ),
1845                    tables: self.tables.clone(),
1846                    state: self.state.clone(),
1847                    metrics: self.metrics.clone(),
1848                });
1849                self.current.as_mut().unwrap()
1850            };
1851
1852            let epoch_tables = self
1853                .epoch_store
1854                .tables()
1855                .expect("should not run past end of epoch");
1856            let iter = epoch_tables.get_pending_checkpoint_signatures_iter(
1857                current.summary.sequence_number,
1858                current.next_index,
1859            )?;
1860            for ((seq, index), data) in iter {
1861                if seq != current.summary.sequence_number {
1862                    debug!(
1863                        checkpoint_seq =? current.summary.sequence_number,
1864                        "Not enough checkpoint signatures",
1865                    );
1866                    // No more signatures (yet) for this checkpoint
1867                    return Ok(result);
1868                }
1869                debug!(
1870                    checkpoint_seq = current.summary.sequence_number,
1871                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
1872                    current.summary.digest(),
1873                    data.summary.auth_sig().authority.concise()
1874                );
1875                self.metrics
1876                    .checkpoint_participation
1877                    .with_label_values(&[&format!(
1878                        "{:?}",
1879                        data.summary.auth_sig().authority.concise()
1880                    )])
1881                    .inc();
1882                if let Ok(auth_signature) = current.try_aggregate(data) {
1883                    let summary = VerifiedCheckpoint::new_unchecked(
1884                        CertifiedCheckpointSummary::new_from_data_and_sig(
1885                            current.summary.clone(),
1886                            auth_signature,
1887                        ),
1888                    );
1889
1890                    self.tables.insert_certified_checkpoint(&summary)?;
1891                    self.metrics
1892                        .last_certified_checkpoint
1893                        .set(current.summary.sequence_number as i64);
1894                    current
1895                        .summary
1896                        .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
1897                    result.push(summary.into_inner());
1898                    self.current = None;
1899                    continue 'outer;
1900                } else {
1901                    current.next_index = index + 1;
1902                }
1903            }
1904            break;
1905        }
1906        Ok(result)
1907    }
1908
1909    fn next_checkpoint_to_certify(&self) -> CheckpointSequenceNumber {
1910        self.tables
1911            .certified_checkpoints
1912            .unbounded_iter()
1913            .skip_to_last()
1914            .next()
1915            .map(|(seq, _)| seq + 1)
1916            .unwrap_or_default()
1917    }
1918}
1919
1920impl CheckpointSignatureAggregator {
1921    #[expect(clippy::result_unit_err)]
1922    pub fn try_aggregate(
1923        &mut self,
1924        data: CheckpointSignatureMessage,
1925    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
1926        let their_digest = *data.summary.digest();
1927        let (_, signature) = data.summary.into_data_and_sig();
1928        let author = signature.authority;
1929        let envelope =
1930            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
1931        match self.signatures_by_digest.insert(their_digest, envelope) {
1932            // ignore repeated signatures
1933            InsertResult::Failed {
1934                error:
1935                    IotaError::StakeAggregatorRepeatedSigner {
1936                        conflicting_sig: false,
1937                        ..
1938                    },
1939            } => Err(()),
1940            InsertResult::Failed { error } => {
1941                warn!(
1942                    checkpoint_seq = self.summary.sequence_number,
1943                    "Failed to aggregate new signature from validator {:?}: {:?}",
1944                    author.concise(),
1945                    error
1946                );
1947                self.check_for_split_brain();
1948                Err(())
1949            }
1950            InsertResult::QuorumReached(cert) => {
1951                // It is not guaranteed that signature.authority == consensus_cert.author, but
1952                // we do verify the signature so we know that the author signed
1953                // the message at some point.
1954                if their_digest != self.digest {
1955                    self.metrics.remote_checkpoint_forks.inc();
1956                    warn!(
1957                        checkpoint_seq = self.summary.sequence_number,
1958                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
1959                        author.concise(),
1960                        their_digest,
1961                        self.digest
1962                    );
1963                    return Err(());
1964                }
1965                Ok(cert)
1966            }
1967            InsertResult::NotEnoughVotes {
1968                bad_votes: _,
1969                bad_authorities: _,
1970            } => {
1971                self.check_for_split_brain();
1972                Err(())
1973            }
1974        }
1975    }
1976
1977    /// Check if there is a split brain condition in checkpoint signature
1978    /// aggregation, defined as any state wherein it is no longer possible
1979    /// to achieve quorum on a checkpoint proposal, irrespective of the
1980    /// outcome of any outstanding votes.
1981    fn check_for_split_brain(&self) {
1982        debug!(
1983            checkpoint_seq = self.summary.sequence_number,
1984            "Checking for split brain condition"
1985        );
1986        if self.signatures_by_digest.quorum_unreachable() {
1987            // TODO: at this point we should immediately halt processing
1988            // of new transaction certificates to avoid building on top of
1989            // forked output
1990            // self.halt_all_execution();
1991
1992            let digests_by_stake_messages = self
1993                .signatures_by_digest
1994                .get_all_unique_values()
1995                .into_iter()
1996                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
1997                .map(|(digest, (_authorities, total_stake))| {
1998                    format!("{:?} (total stake: {})", digest, total_stake)
1999                })
2000                .collect::<Vec<String>>();
2001            error!(
2002                checkpoint_seq = self.summary.sequence_number,
2003                "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
2004                self.signatures_by_digest.uncommitted_stake(),
2005                digests_by_stake_messages,
2006            );
2007            self.metrics.split_brain_checkpoint_forks.inc();
2008
2009            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2010            let local_summary = self.summary.clone();
2011            let state = self.state.clone();
2012            let tables = self.tables.clone();
2013
2014            tokio::spawn(async move {
2015                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2016            });
2017        }
2018    }
2019}
2020
2021/// Create data dump containing relevant data for diagnosing cause of the
2022/// split brain by querying one disagreeing validator for full checkpoint
2023/// contents. To minimize peer chatter, we only query one validator at random
2024/// from each disagreeing faction, as all honest validators that participated in
2025/// this round may inevitably run the same process.
2026async fn diagnose_split_brain(
2027    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2028    local_summary: CheckpointSummary,
2029    state: Arc<AuthorityState>,
2030    tables: Arc<CheckpointStore>,
2031) {
2032    debug!(
2033        checkpoint_seq = local_summary.sequence_number,
2034        "Running split brain diagnostics..."
2035    );
2036    let time = Utc::now();
2037    // collect one random disagreeing validator per differing digest
2038    let digest_to_validator = all_unique_values
2039        .iter()
2040        .filter_map(|(digest, (validators, _))| {
2041            if *digest != local_summary.digest() {
2042                let random_validator = validators.choose(&mut OsRng).unwrap();
2043                Some((*digest, *random_validator))
2044            } else {
2045                None
2046            }
2047        })
2048        .collect::<HashMap<_, _>>();
2049    if digest_to_validator.is_empty() {
2050        panic!(
2051            "Given split brain condition, there should be at \
2052                least one validator that disagrees with local signature"
2053        );
2054    }
2055
2056    let epoch_store = state.load_epoch_store_one_call_per_task();
2057    let committee = epoch_store
2058        .epoch_start_state()
2059        .get_iota_committee_with_network_metadata();
2060    let network_config = default_iota_network_config();
2061    let network_clients =
2062        make_network_authority_clients_with_network_config(&committee, &network_config);
2063
2064    // Query all disagreeing validators
2065    let response_futures = digest_to_validator
2066        .values()
2067        .cloned()
2068        .map(|validator| {
2069            let client = network_clients
2070                .get(&validator)
2071                .expect("Failed to get network client");
2072            let request = CheckpointRequest {
2073                sequence_number: Some(local_summary.sequence_number),
2074                request_content: true,
2075                certified: false,
2076            };
2077            client.handle_checkpoint(request)
2078        })
2079        .collect::<Vec<_>>();
2080
2081    let digest_name_pair = digest_to_validator.iter();
2082    let response_data = futures::future::join_all(response_futures)
2083        .await
2084        .into_iter()
2085        .zip(digest_name_pair)
2086        .filter_map(|(response, (digest, name))| match response {
2087            Ok(response) => match response {
2088                CheckpointResponse {
2089                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2090                    contents: Some(contents),
2091                } => Some((*name, *digest, summary, contents)),
2092                CheckpointResponse {
2093                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2094                    contents: _,
2095                } => {
2096                    panic!("Expected pending checkpoint, but got certified checkpoint");
2097                }
2098                CheckpointResponse {
2099                    checkpoint: None,
2100                    contents: _,
2101                } => {
2102                    error!(
2103                        "Summary for checkpoint {:?} not found on validator {:?}",
2104                        local_summary.sequence_number, name
2105                    );
2106                    None
2107                }
2108                CheckpointResponse {
2109                    checkpoint: _,
2110                    contents: None,
2111                } => {
2112                    error!(
2113                        "Contents for checkpoint {:?} not found on validator {:?}",
2114                        local_summary.sequence_number, name
2115                    );
2116                    None
2117                }
2118            },
2119            Err(e) => {
2120                error!(
2121                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2122                    e
2123                );
2124                None
2125            }
2126        })
2127        .collect::<Vec<_>>();
2128
2129    let local_checkpoint_contents = tables
2130        .get_checkpoint_contents(&local_summary.content_digest)
2131        .unwrap_or_else(|_| {
2132            panic!(
2133                "Could not find checkpoint contents for digest {:?}",
2134                local_summary.digest()
2135            )
2136        })
2137        .unwrap_or_else(|| {
2138            panic!(
2139                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2140                local_summary.sequence_number,
2141                local_summary.digest()
2142            )
2143        });
2144    let local_contents_text = format!("{local_checkpoint_contents:?}");
2145
2146    let local_summary_text = format!("{local_summary:?}");
2147    let local_validator = state.name.concise();
2148    let diff_patches = response_data
2149        .iter()
2150        .map(|(name, other_digest, other_summary, contents)| {
2151            let other_contents_text = format!("{contents:?}");
2152            let other_summary_text = format!("{other_summary:?}");
2153            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2154                .enumerate_transactions(&local_summary)
2155                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2156                .unzip();
2157            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2158                .enumerate_transactions(other_summary)
2159                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2160                .unzip();
2161            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2162            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2163            let local_transactions_text = format!("{local_transactions:#?}");
2164            let other_transactions_text = format!("{other_transactions:#?}");
2165            let transactions_patch =
2166                create_patch(&local_transactions_text, &other_transactions_text);
2167            let local_effects_text = format!("{local_effects:#?}");
2168            let other_effects_text = format!("{other_effects:#?}");
2169            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2170            let seq_number = local_summary.sequence_number;
2171            let local_digest = local_summary.digest();
2172            let other_validator = name.concise();
2173            format!(
2174                "Checkpoint: {seq_number:?}\n\
2175                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2176                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2177                Summary Diff: \n{summary_patch}\n\n\
2178                Contents Diff: \n{contents_patch}\n\n\
2179                Transactions Diff: \n{transactions_patch}\n\n\
2180                Effects Diff: \n{effects_patch}",
2181            )
2182        })
2183        .collect::<Vec<_>>()
2184        .join("\n\n\n");
2185
2186    let header = format!(
2187        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2188        Datetime: {time}",
2189    );
2190    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2191    let path = tempfile::tempdir()
2192        .expect("Failed to create tempdir")
2193        .into_path()
2194        .join(Path::new("checkpoint_fork_dump.txt"));
2195    let mut file = File::create(path).unwrap();
2196    write!(file, "{}", fork_logs_text).unwrap();
2197    debug!("{}", fork_logs_text);
2198
2199    fail_point!("split_brain_reached");
2200}
2201
/// Notification entry points of the checkpoint service.
///
/// NOTE(review): the implementations are not visible here; the method
/// descriptions below are inferred from their names and should be confirmed
/// against the impls.
pub trait CheckpointServiceNotify {
    /// Hands a checkpoint signature message (`info`) to the service, together
    /// with the epoch store it belongs to.
    ///
    /// Presumably the signature is recorded for later aggregation; confirm
    /// against the implementation.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    /// Signals that there may be new work for checkpoint construction.
    /// Presumably this wakes the checkpoint builder; confirm against the
    /// implementation.
    fn notify_checkpoint(&self) -> IotaResult;
}
2211
/// Lifecycle of the checkpoint service's background components.
enum CheckpointServiceState {
    /// Builder and aggregator have been constructed but not yet taken out to
    /// be run.
    Unstarted((CheckpointBuilder, CheckpointAggregator)),
    /// The builder and aggregator have been taken via `take_unstarted`.
    Started,
}
2216
2217impl CheckpointServiceState {
2218    fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
2219        let mut state = CheckpointServiceState::Started;
2220        std::mem::swap(self, &mut state);
2221
2222        match state {
2223            CheckpointServiceState::Unstarted((builder, aggregator)) => (builder, aggregator),
2224            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2225        }
2226    }
2227}
2228
/// Owns the checkpoint builder and aggregator, plus the notification handles
/// and progress state shared with them (see `CheckpointService::build`).
pub struct CheckpointService {
    /// Checkpoint store, the same one handed to the builder and aggregator.
    tables: Arc<CheckpointStore>,
    /// Notifier handed to the `CheckpointBuilder` in `build`.
    notify_builder: Arc<Notify>,
    /// Notifier handed to the `CheckpointAggregator` (and also to the builder)
    /// in `build`.
    notify_aggregator: Arc<Notify>,
    /// Last checkpoint signature index, seeded in `build` from
    /// `epoch_store.get_last_checkpoint_signature_index()`.
    /// NOTE(review): presumably advanced as new signatures are recorded;
    /// confirm.
    last_signature_index: Mutex<u64>,
    /// A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    /// The highest sequence number that had already been built at the time
    /// CheckpointService was constructed (0 if none had been built).
    highest_previously_built_seq: CheckpointSequenceNumber,
    /// Checkpoint metrics shared with the builder and aggregator.
    metrics: Arc<CheckpointMetrics>,
    /// Holds the builder/aggregator pair until they are taken out to be run.
    state: Mutex<CheckpointServiceState>,
}
2242
2243impl CheckpointService {
2244    /// Spawns the checkpoint service, initializing and starting the checkpoint
2245    /// builder and aggregator tasks.
2246    /// Constructs a new CheckpointService in an un-started state.
2247    pub fn build(
2248        state: Arc<AuthorityState>,
2249        checkpoint_store: Arc<CheckpointStore>,
2250        epoch_store: Arc<AuthorityPerEpochStore>,
2251        effects_store: Arc<dyn TransactionCacheRead>,
2252        accumulator: Weak<StateAccumulator>,
2253        checkpoint_output: Box<dyn CheckpointOutput>,
2254        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2255        metrics: Arc<CheckpointMetrics>,
2256        max_transactions_per_checkpoint: usize,
2257        max_checkpoint_size_bytes: usize,
2258    ) -> Arc<Self> {
2259        info!(
2260            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2261        );
2262        let notify_builder = Arc::new(Notify::new());
2263        let notify_aggregator = Arc::new(Notify::new());
2264
2265        let highest_previously_built_seq = epoch_store
2266            .last_built_checkpoint_builder_summary()
2267            .expect("epoch should not have ended")
2268            .map(|s| s.summary.sequence_number)
2269            .unwrap_or(0);
2270
2271        let (highest_currently_built_seq_tx, _) = watch::channel(highest_previously_built_seq);
2272
2273        let aggregator = CheckpointAggregator::new(
2274            checkpoint_store.clone(),
2275            epoch_store.clone(),
2276            notify_aggregator.clone(),
2277            certified_checkpoint_output,
2278            state.clone(),
2279            metrics.clone(),
2280        );
2281
2282        let builder = CheckpointBuilder::new(
2283            state.clone(),
2284            checkpoint_store.clone(),
2285            epoch_store.clone(),
2286            notify_builder.clone(),
2287            effects_store,
2288            accumulator,
2289            checkpoint_output,
2290            notify_aggregator.clone(),
2291            highest_currently_built_seq_tx.clone(),
2292            metrics.clone(),
2293            max_transactions_per_checkpoint,
2294            max_checkpoint_size_bytes,
2295        );
2296
2297        let last_signature_index = epoch_store
2298            .get_last_checkpoint_signature_index()
2299            .expect("should not cross end of epoch");
2300        let last_signature_index = Mutex::new(last_signature_index);
2301
2302        Arc::new(Self {
2303            tables: checkpoint_store,
2304            notify_builder,
2305            notify_aggregator,
2306            last_signature_index,
2307            highest_currently_built_seq_tx,
2308            highest_previously_built_seq,
2309            metrics,
2310            state: Mutex::new(CheckpointServiceState::Unstarted((builder, aggregator))),
2311        })
2312    }
2313
2314    /// Starts the CheckpointService.
2315    ///
2316    /// This function blocks until the CheckpointBuilder re-builds all
2317    /// checkpoints that had been built before the most recent restart. You
2318    /// can think of this as a WAL replay operation. Upon startup, we may
2319    /// have a number of consensus commits and resulting checkpoints that
2320    /// were built but not committed to disk. We want to reprocess the
2321    /// commits and rebuild the checkpoints before starting normal operation.
2322    pub async fn spawn(&self) -> JoinSet<()> {
2323        let mut tasks = JoinSet::new();
2324
2325        let (builder, aggregator) = self.state.lock().take_unstarted();
2326        tasks.spawn(monitored_future!(builder.run()));
2327        tasks.spawn(monitored_future!(aggregator.run()));
2328
2329        loop {
2330            if tokio::time::timeout(Duration::from_secs(10), self.wait_for_rebuilt_checkpoints())
2331                .await
2332                .is_ok()
2333            {
2334                break;
2335            } else {
2336                debug_fatal!("Still waiting for checkpoints to be rebuilt");
2337            }
2338        }
2339
2340        tasks
2341    }
2342}
2343
2344impl CheckpointService {
2345    /// Waits until all checkpoints had been built before the node restarted
2346    /// are rebuilt.
2347    pub async fn wait_for_rebuilt_checkpoints(&self) {
2348        let highest_previously_built_seq = self.highest_previously_built_seq;
2349        let mut rx = self.highest_currently_built_seq_tx.subscribe();
2350        loop {
2351            let highest_currently_built_seq = *rx.borrow_and_update();
2352            if highest_currently_built_seq >= highest_previously_built_seq {
2353                break;
2354            }
2355            rx.changed().await.unwrap();
2356        }
2357    }
2358
2359    #[cfg(test)]
2360    fn write_and_notify_checkpoint_for_testing(
2361        &self,
2362        epoch_store: &AuthorityPerEpochStore,
2363        checkpoint: PendingCheckpoint,
2364    ) -> IotaResult {
2365        use crate::authority::authority_per_epoch_store::ConsensusCommitOutput;
2366
2367        let mut output = ConsensusCommitOutput::new();
2368        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2369        let mut batch = epoch_store.db_batch_for_test();
2370        output.write_to_batch(epoch_store, &mut batch)?;
2371        batch.write()?;
2372        self.notify_checkpoint()?;
2373        Ok(())
2374    }
2375}
2376
2377impl CheckpointServiceNotify for CheckpointService {
2378    fn notify_checkpoint_signature(
2379        &self,
2380        epoch_store: &AuthorityPerEpochStore,
2381        info: &CheckpointSignatureMessage,
2382    ) -> IotaResult {
2383        let sequence = info.summary.sequence_number;
2384        let signer = info.summary.auth_sig().authority.concise();
2385
2386        if let Some(highest_verified_checkpoint) = self
2387            .tables
2388            .get_highest_verified_checkpoint()?
2389            .map(|x| *x.sequence_number())
2390        {
2391            if sequence <= highest_verified_checkpoint {
2392                debug!(
2393                    checkpoint_seq = sequence,
2394                    "Ignore checkpoint signature from {} - already certified", signer,
2395                );
2396                self.metrics
2397                    .last_ignored_checkpoint_signature_received
2398                    .set(sequence as i64);
2399                return Ok(());
2400            }
2401        }
2402        debug!(
2403            checkpoint_seq = sequence,
2404            "Received checkpoint signature, digest {} from {}",
2405            info.summary.digest(),
2406            signer,
2407        );
2408        self.metrics
2409            .last_received_checkpoint_signatures
2410            .with_label_values(&[&signer.to_string()])
2411            .set(sequence as i64);
2412        // While it can be tempting to make last_signature_index into AtomicU64, this
2413        // won't work We need to make sure we write to `pending_signatures` and
2414        // trigger `notify_aggregator` without race conditions
2415        let mut index = self.last_signature_index.lock();
2416        *index += 1;
2417        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2418        self.notify_aggregator.notify_one();
2419        Ok(())
2420    }
2421
2422    fn notify_checkpoint(&self) -> IotaResult {
2423        self.notify_builder.notify_one();
2424        Ok(())
2425    }
2426}
2427
2428// test helper
2429pub struct CheckpointServiceNoop {}
2430impl CheckpointServiceNotify for CheckpointServiceNoop {
2431    fn notify_checkpoint_signature(
2432        &self,
2433        _: &AuthorityPerEpochStore,
2434        _: &CheckpointSignatureMessage,
2435    ) -> IotaResult {
2436        Ok(())
2437    }
2438
2439    fn notify_checkpoint(&self) -> IotaResult {
2440        Ok(())
2441    }
2442}
2443
// Tests for the checkpoint service. An in-memory
// `HashMap<TransactionDigest, TransactionEffects>` stands in for the
// transaction cache, and `mpsc` senders stand in for the checkpoint outputs.
#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeMap, HashMap},
        ops::Deref,
    };

    use futures::{FutureExt as _, future::BoxFuture};
    use iota_macros::sim_test;
    use iota_protocol_config::{Chain, ProtocolConfig};
    use iota_types::{
        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
        crypto::Signature,
        digests::TransactionEventsDigest,
        effects::{TransactionEffects, TransactionEvents},
        messages_checkpoint::SignedCheckpointSummary,
        move_package::MovePackage,
        object,
        transaction::{GenesisObject, VerifiedTransaction},
    };
    use tokio::sync::mpsc;

    use super::*;
    use crate::authority::test_authority_builder::TestAuthorityBuilder;

    // End-to-end test of the checkpoint service. It writes pending checkpoints
    // and checks the checkpoints produced: ordering, splitting by transaction
    // count and by size, and coalescing by time. It then feeds signatures and
    // checks the certified checkpoints.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        // Minimum checkpoint interval of 100ms. Pending checkpoint 4
        // (timestamp 4001) follows pending checkpoint 3 (timestamp 4000) by
        // less than that, and is expected to be coalesced below.
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
        // A transaction whose package has a 40,000-character key in its module
        // map. It makes the transaction large enough to hit the 100_000-byte
        // checkpoint size limit configured below.
        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
            vec![GenesisObject::RawObject {
                data: object::Data::Package(
                    MovePackage::new(
                        ObjectID::random(),
                        SequenceNumber::new(),
                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
                        100_000,
                        // no modules so empty type_origin_table as no types are defined in this
                        // package
                        Vec::new(),
                        // no modules so empty linkage_table as no dependencies of this package
                        // exist
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        // Digests d(0)..d(15) map to the small transaction; d(15)..d(20) map to
        // the large one.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Record effects for the transactions used below. The `vec![...]`
        // argument is each transaction's dependency list.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        // Give every transaction with effects a (default) user signature.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        // At most 3 transactions and 100_000 bytes per checkpoint.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        let _tasks = checkpoint_service.spawn().await;

        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K
        // max. Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 was too soon after the prior one and should be coalesced
        // into the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        // Signatures arrive out of order (checkpoint 1 before checkpoint 0). The
        // assertions below expect certified checkpoints in sequence order.
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

    /// Test cache backed by a plain map from transaction digest to effects.
    /// Lookups of missing digests in the `notify_read_*` methods panic.
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn notify_read_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn notify_read_executed_effects_digests(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| {
                    self.get(d)
                        .map(|fx| fx.digest())
                        .expect("effects not found")
                })
                .collect()))
            .boxed()
        }

        fn multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
        }

        // Unimplemented methods - it's unfortunate to have this big blob of useless
        // code, but it wasn't worth it to keep EffectsNotifyRead around just
        // for these tests, as it caused a ton of complication in non-test code.
        // (e.g. had to implement EffectsNotifyRead for all ExecutionCacheRead
        // implementors).

        fn multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
            unimplemented!()
        }

        fn multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
            unimplemented!()
        }

        fn multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            unimplemented!()
        }

        fn multi_get_events(
            &self,
            _: &[TransactionEventsDigest],
        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
            unimplemented!()
        }
    }

    /// Forwards each built checkpoint to the channel. Panics if the channel is
    /// full or closed (`try_send(...).unwrap()`).
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> IotaResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

    /// Forwards each certified checkpoint to the channel. Panics if the
    /// channel is full or closed.
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> IotaResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

    /// Builds a V1 pending checkpoint at height `i`. Its roots are `d(x)` for
    /// each `x` in `t`, it carries the given timestamp, and it is never the
    /// last checkpoint of the epoch.
    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
            roots: t
                .into_iter()
                .map(|t| TransactionKey::Digest(d(t)))
                .collect(),
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
            },
        })
    }

    /// Returns a test digest whose first byte is `i` and whose other bytes
    /// are zero.
    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

    /// Builds default effects with the given transaction digest, dependencies
    /// and gas cost summary.
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

    /// Stores effects for `digest` in `store`. Also inserts the
    /// `TransactionKey::Digest(digest)` -> `digest` mapping into the
    /// authority's epoch store.
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store
            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
            .expect("Inserting cert fx and sigs should not fail");
    }
}