iota_core/checkpoints/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5mod causal_order;
6pub mod checkpoint_executor;
7mod checkpoint_output;
8mod metrics;
9
10use std::{
11    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
12    fs::File,
13    io::Write,
14    path::Path,
15    sync::{Arc, Weak},
16    time::{Duration, SystemTime},
17};
18
19use diffy::create_patch;
20use iota_common::{debug_fatal, fatal, sync::notify_read::NotifyRead};
21use iota_macros::fail_point;
22use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
23use iota_network::default_iota_network_config;
24use iota_protocol_config::ProtocolVersion;
25use iota_types::{
26    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
27    committee::StakeUnit,
28    crypto::AuthorityStrongQuorumSignInfo,
29    digests::{CheckpointContentsDigest, CheckpointDigest},
30    effects::{TransactionEffects, TransactionEffectsAPI},
31    error::{IotaError, IotaResult},
32    event::SystemEpochInfoEvent,
33    gas::GasCostSummary,
34    iota_system_state::{
35        IotaSystemState, IotaSystemStateTrait,
36        epoch_start_iota_system_state::EpochStartSystemStateTrait,
37    },
38    message_envelope::Message,
39    messages_checkpoint::{
40        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
41        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
42        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
43        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
44        VerifiedCheckpointContents,
45    },
46    messages_consensus::ConsensusTransactionKey,
47    signature::GenericSignature,
48    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
49};
50use itertools::Itertools;
51use nonempty::NonEmpty;
52use parking_lot::Mutex;
53use rand::{rngs::OsRng, seq::SliceRandom};
54use serde::{Deserialize, Serialize};
55use tokio::{
56    sync::{Notify, watch},
57    task::JoinSet,
58    time::timeout,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use typed_store::{
62    DBMapUtils, Map, TypedStoreError,
63    rocks::{DBMap, MetricConf},
64    traits::{TableSummary, TypedStoreDebug},
65};
66
67pub use crate::checkpoints::{
68    checkpoint_output::{
69        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
70    },
71    metrics::CheckpointMetrics,
72};
73use crate::{
74    authority::{
75        AuthorityState,
76        authority_per_epoch_store::{AuthorityPerEpochStore, scorer::MAX_SCORE},
77    },
78    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
79    checkpoints::{
80        causal_order::CausalOrder,
81        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
82    },
83    consensus_handler::SequencedConsensusTransactionKey,
84    execution_cache::TransactionCacheRead,
85    stake_aggregator::{InsertResult, MultiStakeAggregator},
86    state_accumulator::StateAccumulator,
87};
88
/// Height at which a checkpoint is built (see
/// `BuilderCheckpointSummary::checkpoint_height` and
/// `PendingCheckpointInfo::checkpoint_height`).
pub type CheckpointHeight = u64;
90
/// Aggregate statistics for one epoch, derived from the first and last
/// checkpoints of the epoch (see `CheckpointStore::get_epoch_stats`).
pub struct EpochStats {
    // Number of checkpoints created during the epoch.
    pub checkpoint_count: u64,
    // Number of network transactions executed during the epoch.
    pub transaction_count: u64,
    // Computation cost from the epoch's rolling gas cost summary.
    pub total_gas_reward: u64,
}
96
/// Metadata attached to a pending (not yet built) checkpoint.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
    // Timestamp to record in the built checkpoint summary.
    pub timestamp_ms: CheckpointTimestamp,
    // True if this will be the final checkpoint of the current epoch.
    pub last_of_epoch: bool,
    // Height at which this pending checkpoint was created.
    pub checkpoint_height: CheckpointHeight,
}
103
/// A checkpoint that has been scheduled but not yet built into a summary.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PendingCheckpoint {
    // This is an enum for future upgradability, though at the moment there is only one variant.
    V1(PendingCheckpointContentsV1),
}
109
/// Version-1 payload of a [`PendingCheckpoint`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointContentsV1 {
    // Root transactions from which the checkpoint's full transaction set is derived.
    pub roots: Vec<TransactionKey>,
    // Associated metadata (timestamp, height, end-of-epoch flag).
    pub details: PendingCheckpointInfo,
}
115
116impl PendingCheckpoint {
117    pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
118        match self {
119            PendingCheckpoint::V1(contents) => contents,
120        }
121    }
122
123    pub fn into_v1(self) -> PendingCheckpointContentsV1 {
124        match self {
125            PendingCheckpoint::V1(contents) => contents,
126        }
127    }
128
129    pub fn roots(&self) -> &Vec<TransactionKey> {
130        &self.as_v1().roots
131    }
132
133    pub fn details(&self) -> &PendingCheckpointInfo {
134        &self.as_v1().details
135    }
136
137    pub fn height(&self) -> CheckpointHeight {
138        self.details().checkpoint_height
139    }
140}
141
/// A checkpoint summary produced by the local builder, together with
/// bookkeeping about where in the build process it was created.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    // Index of this summary within the commit it was built from.
    pub position_in_commit: usize,
}
149
/// Column families backing the on-disk checkpoint store. The `DBMapUtils`
/// derive generates the open/read-only plumbing used by `new` and
/// `open_readonly`.
#[derive(DBMapUtils)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number.
    /// Entries from this table are deleted after state accumulation has
    /// completed together with the corresponding full_checkpoint_content.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence
    /// number, for efficient reads of full checkpoints. Entries from this
    /// table are deleted after state accumulation has completed. See
    /// NUM_SAVED_FULL_CHECKPOINT_CONTENTS.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks
    /// and log useful information. Can be pruned as soon as we verify that
    /// we are in agreement with the latest certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in
    /// that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}
185
186impl CheckpointStoreTables {
187    pub fn new(path: &Path, metric_name: &'static str) -> Self {
188        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
189    }
190    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
191        Self::get_read_only_handle(
192            path.to_path_buf(),
193            None,
194            None,
195            MetricConf::new("checkpoint_readonly"),
196        )
197    }
198}
199
/// On-disk checkpoint store plus in-memory notification channels that wake
/// tasks waiting for a checkpoint to become synced or executed.
pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    // Wakes waiters of `notify_read_synced_checkpoint` when the
    // highest-synced watermark reaches their sequence number.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    // Wakes waiters of `notify_read_executed_checkpoint` when the
    // highest-executed watermark reaches their sequence number.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
205
impl CheckpointStore {
    /// Opens (or creates) the checkpoint store at `path`, recording DB
    /// metrics under the "checkpoint" name.
    pub fn new(path: &Path) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(path, "checkpoint");
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

    /// Creates a store backed by a fresh temporary directory, for tests.
    // NOTE(review): the TempDir guard is dropped when this returns, which
    // removes the directory; the already-opened DB presumably keeps working
    // through its open file handles on Unix — confirm this is intended.
    pub fn new_for_tests() -> Arc<Self> {
        let ckpt_dir = tempfile::tempdir().unwrap();
        CheckpointStore::new(ckpt_dir.path())
    }

    /// Same as [`CheckpointStore::new`] but records DB metrics under the
    /// "db_checkpoint" name, for use by the DB checkpoint handler.
    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(path, "db_checkpoint");
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

    /// Opens a read-only handle to the underlying tables at `path`.
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }

    /// Inserts the genesis checkpoint (must be epoch 0, sequence number 0)
    /// unless it is already present, marking it verified and synced. When the
    /// epoch store is still at epoch 0, also seeds the checkpoint builder
    /// state with the genesis checkpoint.
    ///
    /// Panics if `checkpoint` is not in epoch 0 or its sequence number is
    /// not 0, and on any DB error.
    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // Only insert the genesis checkpoint if the DB is empty and doesn't have it
        // already
        if self
            .get_checkpoint_by_digest(checkpoint.digest())
            .unwrap()
            .is_none()
        {
            if epoch_store.epoch() == checkpoint.epoch {
                epoch_store
                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                    .unwrap();
            } else {
                debug!(
                    validator_epoch =% epoch_store.epoch(),
                    genesis_epoch =% checkpoint.epoch(),
                    "Not inserting checkpoint builder data for genesis checkpoint",
                );
            }
            self.insert_checkpoint_contents(contents).unwrap();
            self.insert_verified_checkpoint(&checkpoint).unwrap();
            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
        }
    }

    /// Returns the certified checkpoint with the given digest, if any.
    pub fn get_checkpoint_by_digest(
        &self,
        digest: &CheckpointDigest,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .checkpoint_by_digest
            .get(digest)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    /// Returns the certified checkpoint at `sequence_number`, if any.
    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .certified_checkpoints
            .get(&sequence_number)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    /// Returns the locally built (not necessarily certified) checkpoint
    /// summary at `sequence_number`, if any.
    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }

    /// Get checkpoint sequence number by contents digest.
    ///
    /// Entries from this table are deleted after state accumulation has
    /// completed together with the corresponding full_checkpoint_content.
    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }

    /// Removes the contents-digest -> sequence-number mapping; called after
    /// state accumulation no longer needs it.
    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }

    /// Returns the certified checkpoint with the highest sequence number, if
    /// any exist.
    pub fn get_latest_certified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        Ok(self
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v.into()))
    }

    /// Returns the locally computed checkpoint summary with the highest
    /// sequence number, if any exist.
    pub fn get_latest_locally_computed_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        Ok(self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v))
    }

    /// Batch lookup of certified checkpoints; the result vector is parallel
    /// to `sequence_numbers`, with `None` for missing entries.
    pub fn multi_get_checkpoint_by_sequence_number(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
        let checkpoints = self
            .tables
            .certified_checkpoints
            .multi_get(sequence_numbers)?
            .into_iter()
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
            .collect();

        Ok(checkpoints)
    }

    /// Batch lookup of checkpoint contents by contents digest; the result
    /// vector is parallel to `contents_digest`.
    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }

    /// Returns the checkpoint referenced by the `HighestVerified` watermark,
    /// or `None` if the watermark has never been set.
    pub fn get_highest_verified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_verified = if let Some(highest_verified) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestVerified)?
        {
            highest_verified
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_verified.1)
    }

    /// Returns the checkpoint referenced by the `HighestSynced` watermark,
    /// or `None` if the watermark has never been set.
    pub fn get_highest_synced_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_synced = if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            highest_synced
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_synced.1)
    }

    /// Returns just the sequence number of the `HighestSynced` watermark, if
    /// set.
    pub fn get_highest_synced_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            Ok(Some(highest_synced.0))
        } else {
            Ok(None)
        }
    }

    /// Returns just the sequence number of the `HighestExecuted` watermark,
    /// if set.
    pub fn get_highest_executed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            Ok(Some(highest_executed.0))
        } else {
            Ok(None)
        }
    }

    /// Returns the checkpoint referenced by the `HighestExecuted` watermark,
    /// or `None` if the watermark has never been set.
    pub fn get_highest_executed_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_executed = if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            highest_executed
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_executed.1)
    }

    /// Returns just the sequence number of the `HighestPruned` watermark, if
    /// set.
    pub fn get_highest_pruned_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .watermarks
            .get(&CheckpointWatermark::HighestPruned)
            .map(|watermark| watermark.map(|w| w.0))
    }

    /// Returns checkpoint contents by contents digest, if present.
    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }

    /// Returns the full (state-sync) checkpoint contents at sequence number
    /// `seq`, if still retained (entries are pruned after state
    /// accumulation).
    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content.get(&seq)
    }

    /// Deletes locally computed checkpoint summaries from sequence number 0
    /// up to the latest one present. Safe once we know we agree with the
    /// certified chain (see the `locally_computed_checkpoints` table docs).
    fn prune_local_summaries(&self) -> IotaResult {
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }

    /// Compares a locally computed checkpoint summary against the certified
    /// checkpoint at the same sequence number. On mismatch, logs both
    /// summaries and their contents (or an explanatory message if contents
    /// are missing/unreadable) and aborts the process via `fatal!` — a
    /// divergence here means this validator has forked.
    #[instrument(level = "trace", skip_all)]
    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // checkpoint contents may be too large for panic message.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );
            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }

    // Called by consensus (ConsensusAggregator).
    // Different from `insert_verified_checkpoint`, it does not touch
    // the highest_verified_checkpoint watermark such that state sync
    // will have a chance to process this checkpoint and perform some
    // state-sync only things.
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        // A checkpoint carrying a next-epoch committee is the last checkpoint
        // of its epoch; record it in the per-epoch last-checkpoint map.
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        // If we built this checkpoint locally, cross-check it against the
        // certified one to detect forks.
        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }

    // Called by state sync, apart from inserting the checkpoint and updating
    // related tables, it also bumps the highest_verified_checkpoint watermark.
    #[instrument(level = "trace", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }

    /// Advances the `HighestVerified` watermark to `checkpoint` if it is
    /// newer than the current watermark; otherwise does nothing.
    pub fn update_highest_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if Some(*checkpoint.sequence_number())
            > self
                .get_highest_verified_checkpoint()?
                .map(|x| *x.sequence_number())
        {
            debug!(
                checkpoint_seq = checkpoint.sequence_number(),
                "Updating highest verified checkpoint",
            );
            self.tables.watermarks.insert(
                &CheckpointWatermark::HighestVerified,
                &(*checkpoint.sequence_number(), *checkpoint.digest()),
            )?;
        }

        Ok(())
    }

    /// Sets the `HighestSynced` watermark to `checkpoint` (unconditionally)
    /// and wakes any tasks waiting on `notify_read_synced_checkpoint`.
    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(seq, *checkpoint.digest()),
        )?;
        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
        Ok(())
    }

    /// Waits until the watermark returned by `get_watermark` reaches `seq`,
    /// then returns the certified checkpoint at `seq`. The registration in
    /// `notify_read` races with the watermark check, so a concurrent update
    /// either satisfies the read immediately or triggers the notification.
    ///
    /// Panics on DB errors, or if the watermark is past `seq` but the
    /// checkpoint itself is missing.
    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        type ReadResult = Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError>;

        notify_read
            .read(&[seq], |seqs| {
                let seq = seqs[0];
                let Some(highest) = get_watermark() else {
                    return Ok(vec![None]) as ReadResult;
                };
                if highest < seq {
                    return Ok(vec![None]) as ReadResult;
                }
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                Ok(vec![Some(checkpoint)]) as ReadResult
            })
            .await
            .unwrap()
            .into_iter()
            .next()
            .unwrap()
    }

    /// Waits until checkpoint `seq` is synced (per the `HighestSynced`
    /// watermark) and returns it.
    pub async fn notify_read_synced_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
            self.get_highest_synced_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

    /// Waits until checkpoint `seq` is executed (per the `HighestExecuted`
    /// watermark) and returns it.
    pub async fn notify_read_executed_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
            self.get_highest_executed_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

    /// Advances the `HighestExecuted` watermark to `checkpoint` and wakes
    /// tasks waiting on `notify_read_executed_checkpoint`. A no-op if the
    /// watermark is already at or past `checkpoint`.
    ///
    /// Panics unless `checkpoint` is exactly one past the current watermark:
    /// execution must advance sequentially.
    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }

    /// Sets the `HighestPruned` watermark to `checkpoint` (unconditionally).
    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    /// Sets highest executed checkpoint to any value.
    ///
    /// WARNING: This method is very subtle and can corrupt the database if used
    /// incorrectly. It should only be used in one-off cases or tests after
    /// fully understanding the risk.
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    /// Stores checkpoint contents keyed by their digest.
    pub fn insert_checkpoint_contents(
        &self,
        contents: CheckpointContents,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = ?contents.digest(),
            "Inserting checkpoint contents",
        );
        self.tables
            .checkpoint_content
            .insert(contents.digest(), &contents)
    }

    /// Inserts the full checkpoint contents along with the mapping from
    /// contents digest to sequence number, and the checkpoint contents.
    ///
    /// The entries for mapping the contents digest to sequence number,
    /// and the full_checkpoint_content are deleted after state accumulation has
    /// completed. See NUM_SAVED_FULL_CHECKPOINT_CONTENTS.
    ///
    /// Panics if `full_contents` does not hash to `checkpoint.content_digest`.
    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        // All three tables are updated atomically in a single write batch.
        batch.write()
    }

    /// Deletes the full checkpoint contents at `seq` (done after state
    /// accumulation has completed).
    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)
    }

    /// Returns the last checkpoint of `epoch_id`, if recorded in the
    /// epoch-last-checkpoint map and present among certified checkpoints.
    pub fn get_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
    ) -> IotaResult<Option<VerifiedCheckpoint>> {
        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
        let checkpoint = match seq {
            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
            None => None,
        };
        Ok(checkpoint)
    }

    /// Records `checkpoint` as the last checkpoint of `epoch_id`.
    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> IotaResult {
        self.tables
            .epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }

    /// Returns the epoch commitments from the `EndOfEpochData` of `epoch`'s
    /// last checkpoint, or `None` when that checkpoint is unknown.
    ///
    /// Panics if the recorded last checkpoint lacks `EndOfEpochData`.
    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }

    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few
    /// statistics of the epoch.
    ///
    /// Returns `None` when `epoch > 0` and the previous epoch's last
    /// checkpoint cannot be found (counts would be unanchored).
    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }

    /// Takes a physical snapshot of the database into `path`.
    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
        // This checkpoints the entire db and not one column family
        self.tables
            .checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }

    /// Removes the `HighestExecuted` watermark entirely. Test-only: normal
    /// operation never deletes watermarks.
    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
        let mut wb = self.tables.watermarks.batch();
        wb.delete_batch(
            &self.tables.watermarks,
            std::iter::once(CheckpointWatermark::HighestExecuted),
        )?;
        wb.write()?;
        Ok(())
    }

    /// Clears the executed watermark and flushes the watermarks column so the
    /// node will re-execute checkpoints from genesis.
    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
        self.delete_highest_executed_checkpoint_test_only()?;
        self.tables.watermarks.rocksdb.flush()?;
        Ok(())
    }
}
889
/// Keys used in the `watermarks` table to track per-stage checkpoint
/// progress (see e.g. `delete_highest_executed_checkpoint_test_only`).
/// NOTE(review): variant order matters for the serde encoding of table keys —
/// do not reorder.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
}
897
/// Builds checkpoints from pending checkpoints produced by consensus and
/// hands them off (via `notify_aggregator`) for signature aggregation.
pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Wakes the builder loop when new pending checkpoints may be available.
    notify: Arc<Notify>,
    /// Notified after new checkpoints are written so the aggregator runs.
    notify_aggregator: Arc<Notify>,
    /// Watch channel carrying the highest checkpoint sequence number built.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    /// Source of executed transaction effects for checkpoint roots.
    effects_store: Arc<dyn TransactionCacheRead>,
    /// Weak handle to the state accumulator; expected to be upgradable while
    /// checkpoints are still being built.
    accumulator: Weak<StateAccumulator>,
    /// Sink for newly created checkpoints (e.g. sending checkpoint signatures
    /// to consensus).
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    /// Chunking limit: max transaction count per checkpoint.
    max_transactions_per_checkpoint: usize,
    /// Chunking limit: max estimated serialized size per checkpoint.
    max_checkpoint_size_bytes: usize,
}
912
/// Aggregates validator signatures over locally built checkpoints into
/// certified checkpoints (its run loop is defined elsewhere in this file).
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Wakes the aggregator (notified by the builder after writing
    /// checkpoints).
    notify: Arc<Notify>,
    /// Signature aggregation state for the checkpoint currently being
    /// certified, if any.
    current: Option<CheckpointSignatureAggregator>,
    /// Sink for checkpoints once certified.
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
922
// This holds information to aggregate signatures for one checkpoint
pub struct CheckpointSignatureAggregator {
    // NOTE(review): presumably the next signature message index to process —
    // confirm against the aggregator impl (not visible in this chunk).
    next_index: u64,
    // The locally built summary that incoming signatures are matched against.
    summary: CheckpointSummary,
    // Digest of `summary`.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
934
935impl CheckpointBuilder {
936    fn new(
937        state: Arc<AuthorityState>,
938        store: Arc<CheckpointStore>,
939        epoch_store: Arc<AuthorityPerEpochStore>,
940        notify: Arc<Notify>,
941        effects_store: Arc<dyn TransactionCacheRead>,
942        accumulator: Weak<StateAccumulator>,
943        output: Box<dyn CheckpointOutput>,
944        notify_aggregator: Arc<Notify>,
945        last_built: watch::Sender<CheckpointSequenceNumber>,
946        metrics: Arc<CheckpointMetrics>,
947        max_transactions_per_checkpoint: usize,
948        max_checkpoint_size_bytes: usize,
949    ) -> Self {
950        Self {
951            state,
952            store,
953            epoch_store,
954            notify,
955            effects_store,
956            accumulator,
957            output,
958            notify_aggregator,
959            last_built,
960            metrics,
961            max_transactions_per_checkpoint,
962            max_checkpoint_size_bytes,
963        }
964    }
965
    /// Runs the `CheckpointBuilder` in an asynchronous loop, managing the
    /// creation of checkpoints.
    async fn run(mut self) {
        info!("Starting CheckpointBuilder");
        loop {
            self.maybe_build_checkpoints().await;

            // Sleep until notified that new pending checkpoints may be
            // available, then try to build again.
            self.notify.notified().await;
        }
    }
976
    /// Drains pending checkpoints from the epoch store and builds checkpoints
    /// from them. Consecutive `PendingCheckpoint`s are grouped until the
    /// configured minimum interval has elapsed, or an end-of-epoch boundary is
    /// reached; each group is coalesced into one `make_checkpoint` call.
    async fn maybe_build_checkpoints(&mut self) {
        let _scope = monitored_scope("BuildCheckpoints");

        // Collect info about the most recently built checkpoint.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        // Peekable so we can look one ahead for a last-of-epoch boundary.
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            // Group PendingCheckpoints until:
            // - minimum interval has elapsed ...
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                    Some(last_timestamp) => {
                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                    }
                    None => true,
                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
                //   should be written separately) ...
                } || checkpoints_iter
                    .peek()
                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                // - or, we have reached end of epoch.
                    || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            // Min interval has elapsed, we can now coalesce and build a checkpoint.
            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height = height,
                "Making checkpoint at commit height"
            );

            match self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await
            {
                Ok(seq) => {
                    self.last_built.send_if_modified(|cur| {
                        // when rebuilding checkpoints at startup, seq can be for an old checkpoint
                        if seq > *cur {
                            *cur = seq;
                            true
                        } else {
                            false
                        }
                    });
                }
                Err(e) => {
                    // Abandon the whole drain on error; the next wakeup of the
                    // run loop retries from persisted state.
                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    return;
                }
            }
            // Ensure that the task can be cancelled at end of epoch, even if no other await
            // yields execution.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );
    }
1066
    /// Builds and persists one or more checkpoints covering all given pending
    /// checkpoints, returning the highest checkpoint sequence number produced.
    /// `pendings` must be non-empty (the caller groups at least one entry
    /// before invoking this; the `unwrap` below relies on it).
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &self,
        pendings: Vec<PendingCheckpoint>,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        let last_details = pendings.last().unwrap().details().clone();

        // Keeps track of the effects that are already included in the current
        // checkpoint. This is used when there are multiple pending checkpoints
        // to create a single checkpoint because in such scenarios, dependencies
        // of a transaction may in earlier created checkpoints, or in earlier
        // pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        // Stores the transactions that should be included in the checkpoint.
        // Transactions will be recorded in the checkpoint in this order.
        let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
        for pending_checkpoint in pendings.into_iter() {
            let pending = pending_checkpoint.into_v1();
            let txn_in_checkpoint = self
                .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
                .await?;
            sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
        }
        let new_checkpoints = self
            .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
            .await?;
        // Capture the highest sequence before `new_checkpoints` is moved into
        // `write_checkpoints`.
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        Ok(highest_sequence)
    }
1099
    // Given the root transactions of a pending checkpoint, resolve the transactions
    // should be included in the checkpoint, and return them in the order they
    // should be included in the checkpoint. `effects_in_current_checkpoint`
    // tracks the transactions that already exist in the current checkpoint.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        roots: Vec<TransactionKey>,
        effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        self.metrics
            .checkpoint_roots_count
            .inc_by(roots.len() as u64);

        // Wait for all roots to be executed, resolving their digests and then
        // their effects.
        let root_digests = self
            .epoch_store
            .notify_read_executed_digests(&roots)
            .in_monitored_scope("CheckpointNotifyDigests")
            .await?;
        let root_effects = self
            .effects_store
            .try_notify_read_executed_effects(&root_digests)
            .in_monitored_scope("CheckpointNotifyRead")
            .await?;

        let _scope = monitored_scope("CheckpointBuilder");

        let consensus_commit_prologue = {
            // If the roots contains consensus commit prologue transaction, we want to
            // extract it, and put it to the front of the checkpoint.

            let consensus_commit_prologue = self
                .extract_consensus_commit_prologue(&root_digests, &root_effects)
                .await?;

            // Get the un-included dependencies of the consensus commit prologue. We should
            // expect no other dependencies that haven't been included in any
            // previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been
                // included in any previous checkpoint.
                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }
            consensus_commit_prologue
        };

        // Expand the remaining roots to all not-yet-checkpointed dependencies.
        let unsorted =
            self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;

        let _scope = monitored_scope("CheckpointBuilder::causal_sort");
        // The prologue (if any) goes first, followed by the causally ordered
        // remainder.
        let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
        if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
            #[cfg(debug_assertions)]
            {
                // When consensus_commit_prologue is extracted, it should not be included in the
                // `unsorted`.
                for tx in unsorted.iter() {
                    assert!(tx.transaction_digest() != &_ccp_digest);
                }
            }
            sorted.push(ccp_effects);
        }
        sorted.extend(CausalOrder::causal_sort(unsorted));

        #[cfg(msim)]
        {
            // Check consensus commit prologue invariants in sim test.
            self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
        }

        Ok(sorted)
    }
1187
1188    // This function is used to extract the consensus commit prologue digest and
1189    // effects from the root transactions.
1190    // The consensus commit prologue is expected to be the first transaction in the
1191    // roots.
1192    async fn extract_consensus_commit_prologue(
1193        &self,
1194        root_digests: &[TransactionDigest],
1195        root_effects: &[TransactionEffects],
1196    ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1197        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1198        if root_digests.is_empty() {
1199            return Ok(None);
1200        }
1201
1202        // Reads the first transaction in the roots, and checks whether it is a
1203        // consensus commit prologue transaction.
1204        // The consensus commit prologue transaction should be the first transaction
1205        // in the roots written by the consensus handler.
1206        let first_tx = self
1207            .state
1208            .get_transaction_cache_reader()
1209            .try_get_transaction_block(&root_digests[0])?
1210            .expect("Transaction block must exist");
1211
1212        Ok(match first_tx.transaction_data().kind() {
1213            TransactionKind::ConsensusCommitPrologueV1(_) => {
1214                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1215                Some((*first_tx.digest(), root_effects[0].clone()))
1216            }
1217            _ => None,
1218        })
1219    }
1220
    /// Writes the new checkpoints to the DB storage and processes them.
    ///
    /// In order: persists contents and locally computed summaries in one
    /// batch (panicking on any divergence from a previously computed summary),
    /// emits each checkpoint through `output`, fork-checks against any already
    /// certified checkpoint, then wakes the aggregator and records the
    /// constructed checkpoints in the epoch store.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> IotaResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.store.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );

            // Guard against equivocation: if this sequence number was built
            // before, the result must be identical.
            if let Some(previously_computed_summary) = self
                .store
                .tables
                .locally_computed_checkpoints
                .get(&summary.sequence_number)?
            {
                if previously_computed_summary != *summary {
                    // Panic so that we don't send out an equivocating checkpoint sig.
                    fatal!(
                        "Checkpoint {} was previously built with a different result: {previously_computed_summary:?} vs {summary:?}",
                        summary.sequence_number,
                    );
                }
            }

            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.store.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.store.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        // Persist everything atomically before notifying anyone.
        batch.write()?;

        // Send all checkpoint sigs to consensus. The messages including
        // MisbehaviorReports are also sent in this step.
        for (summary, contents) in &new_checkpoints {
            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
                .await?;
        }

        // If a certified checkpoint already exists for any of these sequence
        // numbers, compare against it to detect a local fork.
        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .store
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.store
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_constructed_checkpoint(height, new_checkpoints);
        Ok(())
    }
1304
1305    #[expect(clippy::type_complexity)]
1306    fn split_checkpoint_chunks(
1307        &self,
1308        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1309        signatures: Vec<Vec<GenericSignature>>,
1310    ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1311        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1312        let mut chunks = Vec::new();
1313        let mut chunk = Vec::new();
1314        let mut chunk_size: usize = 0;
1315        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1316            .into_iter()
1317            .zip(signatures.into_iter())
1318        {
1319            // Roll over to a new chunk after either max count or max size is reached.
1320            // The size calculation here is intended to estimate the size of the
1321            // FullCheckpointContents struct. If this code is modified, that struct
1322            // should also be updated accordingly.
1323            let size = transaction_size
1324                + bcs::serialized_size(&effects)?
1325                + bcs::serialized_size(&signatures)?;
1326            if chunk.len() == self.max_transactions_per_checkpoint
1327                || (chunk_size + size) > self.max_checkpoint_size_bytes
1328            {
1329                if chunk.is_empty() {
1330                    // Always allow at least one tx in a checkpoint.
1331                    warn!(
1332                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1333                        self.max_checkpoint_size_bytes
1334                    );
1335                } else {
1336                    chunks.push(chunk);
1337                    chunk = Vec::new();
1338                    chunk_size = 0;
1339                }
1340            }
1341
1342            chunk.push((effects, signatures));
1343            chunk_size += size;
1344        }
1345
1346        if !chunk.is_empty() || chunks.is_empty() {
1347            // We intentionally create an empty checkpoint if there is no content provided
1348            // to make a 'heartbeat' checkpoint.
1349            // Important: if some conditions are added here later, we need to make sure we
1350            // always have at least one chunk if last_pending_of_epoch is set
1351            chunks.push(chunk);
1352            // Note: empty checkpoints are ok - they shouldn't happen at all on
1353            // a network with even modest load. Even if they do
1354            // happen, it is still useful as it allows fullnodes to
1355            // distinguish between "no transactions have happened" and "i am not
1356            // receiving new checkpoints".
1357        }
1358        Ok(chunks)
1359    }
1360
1361    /// Creates checkpoints using the provided transaction effects and pending
1362    /// checkpoint information.
1363    fn load_last_built_checkpoint_summary(
1364        epoch_store: &AuthorityPerEpochStore,
1365        store: &CheckpointStore,
1366    ) -> IotaResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1367        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1368        if last_checkpoint.is_none() {
1369            let epoch = epoch_store.epoch();
1370            if epoch > 0 {
1371                let previous_epoch = epoch - 1;
1372                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1373                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1374                if let Some((ref seq, _)) = last_checkpoint {
1375                    debug!(
1376                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1377                    );
1378                } else {
1379                    // This is some serious bug with when CheckpointBuilder started so surfacing it
1380                    // via panic
1381                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1382                }
1383            }
1384        }
1385        Ok(last_checkpoint)
1386    }
1387
1388    #[instrument(level = "debug", skip_all)]
1389    async fn create_checkpoints(
1390        &self,
1391        all_effects: Vec<TransactionEffects>,
1392        details: &PendingCheckpointInfo,
1393    ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1394        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1395        let total = all_effects.len();
1396        let mut last_checkpoint =
1397            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1398        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1399        debug!(
1400            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1401            checkpoint_timestamp = details.timestamp_ms,
1402            "Creating checkpoint(s) for {} transactions",
1403            all_effects.len(),
1404        );
1405
1406        let all_digests: Vec<_> = all_effects
1407            .iter()
1408            .map(|effect| *effect.transaction_digest())
1409            .collect();
1410        let transactions_and_sizes = self
1411            .state
1412            .get_transaction_cache_reader()
1413            .try_get_transactions_and_serialized_sizes(&all_digests)?;
1414        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1415        let mut transactions = Vec::with_capacity(all_effects.len());
1416        let mut transaction_keys = Vec::with_capacity(all_effects.len());
1417        let mut randomness_rounds = BTreeMap::new();
1418        {
1419            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1420            debug!(
1421                ?last_checkpoint_seq,
1422                "Waiting for {:?} certificates to appear in consensus",
1423                all_effects.len()
1424            );
1425
1426            for (effects, transaction_and_size) in all_effects
1427                .into_iter()
1428                .zip(transactions_and_sizes.into_iter())
1429            {
1430                let (transaction, size) = transaction_and_size
1431                    .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
1432                match transaction.inner().transaction_data().kind() {
1433                    TransactionKind::ConsensusCommitPrologueV1(_)
1434                    | TransactionKind::AuthenticatorStateUpdateV1(_) => {
1435                        // ConsensusCommitPrologue and
1436                        // AuthenticatorStateUpdateV1
1437                        // are guaranteed to be
1438                        // processed before we reach here.
1439                    }
1440                    TransactionKind::RandomnessStateUpdate(rsu) => {
1441                        randomness_rounds
1442                            .insert(*effects.transaction_digest(), rsu.randomness_round);
1443                    }
1444                    _ => {
1445                        // All other tx should be included in the call to
1446                        // `consensus_messages_processed_notify`.
1447                        transaction_keys.push(SequencedConsensusTransactionKey::External(
1448                            ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
1449                        ));
1450                    }
1451                }
1452                transactions.push(transaction);
1453                all_effects_and_transaction_sizes.push((effects, size));
1454            }
1455
1456            self.epoch_store
1457                .consensus_messages_processed_notify(transaction_keys)
1458                .await?;
1459        }
1460
1461        let signatures = self
1462            .epoch_store
1463            .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1464        debug!(
1465            ?last_checkpoint_seq,
1466            "Received {} checkpoint user signatures from consensus",
1467            signatures.len()
1468        );
1469
1470        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1471        let chunks_count = chunks.len();
1472
1473        let mut checkpoints = Vec::with_capacity(chunks_count);
1474        debug!(
1475            ?last_checkpoint_seq,
1476            "Creating {} checkpoints with {} transactions", chunks_count, total,
1477        );
1478
1479        let epoch = self.epoch_store.epoch();
1480        for (index, transactions) in chunks.into_iter().enumerate() {
1481            let first_checkpoint_of_epoch = index == 0
1482                && last_checkpoint
1483                    .as_ref()
1484                    .map(|(_, c)| c.epoch != epoch)
1485                    .unwrap_or(true);
1486            if first_checkpoint_of_epoch {
1487                self.epoch_store
1488                    .record_epoch_first_checkpoint_creation_time_metric();
1489            }
1490            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1491
1492            let sequence_number = last_checkpoint
1493                .as_ref()
1494                .map(|(_, c)| c.sequence_number + 1)
1495                .unwrap_or_default();
1496            let mut timestamp_ms = details.timestamp_ms;
1497            if let Some((_, last_checkpoint)) = &last_checkpoint {
1498                if last_checkpoint.timestamp_ms > timestamp_ms {
1499                    // The first consensus commit of an epoch can have zero timestamp.
1500                    debug!(
1501                        "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
1502                        sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
1503                    );
1504                    if self
1505                        .epoch_store
1506                        .protocol_config()
1507                        .consensus_median_timestamp_with_checkpoint_enforcement()
1508                    {
1509                        timestamp_ms = last_checkpoint.timestamp_ms;
1510                    }
1511                }
1512            }
1513
1514            if self
1515                .epoch_store
1516                .protocol_config()
1517                .calculate_validator_scores()
1518            {
1519                // We update the validator scores based on the information contained in the
1520                // Scorer. We choose this point in time to do so because we must guarantee that
1521                // scores are up to date right before the epoch changes. It also provides a good
1522                // update periodicity: updating scores each time a report is received could be
1523                // too frequent and not needed, since scores are not used during the epoch
1524                // (except for monitoring purposes, which does not need to be 100% exact)
1525                self.epoch_store.scorer.update_scores();
1526            }
1527
1528            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1529            let epoch_rolling_gas_cost_summary =
1530                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1531
1532            let end_of_epoch_data = if last_checkpoint_of_epoch {
1533                let scores: Vec<u64> = if self
1534                    .epoch_store
1535                    .protocol_config()
1536                    .pass_calculated_validator_scores_to_advance_epoch()
1537                {
1538                    self.epoch_store
1539                        .scorer
1540                        .current_scores
1541                        .iter()
1542                        .map(|x| x.load(std::sync::atomic::Ordering::Relaxed))
1543                        .collect()
1544                } else {
1545                    // Give everyone in the committee the max score
1546                    vec![MAX_SCORE; self.epoch_store.committee().num_members()]
1547                };
1548
1549                let (system_state_obj, system_epoch_info_event) = self
1550                    .augment_epoch_last_checkpoint(
1551                        &epoch_rolling_gas_cost_summary,
1552                        timestamp_ms,
1553                        &mut effects,
1554                        &mut signatures,
1555                        sequence_number,
1556                        scores,
1557                    )
1558                    .await?;
1559
1560                // The system epoch info event can be `None` in case if the `advance_epoch`
1561                // Move function call failed and was executed in the safe mode.
1562                // In this case, the tokens supply should be unchanged.
1563                //
1564                // SAFETY: The number of minted and burnt tokens easily fit into an i64 and due
1565                // to those small numbers, no overflows will occur during conversion or
1566                // subtraction.
1567                let epoch_supply_change =
1568                    system_epoch_info_event.map_or(0, |event| event.supply_change());
1569
1570                let committee = system_state_obj
1571                    .get_current_epoch_committee()
1572                    .committee()
1573                    .clone();
1574
1575                // This must happen after the call to augment_epoch_last_checkpoint,
1576                // otherwise we will not capture the change_epoch tx.
1577                let root_state_digest = {
1578                    let state_acc = self
1579                        .accumulator
1580                        .upgrade()
1581                        .expect("No checkpoints should be getting built after local configuration");
1582                    let acc = state_acc.accumulate_checkpoint(
1583                        &effects,
1584                        sequence_number,
1585                        &self.epoch_store,
1586                    )?;
1587
1588                    state_acc
1589                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
1590                        .await?;
1591
1592                    state_acc.accumulate_running_root(
1593                        &self.epoch_store,
1594                        sequence_number,
1595                        Some(acc),
1596                    )?;
1597                    state_acc
1598                        .digest_epoch(self.epoch_store.clone(), sequence_number)
1599                        .await?
1600                };
1601                self.metrics.highest_accumulated_epoch.set(epoch as i64);
1602                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1603
1604                let epoch_commitments = vec![root_state_digest.into()];
1605
1606                Some(EndOfEpochData {
1607                    next_epoch_committee: committee.voting_rights,
1608                    next_epoch_protocol_version: ProtocolVersion::new(
1609                        system_state_obj.protocol_version(),
1610                    ),
1611                    epoch_commitments,
1612                    epoch_supply_change,
1613                })
1614            } else {
1615                None
1616            };
1617            let contents = CheckpointContents::new_with_digests_and_signatures(
1618                effects.iter().map(TransactionEffects::execution_digests),
1619                signatures,
1620            );
1621
1622            let num_txns = contents.size() as u64;
1623
1624            let network_total_transactions = last_checkpoint
1625                .as_ref()
1626                .map(|(_, c)| c.network_total_transactions + num_txns)
1627                .unwrap_or(num_txns);
1628
1629            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1630
1631            let matching_randomness_rounds: Vec<_> = effects
1632                .iter()
1633                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1634                .copied()
1635                .collect();
1636
1637            let summary = CheckpointSummary::new(
1638                self.epoch_store.protocol_config(),
1639                epoch,
1640                sequence_number,
1641                network_total_transactions,
1642                &contents,
1643                previous_digest,
1644                epoch_rolling_gas_cost_summary,
1645                end_of_epoch_data,
1646                timestamp_ms,
1647                matching_randomness_rounds,
1648            );
1649            summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1650            if last_checkpoint_of_epoch {
1651                info!(
1652                    checkpoint_seq = sequence_number,
1653                    "creating last checkpoint of epoch {}", epoch
1654                );
1655                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
1656                    self.epoch_store
1657                        .report_epoch_metrics_at_last_checkpoint(stats);
1658                }
1659            }
1660            last_checkpoint = Some((sequence_number, summary.clone()));
1661            checkpoints.push((summary, contents));
1662        }
1663
1664        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1665    }
1666
1667    fn get_epoch_total_gas_cost(
1668        &self,
1669        last_checkpoint: Option<&CheckpointSummary>,
1670        cur_checkpoint_effects: &[TransactionEffects],
1671    ) -> GasCostSummary {
1672        let (previous_epoch, previous_gas_costs) = last_checkpoint
1673            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1674            .unwrap_or_default();
1675        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1676        if previous_epoch == self.epoch_store.epoch() {
1677            // sum only when we are within the same epoch
1678            GasCostSummary::new(
1679                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1680                previous_gas_costs.computation_cost_burned
1681                    + current_gas_costs.computation_cost_burned,
1682                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1683                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1684                previous_gas_costs.non_refundable_storage_fee
1685                    + current_gas_costs.non_refundable_storage_fee,
1686            )
1687        } else {
1688            current_gas_costs
1689        }
1690    }
1691
1692    /// Augments the last checkpoint of the epoch by creating and executing an
1693    /// advance epoch transaction.
1694    #[instrument(level = "error", skip_all)]
1695    async fn augment_epoch_last_checkpoint(
1696        &self,
1697        epoch_total_gas_cost: &GasCostSummary,
1698        epoch_start_timestamp_ms: CheckpointTimestamp,
1699        checkpoint_effects: &mut Vec<TransactionEffects>,
1700        signatures: &mut Vec<Vec<GenericSignature>>,
1701        checkpoint: CheckpointSequenceNumber,
1702        scores: Vec<u64>,
1703    ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1704        let (system_state, system_epoch_info_event, effects) = self
1705            .state
1706            .create_and_execute_advance_epoch_tx(
1707                &self.epoch_store,
1708                epoch_total_gas_cost,
1709                checkpoint,
1710                epoch_start_timestamp_ms,
1711                scores,
1712            )
1713            .await?;
1714        checkpoint_effects.push(effects);
1715        signatures.push(vec![]);
1716        Ok((system_state, system_epoch_info_event))
1717    }
1718
    /// For the given roots return complete list of effects to include in
    /// checkpoint This list includes the roots and all their dependencies,
    /// which are not part of checkpoint already. Note that this function
    /// may be called multiple times to construct the checkpoint.
    /// `existing_tx_digests_in_checkpoint` is used to track the transactions
    /// that are already included in the checkpoint. Txs in `roots` that
    /// need to be included in the checkpoint will be added to
    /// `existing_tx_digests_in_checkpoint` after the call of this function.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        // Effects selected for inclusion, in discovery order (roots first,
        // then their transitively discovered dependencies).
        let mut results = vec![];
        // Digests visited in this call; prevents re-reading effects for a
        // transaction that appears as a dependency of multiple others.
        let mut seen = HashSet::new();
        // Breadth-first expansion: each iteration processes the current
        // `roots` frontier and gathers its not-yet-seen dependencies into
        // `pending`, which becomes the next frontier.
        loop {
            let mut pending = HashSet::new();

            // Batched lookup: which of the current roots were already
            // included in a previously built checkpoint.
            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                // Unnecessary to read effects of a dependency if the effect is already
                // processed.
                seen.insert(*digest);

                // Skip roots that are already included in the checkpoint.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip roots already included in checkpoints or roots from previous epochs
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Skip here if dependency not executed in the current epoch.
                    // Note that the existence of an effects signature in the
                    // epoch store for the given digest indicates that the transaction
                    // was locally executed in the current epoch
                    if !effects_signature_exists {
                        continue;
                    }
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            // Load effects for the next frontier. A missing effect here is a
            // broken invariant: every dependency of a locally executed
            // transaction must itself have been executed.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self
                .effects_store
                .try_multi_get_executed_effects(&pending)?;
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {digest:?}, however transaction that depend on it was already executed"
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        // Record everything we selected so later calls for the same
        // checkpoint skip these transactions.
        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
1804
1805    // This function is used to check the invariants of the consensus commit
1806    // prologue transactions in the checkpoint in simtest.
1807    #[cfg(msim)]
1808    fn expensive_consensus_commit_prologue_invariants_check(
1809        &self,
1810        root_digests: &[TransactionDigest],
1811        sorted: &[TransactionEffects],
1812    ) {
1813        // Gets all the consensus commit prologue transactions from the roots.
1814        let root_txs = self
1815            .state
1816            .get_transaction_cache_reader()
1817            .multi_get_transaction_blocks(root_digests);
1818        let ccps = root_txs
1819            .iter()
1820            .filter_map(|tx| {
1821                tx.as_ref().filter(|tx| {
1822                    matches!(
1823                        tx.transaction_data().kind(),
1824                        TransactionKind::ConsensusCommitPrologueV1(_)
1825                    )
1826                })
1827            })
1828            .collect::<Vec<_>>();
1829
1830        // There should be at most one consensus commit prologue transaction in the
1831        // roots.
1832        assert!(ccps.len() <= 1);
1833
1834        // Get all the transactions in the checkpoint.
1835        let txs = self
1836            .state
1837            .get_transaction_cache_reader()
1838            .multi_get_transaction_blocks(
1839                &sorted
1840                    .iter()
1841                    .map(|tx| *tx.transaction_digest())
1842                    .collect::<Vec<_>>(),
1843            );
1844
1845        if ccps.is_empty() {
1846            // If there is no consensus commit prologue transaction in the roots, then there
1847            // should be no consensus commit prologue transaction in the
1848            // checkpoint.
1849            for tx in txs.iter().flatten() {
1850                assert!(!matches!(
1851                    tx.transaction_data().kind(),
1852                    TransactionKind::ConsensusCommitPrologueV1(_)
1853                ));
1854            }
1855        } else {
1856            // If there is one consensus commit prologue, it must be the first one in the
1857            // checkpoint.
1858            assert!(matches!(
1859                txs[0].as_ref().unwrap().transaction_data().kind(),
1860                TransactionKind::ConsensusCommitPrologueV1(_)
1861            ));
1862
1863            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1864
1865            for tx in txs.iter().skip(1).flatten() {
1866                assert!(!matches!(
1867                    tx.transaction_data().kind(),
1868                    TransactionKind::ConsensusCommitPrologueV1(_)
1869                ));
1870            }
1871        }
1872    }
1873}
1874
impl CheckpointAggregator {
    /// Creates an aggregator with no checkpoint currently under signature
    /// aggregation; `current` is initialized lazily in `run_inner`.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        let current = None;
        Self {
            store: tables,
            epoch_store,
            notify,
            current,
            output,
            state,
            metrics,
        }
    }

    /// Runs the `CheckpointAggregator` in an asynchronous loop, managing the
    /// aggregation of checkpoints.
    /// The function ensures continuous aggregation of checkpoints, handling
    /// errors and retries gracefully, and allowing for proper shutdown on
    /// receiving an exit signal.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Wake up when new signatures are notified, or after at most 1s;
            // the timeout bounds staleness if a notification is missed.
            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
        }
    }

    /// Performs one aggregation pass and forwards every checkpoint certified
    /// during that pass to the configured output.
    async fn run_and_notify(&mut self) -> IotaResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Core aggregation loop. For each checkpoint still needing
    /// certification, consumes pending signatures from the epoch tables until
    /// either a quorum is reached (the certified checkpoint is stored and
    /// collected into the returned vector) or the signatures run out.
    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify()?;
            let current = if let Some(current) = &mut self.current {
                // It's possible that the checkpoint was already certified by
                // the rest of the network and we've already received the
                // certified checkpoint via StateSync. In this case, we reset
                // the current signature aggregator to the next checkpoint to
                // be certified
                if current.summary.sequence_number < next_to_certify {
                    self.current = None;
                    continue;
                }
                current
            } else {
                // No aggregation in progress: start one for the next locally
                // built checkpoint, or return if it has not been built yet.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    next_index: 0,
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    store: self.store.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            // Scan pending signatures for this checkpoint, resuming from
            // `next_index` so already-processed entries are skipped.
            let epoch_tables = self
                .epoch_store
                .tables()
                .expect("should not run past end of epoch");
            let iter = epoch_tables
                .pending_checkpoint_signatures
                .safe_iter_with_bounds(
                    Some((current.summary.sequence_number, current.next_index)),
                    None,
                );
            for item in iter {
                let ((seq, index), data) = item?;
                if seq != current.summary.sequence_number {
                    trace!(
                        checkpoint_seq =? current.summary.sequence_number,
                        "Not enough checkpoint signatures",
                    );
                    // No more signatures (yet) for this checkpoint
                    return Ok(result);
                }
                trace!(
                    checkpoint_seq = current.summary.sequence_number,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    debug!(
                        checkpoint_seq = current.summary.sequence_number,
                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
                        current.summary.digest(),
                    );
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.store.insert_certified_checkpoint(&summary)?;
                    self.metrics
                        .last_certified_checkpoint
                        .set(current.summary.sequence_number as i64);
                    current
                        .summary
                        .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
                    result.push(summary.into_inner());
                    // Quorum reached: move on to the next checkpoint.
                    self.current = None;
                    continue 'outer;
                } else {
                    // Signature did not complete a quorum (or was rejected);
                    // remember where to resume on the next pass.
                    current.next_index = index + 1;
                }
            }
            break;
        }
        Ok(result)
    }

    /// Returns the sequence number following the highest locally certified
    /// checkpoint, or 0 when none has been certified yet.
    fn next_checkpoint_to_certify(&self) -> IotaResult<CheckpointSequenceNumber> {
        Ok(self
            .store
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default())
    }
}
2040
impl CheckpointSignatureAggregator {
    /// Adds one validator's signature to the aggregation.
    ///
    /// Returns `Ok` with the quorum signature once enough stake has signed a
    /// summary whose digest matches ours; returns `Err(())` in every other
    /// case — repeated signer, failed insertion, digest mismatch at quorum,
    /// or simply not enough votes yet.
    #[expect(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        // Wrap the incoming signature around our local summary; aggregation
        // is keyed by the digest the validator actually signed
        // (`their_digest`), which may differ from ours.
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // ignore repeated signatures
            InsertResult::Failed {
                error:
                    IotaError::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
            } => Err(()),
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // It is not guaranteed that signature.authority == consensus_cert.author, but
                // we do verify the signature so we know that the author signed
                // the message at some point.
                if their_digest != self.digest {
                    // Quorum formed on a digest different from our local one:
                    // we are on the forked side.
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }

    /// Check if there is a split brain condition in checkpoint signature
    /// aggregation, defined as any state wherein it is no longer possible
    /// to achieve quorum on a checkpoint proposal, irrespective of the
    /// outcome of any outstanding votes.
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            // TODO: at this point we should immediately halt processing
            // of new transaction certificates to avoid building on top of
            // forked output
            // self.halt_all_execution();

            // For operator visibility, log every disagreeing digest with its
            // supporting stake, ordered by stake descending.
            let digests_by_stake_messages = self
                .signatures_by_digest
                .get_all_unique_values()
                .into_iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{digest:?} (total stake: {total_stake})")
                })
                .collect::<Vec<String>>();
            error!(
                checkpoint_seq = self.summary.sequence_number,
                "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages,
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            // Diagnostics query other validators over the network; spawn a
            // separate task so signature aggregation is not blocked.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.store.clone();

            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
}
2141
2142/// Create data dump containing relevant data for diagnosing cause of the
2143/// split brain by querying one disagreeing validator for full checkpoint
2144/// contents. To minimize peer chatter, we only query one validator at random
2145/// from each disagreeing faction, as all honest validators that participated in
2146/// this round may inevitably run the same process.
2147async fn diagnose_split_brain(
2148    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2149    local_summary: CheckpointSummary,
2150    state: Arc<AuthorityState>,
2151    tables: Arc<CheckpointStore>,
2152) {
2153    debug!(
2154        checkpoint_seq = local_summary.sequence_number,
2155        "Running split brain diagnostics..."
2156    );
2157    let time = SystemTime::now();
2158    // collect one random disagreeing validator per differing digest
2159    let digest_to_validator = all_unique_values
2160        .iter()
2161        .filter_map(|(digest, (validators, _))| {
2162            if *digest != local_summary.digest() {
2163                let random_validator = validators.choose(&mut OsRng).unwrap();
2164                Some((*digest, *random_validator))
2165            } else {
2166                None
2167            }
2168        })
2169        .collect::<HashMap<_, _>>();
2170    if digest_to_validator.is_empty() {
2171        panic!(
2172            "Given split brain condition, there should be at \
2173                least one validator that disagrees with local signature"
2174        );
2175    }
2176
2177    let epoch_store = state.load_epoch_store_one_call_per_task();
2178    let committee = epoch_store
2179        .epoch_start_state()
2180        .get_iota_committee_with_network_metadata();
2181    let network_config = default_iota_network_config();
2182    let network_clients =
2183        make_network_authority_clients_with_network_config(&committee, &network_config);
2184
2185    // Query all disagreeing validators
2186    let response_futures = digest_to_validator
2187        .values()
2188        .cloned()
2189        .map(|validator| {
2190            let client = network_clients
2191                .get(&validator)
2192                .expect("Failed to get network client");
2193            let request = CheckpointRequest {
2194                sequence_number: Some(local_summary.sequence_number),
2195                request_content: true,
2196                certified: false,
2197            };
2198            client.handle_checkpoint(request)
2199        })
2200        .collect::<Vec<_>>();
2201
2202    let digest_name_pair = digest_to_validator.iter();
2203    let response_data = futures::future::join_all(response_futures)
2204        .await
2205        .into_iter()
2206        .zip(digest_name_pair)
2207        .filter_map(|(response, (digest, name))| match response {
2208            Ok(response) => match response {
2209                CheckpointResponse {
2210                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2211                    contents: Some(contents),
2212                } => Some((*name, *digest, summary, contents)),
2213                CheckpointResponse {
2214                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2215                    contents: _,
2216                } => {
2217                    panic!("Expected pending checkpoint, but got certified checkpoint");
2218                }
2219                CheckpointResponse {
2220                    checkpoint: None,
2221                    contents: _,
2222                } => {
2223                    error!(
2224                        "Summary for checkpoint {:?} not found on validator {:?}",
2225                        local_summary.sequence_number, name
2226                    );
2227                    None
2228                }
2229                CheckpointResponse {
2230                    checkpoint: _,
2231                    contents: None,
2232                } => {
2233                    error!(
2234                        "Contents for checkpoint {:?} not found on validator {:?}",
2235                        local_summary.sequence_number, name
2236                    );
2237                    None
2238                }
2239            },
2240            Err(e) => {
2241                error!(
2242                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2243                    e
2244                );
2245                None
2246            }
2247        })
2248        .collect::<Vec<_>>();
2249
2250    let local_checkpoint_contents = tables
2251        .get_checkpoint_contents(&local_summary.content_digest)
2252        .unwrap_or_else(|_| {
2253            panic!(
2254                "Could not find checkpoint contents for digest {:?}",
2255                local_summary.digest()
2256            )
2257        })
2258        .unwrap_or_else(|| {
2259            panic!(
2260                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2261                local_summary.sequence_number,
2262                local_summary.digest()
2263            )
2264        });
2265    let local_contents_text = format!("{local_checkpoint_contents:?}");
2266
2267    let local_summary_text = format!("{local_summary:?}");
2268    let local_validator = state.name.concise();
2269    let diff_patches = response_data
2270        .iter()
2271        .map(|(name, other_digest, other_summary, contents)| {
2272            let other_contents_text = format!("{contents:?}");
2273            let other_summary_text = format!("{other_summary:?}");
2274            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2275                .enumerate_transactions(&local_summary)
2276                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2277                .unzip();
2278            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2279                .enumerate_transactions(other_summary)
2280                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2281                .unzip();
2282            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2283            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2284            let local_transactions_text = format!("{local_transactions:#?}");
2285            let other_transactions_text = format!("{other_transactions:#?}");
2286            let transactions_patch =
2287                create_patch(&local_transactions_text, &other_transactions_text);
2288            let local_effects_text = format!("{local_effects:#?}");
2289            let other_effects_text = format!("{other_effects:#?}");
2290            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2291            let seq_number = local_summary.sequence_number;
2292            let local_digest = local_summary.digest();
2293            let other_validator = name.concise();
2294            format!(
2295                "Checkpoint: {seq_number:?}\n\
2296                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2297                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2298                Summary Diff: \n{summary_patch}\n\n\
2299                Contents Diff: \n{contents_patch}\n\n\
2300                Transactions Diff: \n{transactions_patch}\n\n\
2301                Effects Diff: \n{effects_patch}",
2302            )
2303        })
2304        .collect::<Vec<_>>()
2305        .join("\n\n\n");
2306
2307    let header = format!(
2308        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2309        Datetime: {time:?}"
2310    );
2311    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2312    let path = tempfile::tempdir()
2313        .expect("Failed to create tempdir")
2314        .keep()
2315        .join(Path::new("checkpoint_fork_dump.txt"));
2316    let mut file = File::create(path).unwrap();
2317    write!(file, "{fork_logs_text}").unwrap();
2318    debug!("{}", fork_logs_text);
2319
2320    fail_point!("split_brain_reached");
2321}
2322
/// Notification interface used to inform the checkpoint service of new
/// checkpoint signatures and newly available pending checkpoints.
pub trait CheckpointServiceNotify {
    /// Handles a checkpoint signature message received from another
    /// validator: persists the signature and wakes the aggregator.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    /// Wakes the checkpoint builder so it re-checks for pending checkpoints.
    fn notify_checkpoint(&self) -> IotaResult;
}
2332
/// Tracks whether the builder and aggregator tasks have been handed off to
/// `spawn`. The pair is boxed to keep the large task structs off the enum's
/// inline size.
enum CheckpointServiceState {
    /// Holds the not-yet-running builder and aggregator created by `build`.
    Unstarted(Box<(CheckpointBuilder, CheckpointAggregator)>),
    /// The tasks have been taken and spawned; taking them again panics.
    Started,
}
2337
2338impl CheckpointServiceState {
2339    fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
2340        let mut state = CheckpointServiceState::Started;
2341        std::mem::swap(self, &mut state);
2342
2343        match state {
2344            CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1),
2345            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2346        }
2347    }
2348}
2349
/// Owns the checkpoint builder and aggregator tasks and exposes the
/// notification endpoints that drive them.
pub struct CheckpointService {
    tables: Arc<CheckpointStore>,
    // Wakes the builder task when a new pending checkpoint may be available.
    notify_builder: Arc<Notify>,
    // Wakes the aggregator task when new signatures have been inserted.
    notify_aggregator: Arc<Notify>,
    // Monotonic index assigned to incoming checkpoint signatures. Kept behind
    // a mutex (not an atomic) so the signature write and the aggregator
    // wake-up happen without races; see `notify_checkpoint_signature`.
    last_signature_index: Mutex<u64>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed
    highest_previously_built_seq: CheckpointSequenceNumber,
    metrics: Arc<CheckpointMetrics>,
    // Builder/aggregator handoff state; consumed exactly once by `spawn`.
    state: Mutex<CheckpointServiceState>,
}
2363
impl CheckpointService {
    /// Constructs a new CheckpointService in an un-started state: the
    /// checkpoint builder and aggregator tasks are created here but do not
    /// run until `spawn` is called.
    pub fn build(
        state: Arc<AuthorityState>,
        checkpoint_store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        effects_store: Arc<dyn TransactionCacheRead>,
        accumulator: Weak<StateAccumulator>,
        checkpoint_output: Box<dyn CheckpointOutput>,
        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Arc<Self> {
        info!(
            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
        );
        let notify_builder = Arc::new(Notify::new());
        let notify_aggregator = Arc::new(Notify::new());

        // We may have built higher checkpoint numbers before restarting.
        let highest_previously_built_seq = checkpoint_store
            .get_latest_locally_computed_checkpoint()
            .expect("failed to get latest locally computed checkpoint")
            .map(|s| s.sequence_number)
            .unwrap_or(0);

        // Seed the watch channel with the last summary the builder persisted,
        // which may lag behind `highest_previously_built_seq` after a restart.
        let highest_currently_built_seq =
            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
                .expect("epoch should not have ended")
                .map(|(seq, _)| seq)
                .unwrap_or(0);

        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);

        let aggregator = CheckpointAggregator::new(
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_aggregator.clone(),
            certified_checkpoint_output,
            state.clone(),
            metrics.clone(),
        );

        let builder = CheckpointBuilder::new(
            state.clone(),
            checkpoint_store.clone(),
            epoch_store.clone(),
            notify_builder.clone(),
            effects_store,
            accumulator,
            checkpoint_output,
            notify_aggregator.clone(),
            highest_currently_built_seq_tx.clone(),
            metrics.clone(),
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        );

        let last_signature_index = epoch_store
            .get_last_checkpoint_signature_index()
            .expect("should not cross end of epoch");
        let last_signature_index = Mutex::new(last_signature_index);

        Arc::new(Self {
            tables: checkpoint_store,
            notify_builder,
            notify_aggregator,
            last_signature_index,
            highest_currently_built_seq_tx,
            highest_previously_built_seq,
            metrics,
            // The tasks are parked here until `spawn` takes them.
            state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
                builder, aggregator,
            )))),
        })
    }

    /// Starts the CheckpointService.
    ///
    /// This function blocks until the CheckpointBuilder re-builds all
    /// checkpoints that had been built before the most recent restart. You
    /// can think of this as a WAL replay operation. Upon startup, we may
    /// have a number of consensus commits and resulting checkpoints that
    /// were built but not committed to disk. We want to reprocess the
    /// commits and rebuild the checkpoints before starting normal operation.
    pub async fn spawn(&self) -> JoinSet<()> {
        let mut tasks = JoinSet::new();

        let (builder, aggregator) = self.state.lock().take_unstarted();
        tasks.spawn(monitored_future!(builder.run()));
        tasks.spawn(monitored_future!(aggregator.run()));

        // If this times out, the validator may still start up. The worst that can
        // happen is that we will crash later on instead of immediately. The eventual
        // crash would occur because we may be missing transactions that are below the
        // highest_synced_checkpoint watermark, which can cause a crash in
        // `CheckpointExecutor::extract_randomness_rounds`.
        if tokio::time::timeout(
            Duration::from_secs(120),
            self.wait_for_rebuilt_checkpoints(),
        )
        .await
        .is_err()
        {
            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
        }

        tasks
    }
}
2477
2478impl CheckpointService {
2479    /// Waits until all checkpoints had been built before the node restarted
2480    /// are rebuilt. This is required to preserve the invariant that all
2481    /// checkpoints (and their transactions) below the
2482    /// highest_synced_checkpoint watermark are available. Once the
2483    /// checkpoints are constructed, we can be sure that the transactions
2484    /// have also been executed.
2485    pub async fn wait_for_rebuilt_checkpoints(&self) {
2486        let highest_previously_built_seq = self.highest_previously_built_seq;
2487        let mut rx = self.highest_currently_built_seq_tx.subscribe();
2488        let mut highest_currently_built_seq = *rx.borrow_and_update();
2489        info!(
2490            "Waiting for checkpoints to be rebuilt, previously built seq: \
2491            {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
2492        );
2493        loop {
2494            if highest_currently_built_seq >= highest_previously_built_seq {
2495                info!("Checkpoint rebuild complete");
2496                break;
2497            }
2498            rx.changed().await.unwrap();
2499            highest_currently_built_seq = *rx.borrow_and_update();
2500        }
2501    }
2502
2503    #[cfg(test)]
2504    fn write_and_notify_checkpoint_for_testing(
2505        &self,
2506        epoch_store: &AuthorityPerEpochStore,
2507        checkpoint: PendingCheckpoint,
2508    ) -> IotaResult {
2509        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
2510
2511        let mut output = ConsensusCommitOutput::new(0);
2512        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2513        output.set_default_commit_stats_for_testing();
2514        epoch_store.push_consensus_output_for_tests(output);
2515        self.notify_checkpoint()?;
2516        Ok(())
2517    }
2518}
2519
2520impl CheckpointServiceNotify for CheckpointService {
2521    fn notify_checkpoint_signature(
2522        &self,
2523        epoch_store: &AuthorityPerEpochStore,
2524        info: &CheckpointSignatureMessage,
2525    ) -> IotaResult {
2526        let sequence = info.summary.sequence_number;
2527        let signer = info.summary.auth_sig().authority.concise();
2528
2529        if let Some(highest_verified_checkpoint) = self
2530            .tables
2531            .get_highest_verified_checkpoint()?
2532            .map(|x| *x.sequence_number())
2533        {
2534            if sequence <= highest_verified_checkpoint {
2535                trace!(
2536                    checkpoint_seq = sequence,
2537                    "Ignore checkpoint signature from {} - already certified", signer,
2538                );
2539                self.metrics
2540                    .last_ignored_checkpoint_signature_received
2541                    .set(sequence as i64);
2542                return Ok(());
2543            }
2544        }
2545        trace!(
2546            checkpoint_seq = sequence,
2547            "Received checkpoint signature, digest {} from {}",
2548            info.summary.digest(),
2549            signer,
2550        );
2551        self.metrics
2552            .last_received_checkpoint_signatures
2553            .with_label_values(&[&signer.to_string()])
2554            .set(sequence as i64);
2555        // While it can be tempting to make last_signature_index into AtomicU64, this
2556        // won't work We need to make sure we write to `pending_signatures` and
2557        // trigger `notify_aggregator` without race conditions
2558        let mut index = self.last_signature_index.lock();
2559        *index += 1;
2560        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2561        self.notify_aggregator.notify_one();
2562        Ok(())
2563    }
2564
2565    fn notify_checkpoint(&self) -> IotaResult {
2566        self.notify_builder.notify_one();
2567        Ok(())
2568    }
2569}
2570
2571// test helper
2572pub struct CheckpointServiceNoop {}
2573impl CheckpointServiceNotify for CheckpointServiceNoop {
2574    fn notify_checkpoint_signature(
2575        &self,
2576        _: &AuthorityPerEpochStore,
2577        _: &CheckpointSignatureMessage,
2578    ) -> IotaResult {
2579        Ok(())
2580    }
2581
2582    fn notify_checkpoint(&self) -> IotaResult {
2583        Ok(())
2584    }
2585}
2586
2587#[cfg(test)]
2588mod tests {
2589    use std::{
2590        collections::{BTreeMap, HashMap},
2591        ops::Deref,
2592    };
2593
2594    use futures::{FutureExt as _, future::BoxFuture};
2595    use iota_macros::sim_test;
2596    use iota_protocol_config::{Chain, ProtocolConfig};
2597    use iota_types::{
2598        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
2599        crypto::Signature,
2600        digests::TransactionEventsDigest,
2601        effects::{TransactionEffects, TransactionEvents},
2602        messages_checkpoint::SignedCheckpointSummary,
2603        move_package::MovePackage,
2604        object,
2605        transaction::{GenesisObject, VerifiedTransaction},
2606    };
2607    use tokio::sync::mpsc;
2608
2609    use super::*;
2610    use crate::authority::test_authority_builder::TestAuthorityBuilder;
2611
    // End-to-end exercise of the checkpoint builder and aggregator: feeds
    // pending checkpoints through the service and verifies sequencing,
    // dependency ordering, size/count-based splitting, timestamp coalescing,
    // and certification.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        // Shrink the min checkpoint interval so the coalescing case (pending
        // checkpoint 4 below) triggers within the test's timestamps.
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // A minimal transaction, and a large (~40K serialized) one used to
        // exercise the max_checkpoint_size_bytes split below.
        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
            vec![GenesisObject::RawObject {
                data: object::Data::Package(
                    MovePackage::new(
                        ObjectID::random(),
                        SequenceNumber::new(),
                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
                        100_000,
                        // no modules so empty type_origin_table as no types are defined in this
                        // package
                        Vec::new(),
                        // no modules so empty linkage_table as no dependencies of this package
                        // exist
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        // Register digests d(0)..d(14) as small transactions and
        // d(15)..d(19) as large ones in the perpetual store.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Populate the effects store; d(1) depends on d(2)/d(3), and d(2) on
        // d(3)/d(4), so checkpoint contents must appear in causal order.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        // Give each transaction a (dummy) user signature so checkpoint
        // contents can be assembled.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        // Channels that receive built and certified checkpoints (see the
        // CheckpointOutput/CertifiedCheckpointOutput impls below).
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        // 3 transactions max per checkpoint, 100K bytes max per checkpoint —
        // both limits are exercised by the pending checkpoints below.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        let _tasks = checkpoint_service.spawn().await;

        // Enqueue pending checkpoints: (height, roots, timestamp_ms).
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        // d(1)'s dependency closure (d(3), d(2)) is pulled in and causally
        // ordered; the rolling gas summary accumulates across checkpoints.
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K
        // max. Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 was too soon after the prior one and should be coalesced
        // into the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Sign the first two checkpoints (out of order on purpose) and verify
        // the aggregator certifies them in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
2840
2841    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
2842        fn try_notify_read_executed_effects(
2843            &self,
2844            digests: &[TransactionDigest],
2845        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
2846            std::future::ready(Ok(digests
2847                .iter()
2848                .map(|d| self.get(d).expect("effects not found").clone())
2849                .collect()))
2850            .boxed()
2851        }
2852
2853        fn try_notify_read_executed_effects_digests(
2854            &self,
2855            digests: &[TransactionDigest],
2856        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
2857            std::future::ready(Ok(digests
2858                .iter()
2859                .map(|d| {
2860                    self.get(d)
2861                        .map(|fx| fx.digest())
2862                        .expect("effects not found")
2863                })
2864                .collect()))
2865            .boxed()
2866        }
2867
2868        fn try_multi_get_executed_effects(
2869            &self,
2870            digests: &[TransactionDigest],
2871        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2872            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
2873        }
2874
2875        // Unimplemented methods - its unfortunate to have this big blob of useless
2876        // code, but it wasn't worth it to keep EffectsNotifyRead around just
2877        // for these tests, as it caused a ton of complication in non-test code.
2878        // (e.g. had to implement EFfectsNotifyRead for all ExecutionCacheRead
2879        // implementors).
2880
2881        fn try_multi_get_transaction_blocks(
2882            &self,
2883            _: &[TransactionDigest],
2884        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
2885            unimplemented!()
2886        }
2887
2888        fn try_multi_get_executed_effects_digests(
2889            &self,
2890            _: &[TransactionDigest],
2891        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
2892            unimplemented!()
2893        }
2894
2895        fn try_multi_get_effects(
2896            &self,
2897            _: &[TransactionEffectsDigest],
2898        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2899            unimplemented!()
2900        }
2901
2902        fn try_multi_get_events(
2903            &self,
2904            _: &[TransactionEventsDigest],
2905        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
2906            unimplemented!()
2907        }
2908    }
2909
2910    #[async_trait::async_trait]
2911    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
2912        async fn checkpoint_created(
2913            &self,
2914            summary: &CheckpointSummary,
2915            contents: &CheckpointContents,
2916            _epoch_store: &Arc<AuthorityPerEpochStore>,
2917            _checkpoint_store: &Arc<CheckpointStore>,
2918        ) -> IotaResult {
2919            self.try_send((contents.clone(), summary.clone())).unwrap();
2920            Ok(())
2921        }
2922    }
2923
2924    #[async_trait::async_trait]
2925    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
2926        async fn certified_checkpoint_created(
2927            &self,
2928            summary: &CertifiedCheckpointSummary,
2929        ) -> IotaResult {
2930            self.try_send(summary.clone()).unwrap();
2931            Ok(())
2932        }
2933    }
2934
2935    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
2936        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
2937            roots: t
2938                .into_iter()
2939                .map(|t| TransactionKey::Digest(d(t)))
2940                .collect(),
2941            details: PendingCheckpointInfo {
2942                timestamp_ms,
2943                last_of_epoch: false,
2944                checkpoint_height: i,
2945            },
2946        })
2947    }
2948
2949    fn d(i: u8) -> TransactionDigest {
2950        let mut bytes: [u8; 32] = Default::default();
2951        bytes[0] = i;
2952        TransactionDigest::new(bytes)
2953    }
2954
2955    fn e(
2956        transaction_digest: TransactionDigest,
2957        dependencies: Vec<TransactionDigest>,
2958        gas_used: GasCostSummary,
2959    ) -> TransactionEffects {
2960        let mut effects = TransactionEffects::default();
2961        *effects.transaction_digest_mut_for_testing() = transaction_digest;
2962        *effects.dependencies_mut_for_testing() = dependencies;
2963        *effects.gas_cost_summary_mut_for_testing() = gas_used;
2964        effects
2965    }
2966
2967    fn commit_cert_for_test(
2968        store: &mut HashMap<TransactionDigest, TransactionEffects>,
2969        state: Arc<AuthorityState>,
2970        digest: TransactionDigest,
2971        dependencies: Vec<TransactionDigest>,
2972        gas_used: GasCostSummary,
2973    ) {
2974        let epoch_store = state.epoch_store_for_testing();
2975        let effects = e(digest, dependencies, gas_used);
2976        store.insert(digest, effects.clone());
2977        epoch_store
2978            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
2979            .expect("Inserting cert fx and sigs should not fail");
2980    }
2981}