Skip to main content

iota_core/checkpoints/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5mod causal_order;
6pub mod checkpoint_executor;
7mod checkpoint_output;
8mod metrics;
9
10use std::{
11    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
12    fs::File,
13    io::Write,
14    path::Path,
15    sync::{Arc, Weak},
16    time::{Duration, SystemTime},
17};
18
19use diffy::create_patch;
20use iota_common::{debug_fatal, fatal, random::get_rng, sync::notify_read::NotifyRead};
21use iota_macros::fail_point;
22use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
23use iota_network::default_iota_network_config;
24use iota_protocol_config::ProtocolVersion;
25use iota_sdk_types::GasCostSummary;
26use iota_types::{
27    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
28    committee::StakeUnit,
29    crypto::AuthorityStrongQuorumSignInfo,
30    digests::{CheckpointContentsDigest, CheckpointDigest},
31    effects::{TransactionEffects, TransactionEffectsAPI, TransactionEffectsExt},
32    error::{IotaError, IotaResult},
33    event::SystemEpochInfoEvent,
34    iota_system_state::{
35        IotaSystemState, IotaSystemStateTrait,
36        epoch_start_iota_system_state::EpochStartSystemStateTrait,
37    },
38    message_envelope::Message,
39    messages_checkpoint::{
40        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
41        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
42        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
43        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
44        VerifiedCheckpointContents,
45    },
46    messages_consensus::ConsensusTransactionKey,
47    signature::GenericSignature,
48    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
49};
50use itertools::Itertools;
51use nonempty::NonEmpty;
52use parking_lot::Mutex;
53use rand::seq::SliceRandom;
54use serde::{Deserialize, Serialize};
55use tokio::{
56    sync::{Notify, mpsc, watch},
57    task::JoinSet,
58    time::timeout,
59};
60use tracing::{debug, error, info, instrument, trace, warn};
61use typed_store::{
62    DBMapUtils, Map, TypedStoreError,
63    rocks::{DBMap, MetricConf},
64};
65
66pub use crate::checkpoints::{
67    checkpoint_output::{
68        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
69    },
70    metrics::CheckpointMetrics,
71};
72use crate::{
73    authority::{
74        AuthorityState,
75        authority_per_epoch_store::{AuthorityPerEpochStore, scorer::MAX_SCORE},
76    },
77    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
78    checkpoints::{
79        causal_order::CausalOrder,
80        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
81    },
82    consensus_handler::SequencedConsensusTransactionKey,
83    execution_cache::TransactionCacheRead,
84    global_state_hasher::GlobalStateHasher,
85    stake_aggregator::{InsertResult, MultiStakeAggregator},
86};
87
/// Height value carried by pending checkpoints and builder summaries (see
/// `PendingCheckpointInfo::checkpoint_height` and
/// `BuilderCheckpointSummary::checkpoint_height`).
pub type CheckpointHeight = u64;
89
/// Aggregate statistics for a single epoch, as produced by
/// `CheckpointStore::get_epoch_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EpochStats {
    /// Number of checkpoints counted for the epoch.
    pub checkpoint_count: u64,
    /// Number of transactions counted for the epoch.
    pub transaction_count: u64,
    /// Gas reward figure for the epoch; `get_epoch_stats` fills this from the
    /// computation cost of the last checkpoint's rolling gas cost summary.
    pub total_gas_reward: u64,
}
95
/// Metadata stored alongside the roots of a pending checkpoint
/// (see `PendingCheckpointContentsV1::details`).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
    /// Timestamp of the pending checkpoint (presumably milliseconds, going by
    /// the field name — confirm against the producer).
    pub timestamp_ms: CheckpointTimestamp,
    /// NOTE(review): looks like this flags the final checkpoint of an epoch —
    /// confirm against the producer.
    pub last_of_epoch: bool,
    /// Height of this pending checkpoint; returned by `PendingCheckpoint::height`.
    pub checkpoint_height: CheckpointHeight,
}
102
/// Versioned, serializable wrapper around the contents of a pending checkpoint.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PendingCheckpoint {
    // This is an enum for future upgradability, though at the moment there is only one variant.
    /// Version 1 payload.
    V1(PendingCheckpointContentsV1),
}
108
/// Version 1 payload of a [`PendingCheckpoint`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointContentsV1 {
    /// Transaction keys recorded as the roots of this pending checkpoint.
    pub roots: Vec<TransactionKey>,
    /// Timestamp, end-of-epoch flag and height for this pending checkpoint.
    pub details: PendingCheckpointInfo,
}
114
115impl PendingCheckpoint {
116    pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
117        match self {
118            PendingCheckpoint::V1(contents) => contents,
119        }
120    }
121
122    pub fn into_v1(self) -> PendingCheckpointContentsV1 {
123        match self {
124            PendingCheckpoint::V1(contents) => contents,
125        }
126    }
127
128    pub fn roots(&self) -> &Vec<TransactionKey> {
129        &self.as_v1().roots
130    }
131
132    pub fn details(&self) -> &PendingCheckpointInfo {
133        &self.as_v1().details
134    }
135
136    pub fn height(&self) -> CheckpointHeight {
137        self.details().checkpoint_height
138    }
139}
140
/// A checkpoint summary together with bookkeeping about where it was built.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    /// The checkpoint summary itself.
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    /// NOTE(review): presumably the index of this checkpoint within the commit
    /// that produced it — not exercised in the visible part of this file; confirm.
    pub position_in_commit: usize,
}
148
/// Tables backing [`CheckpointStore`], one `DBMap` per field.
///
/// Opened read-write through `CheckpointStoreTables::new` and read-only
/// through `CheckpointStoreTables::open_readonly`.
#[derive(DBMapUtils)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number.
    /// Entries from this table are deleted after state accumulation has
    /// completed together with the corresponding full_checkpoint_content.
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence
    /// number, for efficient reads of full checkpoints. Entries from this
    /// table are deleted after state accumulation has completed. See
    /// NUM_SAVED_FULL_CHECKPOINT_CONTENTS.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks
    /// and log useful information. Can be pruned as soon as we verify that
    /// we are in agreement with the latest certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in
    /// that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}
184
185impl CheckpointStoreTables {
186    pub fn new(path: &Path, metric_name: &'static str) -> Self {
187        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
188    }
189    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
190        Self::get_read_only_handle(
191            path.to_path_buf(),
192            None,
193            None,
194            MetricConf::new("checkpoint_readonly"),
195        )
196    }
197}
198
/// Persistent checkpoint storage plus in-memory notification channels for
/// callers waiting on the synced / executed watermarks.
pub struct CheckpointStore {
    /// The underlying database tables.
    pub(crate) tables: CheckpointStoreTables,
    /// Notified by `update_highest_synced_checkpoint`; awaited through
    /// `notify_read_synced_checkpoint`.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    /// Notified by `update_highest_executed_checkpoint`; awaited through
    /// `notify_read_executed_checkpoint`.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
204
205impl CheckpointStore {
206    pub fn new(path: &Path) -> Arc<Self> {
207        let tables = CheckpointStoreTables::new(path, "checkpoint");
208        Arc::new(Self {
209            tables,
210            synced_checkpoint_notify_read: NotifyRead::new(),
211            executed_checkpoint_notify_read: NotifyRead::new(),
212        })
213    }
214
215    pub fn new_for_tests() -> Arc<Self> {
216        let storage_dir = iota_common::tempdir().keep();
217        CheckpointStore::new(storage_dir.as_path())
218    }
219
220    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
221        let tables = CheckpointStoreTables::new(path, "db_checkpoint");
222        Arc::new(Self {
223            tables,
224            synced_checkpoint_notify_read: NotifyRead::new(),
225            executed_checkpoint_notify_read: NotifyRead::new(),
226        })
227    }
228
    /// Opens a read-only handle to the checkpoint tables at `path`.
    ///
    /// Delegates to `CheckpointStoreTables::open_readonly`.
    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }
232
233    #[instrument(level = "info", skip_all)]
234    pub fn insert_genesis_checkpoint(
235        &self,
236        checkpoint: VerifiedCheckpoint,
237        contents: CheckpointContents,
238        epoch_store: &AuthorityPerEpochStore,
239    ) {
240        assert_eq!(
241            checkpoint.epoch(),
242            0,
243            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
244        );
245        assert_eq!(
246            *checkpoint.sequence_number(),
247            0,
248            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
249        );
250
251        // Only insert the genesis checkpoint if the DB is empty and doesn't have it
252        // already
253        if self
254            .get_checkpoint_by_digest(checkpoint.digest())
255            .unwrap()
256            .is_none()
257        {
258            if epoch_store.epoch() == checkpoint.epoch {
259                epoch_store
260                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
261                    .unwrap();
262            } else {
263                debug!(
264                    validator_epoch =% epoch_store.epoch(),
265                    genesis_epoch =% checkpoint.epoch(),
266                    "Not inserting checkpoint builder data for genesis checkpoint",
267                );
268            }
269            self.insert_checkpoint_contents(contents).unwrap();
270            self.insert_verified_checkpoint(&checkpoint).unwrap();
271            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
272        }
273    }
274
275    pub fn get_checkpoint_by_digest(
276        &self,
277        digest: &CheckpointDigest,
278    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
279        self.tables
280            .checkpoint_by_digest
281            .get(digest)
282            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
283    }
284
285    pub fn get_checkpoint_by_sequence_number(
286        &self,
287        sequence_number: CheckpointSequenceNumber,
288    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
289        self.tables
290            .certified_checkpoints
291            .get(&sequence_number)
292            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
293    }
294
    /// Returns the locally computed summary stored for `sequence_number`, or
    /// `Ok(None)` when there is no entry in `locally_computed_checkpoints`.
    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }
303
    /// Get checkpoint sequence number by contents digest.
    ///
    /// Entries from this table are deleted after state accumulation has
    /// completed together with the corresponding full_checkpoint_content.
    ///
    /// Returns `Ok(None)` when no mapping exists for `digest`.
    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }
316
    /// Removes the contents-digest → sequence-number entry for `digest` from
    /// `checkpoint_sequence_by_contents_digest`.
    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }
325
326    pub fn get_latest_certified_checkpoint(
327        &self,
328    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
329        Ok(self
330            .tables
331            .certified_checkpoints
332            .reversed_safe_iter_with_bounds(None, None)?
333            .next()
334            .transpose()?
335            .map(|(_, v)| v.into()))
336    }
337
338    pub fn get_latest_locally_computed_checkpoint(
339        &self,
340    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
341        Ok(self
342            .tables
343            .locally_computed_checkpoints
344            .reversed_safe_iter_with_bounds(None, None)?
345            .next()
346            .transpose()?
347            .map(|(_, v)| v))
348    }
349
350    pub fn multi_get_checkpoint_by_sequence_number(
351        &self,
352        sequence_numbers: &[CheckpointSequenceNumber],
353    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
354        let checkpoints = self
355            .tables
356            .certified_checkpoints
357            .multi_get(sequence_numbers)?
358            .into_iter()
359            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
360            .collect();
361
362        Ok(checkpoints)
363    }
364
    /// Batched lookup in `checkpoint_content`: one result slot per requested
    /// contents digest.
    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }
371
372    pub fn get_highest_verified_checkpoint(
373        &self,
374    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
375        let highest_verified = if let Some(highest_verified) = self
376            .tables
377            .watermarks
378            .get(&CheckpointWatermark::HighestVerified)?
379        {
380            highest_verified
381        } else {
382            return Ok(None);
383        };
384        self.get_checkpoint_by_digest(&highest_verified.1)
385    }
386
387    pub fn get_highest_synced_checkpoint(
388        &self,
389    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
390        let highest_synced = if let Some(highest_synced) = self
391            .tables
392            .watermarks
393            .get(&CheckpointWatermark::HighestSynced)?
394        {
395            highest_synced
396        } else {
397            return Ok(None);
398        };
399        self.get_checkpoint_by_digest(&highest_synced.1)
400    }
401
402    pub fn get_highest_synced_checkpoint_seq_number(
403        &self,
404    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
405        if let Some(highest_synced) = self
406            .tables
407            .watermarks
408            .get(&CheckpointWatermark::HighestSynced)?
409        {
410            Ok(Some(highest_synced.0))
411        } else {
412            Ok(None)
413        }
414    }
415
416    pub fn get_highest_executed_checkpoint_seq_number(
417        &self,
418    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
419        if let Some(highest_executed) = self
420            .tables
421            .watermarks
422            .get(&CheckpointWatermark::HighestExecuted)?
423        {
424            Ok(Some(highest_executed.0))
425        } else {
426            Ok(None)
427        }
428    }
429
430    pub fn get_highest_executed_checkpoint(
431        &self,
432    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
433        let highest_executed = if let Some(highest_executed) = self
434            .tables
435            .watermarks
436            .get(&CheckpointWatermark::HighestExecuted)?
437        {
438            highest_executed
439        } else {
440            return Ok(None);
441        };
442        self.get_checkpoint_by_digest(&highest_executed.1)
443    }
444
445    pub fn get_highest_pruned_checkpoint_seq_number(
446        &self,
447    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
448        self.tables
449            .watermarks
450            .get(&CheckpointWatermark::HighestPruned)
451            .map(|watermark| watermark.map(|w| w.0))
452    }
453
    /// Returns the checkpoint contents stored under `digest` in
    /// `checkpoint_content`, or `Ok(None)` when absent.
    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }
460
    /// Returns the full checkpoint contents stored for `seq` in
    /// `full_checkpoint_content`, or `Ok(None)` when absent (entries in that
    /// table are deleted again via `delete_full_checkpoint_contents`).
    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content.get(&seq)
    }
467
468    fn prune_local_summaries(&self) -> IotaResult {
469        if let Some((last_local_summary, _)) = self
470            .tables
471            .locally_computed_checkpoints
472            .reversed_safe_iter_with_bounds(None, None)?
473            .next()
474            .transpose()?
475        {
476            let mut batch = self.tables.locally_computed_checkpoints.batch();
477            batch.schedule_delete_range(
478                &self.tables.locally_computed_checkpoints,
479                &0,
480                &last_local_summary,
481            )?;
482            batch.write()?;
483            info!("Pruned local summaries up to {:?}", last_local_summary);
484        }
485        Ok(())
486    }
487
488    #[instrument(level = "trace", skip_all)]
489    fn check_for_checkpoint_fork(
490        &self,
491        local_checkpoint: &CheckpointSummary,
492        verified_checkpoint: &VerifiedCheckpoint,
493    ) {
494        if local_checkpoint != verified_checkpoint.data() {
495            let verified_contents = self
496                .get_checkpoint_contents(&verified_checkpoint.content_digest)
497                .map(|opt_contents| {
498                    opt_contents
499                        .map(|contents| format!("{contents:?}"))
500                        .unwrap_or_else(|| {
501                            format!(
502                                "Verified checkpoint contents not found, digest: {:?}",
503                                verified_checkpoint.content_digest,
504                            )
505                        })
506                })
507                .map_err(|e| {
508                    format!(
509                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
510                        verified_checkpoint.content_digest, e
511                    )
512                })
513                .unwrap_or_else(|err_msg| err_msg);
514
515            let local_contents = self
516                .get_checkpoint_contents(&local_checkpoint.content_digest)
517                .map(|opt_contents| {
518                    opt_contents
519                        .map(|contents| format!("{contents:?}"))
520                        .unwrap_or_else(|| {
521                            format!(
522                                "Local checkpoint contents not found, digest: {:?}",
523                                local_checkpoint.content_digest
524                            )
525                        })
526                })
527                .map_err(|e| {
528                    format!(
529                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
530                        local_checkpoint.content_digest, e
531                    )
532                })
533                .unwrap_or_else(|err_msg| err_msg);
534
535            // checkpoint contents may be too large for panic message.
536            error!(
537                verified_checkpoint = ?verified_checkpoint.data(),
538                ?verified_contents,
539                ?local_checkpoint,
540                ?local_contents,
541                "Local checkpoint fork detected!",
542            );
543            fatal!(
544                "Local checkpoint fork detected for sequence number: {}",
545                local_checkpoint.sequence_number()
546            );
547        }
548    }
549
550    // Called by consensus (ConsensusAggregator).
551    // Different from `insert_verified_checkpoint`, it does not touch
552    // the highest_verified_checkpoint watermark such that state sync
553    // will have a chance to process this checkpoint and perform some
554    // state-sync only things.
555    pub fn insert_certified_checkpoint(
556        &self,
557        checkpoint: &VerifiedCheckpoint,
558    ) -> Result<(), TypedStoreError> {
559        debug!(
560            checkpoint_seq = checkpoint.sequence_number(),
561            "Inserting certified checkpoint",
562        );
563        let mut batch = self.tables.certified_checkpoints.batch();
564        batch
565            .insert_batch(
566                &self.tables.certified_checkpoints,
567                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
568            )?
569            .insert_batch(
570                &self.tables.checkpoint_by_digest,
571                [(checkpoint.digest(), checkpoint.serializable_ref())],
572            )?;
573        if checkpoint.next_epoch_committee().is_some() {
574            batch.insert_batch(
575                &self.tables.epoch_last_checkpoint_map,
576                [(&checkpoint.epoch(), checkpoint.sequence_number())],
577            )?;
578        }
579        batch.write()?;
580
581        if let Some(local_checkpoint) = self
582            .tables
583            .locally_computed_checkpoints
584            .get(checkpoint.sequence_number())?
585        {
586            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
587        }
588
589        Ok(())
590    }
591
592    // Called by state sync, apart from inserting the checkpoint and updating
593    // related tables, it also bumps the highest_verified_checkpoint watermark.
594    #[instrument(level = "trace", skip_all)]
595    pub fn insert_verified_checkpoint(
596        &self,
597        checkpoint: &VerifiedCheckpoint,
598    ) -> Result<(), TypedStoreError> {
599        self.insert_certified_checkpoint(checkpoint)?;
600        self.update_highest_verified_checkpoint(checkpoint)
601    }
602
603    pub fn update_highest_verified_checkpoint(
604        &self,
605        checkpoint: &VerifiedCheckpoint,
606    ) -> Result<(), TypedStoreError> {
607        if Some(*checkpoint.sequence_number())
608            > self
609                .get_highest_verified_checkpoint()?
610                .map(|x| *x.sequence_number())
611        {
612            debug!(
613                checkpoint_seq = checkpoint.sequence_number(),
614                "Updating highest verified checkpoint",
615            );
616            self.tables.watermarks.insert(
617                &CheckpointWatermark::HighestVerified,
618                &(*checkpoint.sequence_number(), *checkpoint.digest()),
619            )?;
620        }
621
622        Ok(())
623    }
624
625    pub fn update_highest_synced_checkpoint(
626        &self,
627        checkpoint: &VerifiedCheckpoint,
628    ) -> Result<(), TypedStoreError> {
629        let seq = *checkpoint.sequence_number();
630        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
631        self.tables.watermarks.insert(
632            &CheckpointWatermark::HighestSynced,
633            &(seq, *checkpoint.digest()),
634        )?;
635        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
636        Ok(())
637    }
638
    /// Resolves to the checkpoint with sequence number `seq`, gated on the
    /// watermark returned by `get_watermark`.
    ///
    /// The lookup closure handed to `notify_read.read` reports "not available"
    /// while the watermark is unset or below `seq`; otherwise it loads the
    /// checkpoint from `certified_checkpoints`.
    /// NOTE(review): presumably `NotifyRead::read` then waits for a matching
    /// `notify` call when the closure reports `None` — confirm against
    /// `NotifyRead`.
    ///
    /// # Panics
    ///
    /// Panics on a database error, if the watermark has reached `seq` but the
    /// checkpoint is missing from `certified_checkpoints`, or if the read
    /// itself returns an error.
    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        // Spelled out so the closure's return type can be pinned with `as`.
        type ReadResult = Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError>;

        notify_read
            .read(&[seq], |seqs| {
                // Exactly one key is requested above, so only index 0 is used.
                let seq = seqs[0];
                let Some(highest) = get_watermark() else {
                    return Ok(vec![None]) as ReadResult;
                };
                // Watermark has not reached the requested checkpoint yet.
                if highest < seq {
                    return Ok(vec![None]) as ReadResult;
                }
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                Ok(vec![Some(checkpoint)]) as ReadResult
            })
            .await
            .unwrap()
            .into_iter()
            .next()
            .unwrap()
    }
671
672    pub async fn notify_read_synced_checkpoint(
673        &self,
674        seq: CheckpointSequenceNumber,
675    ) -> VerifiedCheckpoint {
676        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
677            self.get_highest_synced_checkpoint_seq_number()
678                .expect("db error")
679        })
680        .await
681    }
682
683    pub async fn notify_read_executed_checkpoint(
684        &self,
685        seq: CheckpointSequenceNumber,
686    ) -> VerifiedCheckpoint {
687        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
688            self.get_highest_executed_checkpoint_seq_number()
689                .expect("db error")
690        })
691        .await
692    }
693
694    pub fn update_highest_executed_checkpoint(
695        &self,
696        checkpoint: &VerifiedCheckpoint,
697    ) -> Result<(), TypedStoreError> {
698        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
699            if seq_number >= *checkpoint.sequence_number() {
700                return Ok(());
701            }
702            assert_eq!(
703                seq_number + 1,
704                *checkpoint.sequence_number(),
705                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
706                checkpoint.sequence_number(),
707                seq_number
708            );
709        }
710        let seq = *checkpoint.sequence_number();
711        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
712        self.tables.watermarks.insert(
713            &CheckpointWatermark::HighestExecuted,
714            &(seq, *checkpoint.digest()),
715        )?;
716        self.executed_checkpoint_notify_read
717            .notify(&seq, checkpoint);
718        Ok(())
719    }
720
    /// Overwrites the `HighestPruned` watermark with the sequence number and
    /// digest of `checkpoint`. No ordering check is made here.
    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }
730
    /// Sets highest executed checkpoint to any value.
    ///
    /// WARNING: This method is very subtle and can corrupt the database if used
    /// incorrectly. It should only be used in one-off cases or tests after
    /// fully understanding the risk.
    ///
    /// Unlike `update_highest_executed_checkpoint`, this neither checks that
    /// the new value directly follows the current watermark nor notifies
    /// `executed_checkpoint_notify_read`.
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }
745
746    pub fn insert_checkpoint_contents(
747        &self,
748        contents: CheckpointContents,
749    ) -> Result<(), TypedStoreError> {
750        debug!(
751            checkpoint_seq = ?contents.digest(),
752            "Inserting checkpoint contents",
753        );
754        self.tables
755            .checkpoint_content
756            .insert(contents.digest(), &contents)
757    }
758
759    /// Inserts the full checkpoint contents along with the mapping from
760    /// contents digest to sequence number, and the checkpoint contents.
761    ///
762    /// The entries for mapping the contents digest to sequence number,
763    /// and the full_checkpoint_content are deleted after state accumulation has
764    /// completed. See NUM_SAVED_FULL_CHECKPOINT_CONTENTS.
765    pub fn insert_verified_checkpoint_contents(
766        &self,
767        checkpoint: &VerifiedCheckpoint,
768        full_contents: VerifiedCheckpointContents,
769    ) -> Result<(), TypedStoreError> {
770        let mut batch = self.tables.full_checkpoint_content.batch();
771        batch.insert_batch(
772            &self.tables.checkpoint_sequence_by_contents_digest,
773            [(&checkpoint.content_digest, checkpoint.sequence_number())],
774        )?;
775        let full_contents = full_contents.into_inner();
776        batch.insert_batch(
777            &self.tables.full_checkpoint_content,
778            [(checkpoint.sequence_number(), &full_contents)],
779        )?;
780
781        let contents = full_contents.into_checkpoint_contents();
782        assert_eq!(&checkpoint.content_digest, contents.digest());
783
784        batch.insert_batch(
785            &self.tables.checkpoint_content,
786            [(contents.digest(), &contents)],
787        )?;
788
789        batch.write()
790    }
791
    /// Removes the `full_checkpoint_content` entry stored for `seq`.
    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)
    }
798
799    pub fn get_epoch_last_checkpoint(
800        &self,
801        epoch_id: EpochId,
802    ) -> IotaResult<Option<VerifiedCheckpoint>> {
803        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
804        let checkpoint = match seq {
805            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
806            None => None,
807        };
808        Ok(checkpoint)
809    }
810
811    pub fn insert_epoch_last_checkpoint(
812        &self,
813        epoch_id: EpochId,
814        checkpoint: &VerifiedCheckpoint,
815    ) -> IotaResult {
816        self.tables
817            .epoch_last_checkpoint_map
818            .insert(&epoch_id, checkpoint.sequence_number())?;
819        Ok(())
820    }
821
822    pub fn get_epoch_state_commitments(
823        &self,
824        epoch: EpochId,
825    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
826        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
827            checkpoint
828                .end_of_epoch_data
829                .as_ref()
830                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
831                .epoch_commitments
832                .clone()
833        });
834        Ok(commitments)
835    }
836
837    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few
838    /// statistics of the epoch.
839    pub fn get_epoch_stats(
840        &self,
841        epoch: EpochId,
842        last_checkpoint: &CheckpointSummary,
843    ) -> Option<EpochStats> {
844        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
845            (0, 0)
846        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
847            (
848                checkpoint.sequence_number + 1,
849                checkpoint.network_total_transactions,
850            )
851        } else {
852            return None;
853        };
854        Some(EpochStats {
855            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
856            transaction_count: last_checkpoint.network_total_transactions
857                - prev_epoch_network_transactions,
858            total_gas_reward: last_checkpoint
859                .epoch_rolling_gas_cost_summary
860                .computation_cost,
861        })
862    }
863
864    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
865        // This checkpoints the entire db and not one column family
866        self.tables
867            .checkpoint_content
868            .checkpoint_db(path)
869            .map_err(Into::into)
870    }
871
872    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
873        let mut wb = self.tables.watermarks.batch();
874        wb.delete_batch(
875            &self.tables.watermarks,
876            std::iter::once(CheckpointWatermark::HighestExecuted),
877        )?;
878        wb.write()?;
879        Ok(())
880    }
881
882    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
883        self.delete_highest_executed_checkpoint_test_only()?;
884        Ok(())
885    }
886}
887
/// Keys of the `watermarks` table in the checkpoint store.
///
/// NOTE(review): the enum derives `Serialize`/`Deserialize`, so the variant
/// order is presumably part of the on-disk encoding — confirm before
/// reordering or inserting variants.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    // Presumably the highest checkpoint whose certificate was verified — TODO confirm.
    HighestVerified,
    // Presumably the highest checkpoint whose contents were synced — TODO confirm.
    HighestSynced,
    // Deleted by `delete_highest_executed_checkpoint_test_only` to reset
    // execution progress.
    HighestExecuted,
    // Presumably the highest checkpoint already pruned — TODO confirm.
    HighestPruned,
}
895
/// State of the worker that receives `(sequence number, effects)` batches over
/// a channel and feeds each one to the `GlobalStateHasher` (see `run`).
struct CheckpointStateHasher {
    /// Epoch store handed to every `accumulate_checkpoint` call.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Weak handle that is upgraded per batch; the worker stops once the
    /// hasher has been dropped.
    hasher: Weak<GlobalStateHasher>,
    /// Receiving end of the channel carrying the effects of built checkpoints.
    receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
}
901
902impl CheckpointStateHasher {
903    fn new(
904        epoch_store: Arc<AuthorityPerEpochStore>,
905        hasher: Weak<GlobalStateHasher>,
906        receive_from_builder: mpsc::Receiver<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
907    ) -> Self {
908        Self {
909            epoch_store,
910            hasher,
911            receive_from_builder,
912        }
913    }
914
915    async fn run(self) {
916        let Self {
917            epoch_store,
918            hasher,
919            mut receive_from_builder,
920        } = self;
921        while let Some((seq, effects)) = receive_from_builder.recv().await {
922            let Some(hasher) = hasher.upgrade() else {
923                info!("GlobalStateHash was dropped, stopping checkpoint accumulation");
924                break;
925            };
926            hasher
927                .accumulate_checkpoint(&effects, seq, &epoch_store)
928                .expect("epoch ended while accumulating checkpoint");
929        }
930    }
931}
932
/// Turns pending checkpoints recorded in the epoch store into checkpoint
/// summaries and contents, writes them to the `CheckpointStore` and hands them
/// to the configured `CheckpointOutput`.
pub struct CheckpointBuilder {
    /// Used to reach the transaction cache reader for transaction lookups.
    state: Arc<AuthorityState>,
    /// Persistent checkpoint tables the built checkpoints are written to.
    store: Arc<CheckpointStore>,
    /// Per-epoch store providing pending checkpoints and protocol config.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Wakes the `run` loop so it attempts another build pass.
    notify: Arc<Notify>,
    /// Notified once after each batch of checkpoints has been written.
    notify_aggregator: Arc<Notify>,
    /// Publishes the highest checkpoint sequence number built so far.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    /// Source of executed transaction effects.
    effects_store: Arc<dyn TransactionCacheRead>,
    /// For synchronous accumulation of the end-of-epoch checkpoint.
    global_state_hasher: Weak<GlobalStateHasher>,
    /// For asynchronous/concurrent accumulation of regular checkpoints.
    send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
    /// Sink invoked via `checkpoint_created` for every written checkpoint.
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    /// Transaction count at which `split_checkpoint_chunks` starts a new chunk.
    max_transactions_per_checkpoint: usize,
    /// Estimated byte size above which `split_checkpoint_chunks` starts a new
    /// chunk (a single oversized transaction is still allowed through).
    max_checkpoint_size_bytes: usize,
}
948
/// Holds the state used to aggregate checkpoint signatures.
///
/// NOTE(review): the implementation is not visible in this part of the file;
/// the field notes below are inferred from types and names — confirm against
/// the `impl`.
pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    // Presumably the same `Notify` that `CheckpointBuilder` signals after
    // writing checkpoints — TODO confirm.
    notify: Arc<Notify>,
    // Aggregation state for the checkpoint currently being processed, if any.
    current: Option<CheckpointSignatureAggregator>,
    // Presumably receives checkpoints once they are certified — TODO confirm.
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
958
// This holds information to aggregate signatures for one checkpoint
pub struct CheckpointSignatureAggregator {
    // NOTE(review): presumably the index of the next pending signature entry
    // to read — confirm against the aggregator's `impl`.
    next_index: u64,
    // Summary of the checkpoint being aggregated.
    summary: CheckpointSummary,
    // NOTE(review): presumably the digest of `summary` — confirm.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
970
971impl CheckpointBuilder {
972    fn new(
973        state: Arc<AuthorityState>,
974        store: Arc<CheckpointStore>,
975        epoch_store: Arc<AuthorityPerEpochStore>,
976        notify: Arc<Notify>,
977        effects_store: Arc<dyn TransactionCacheRead>,
978        // for synchronous accumulation of end-of-epoch checkpoint
979        global_state_hasher: Weak<GlobalStateHasher>,
980        // for asynchronous/concurrent accumulation of regular checkpoints
981        send_to_hasher: mpsc::Sender<(CheckpointSequenceNumber, Vec<TransactionEffects>)>,
982        output: Box<dyn CheckpointOutput>,
983        notify_aggregator: Arc<Notify>,
984        last_built: watch::Sender<CheckpointSequenceNumber>,
985        metrics: Arc<CheckpointMetrics>,
986        max_transactions_per_checkpoint: usize,
987        max_checkpoint_size_bytes: usize,
988    ) -> Self {
989        Self {
990            state,
991            store,
992            epoch_store,
993            notify,
994            effects_store,
995            global_state_hasher,
996            send_to_hasher,
997            output,
998            notify_aggregator,
999            last_built,
1000            metrics,
1001            max_transactions_per_checkpoint,
1002            max_checkpoint_size_bytes,
1003        }
1004    }
1005
1006    /// Runs the `CheckpointBuilder` in an asynchronous loop, managing the
1007    /// creation of checkpoints.
1008    async fn run(mut self) {
1009        info!("Starting CheckpointBuilder");
1010        loop {
1011            self.maybe_build_checkpoints().await;
1012
1013            self.notify.notified().await;
1014        }
1015    }
1016
1017    async fn maybe_build_checkpoints(&mut self) {
1018        let _scope = monitored_scope("BuildCheckpoints");
1019
1020        // Collect info about the most recently built checkpoint.
1021        let summary = self
1022            .epoch_store
1023            .last_built_checkpoint_builder_summary()
1024            .expect("epoch should not have ended");
1025        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1026        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1027
1028        let min_checkpoint_interval_ms = self
1029            .epoch_store
1030            .protocol_config()
1031            .min_checkpoint_interval_ms_as_option()
1032            .unwrap_or_default();
1033        let mut grouped_pending_checkpoints = Vec::new();
1034        let mut checkpoints_iter = self
1035            .epoch_store
1036            .get_pending_checkpoints(last_height)
1037            .expect("unexpected epoch store error")
1038            .into_iter()
1039            .peekable();
1040        while let Some((height, pending)) = checkpoints_iter.next() {
1041            // Group PendingCheckpoints until:
1042            // - minimum interval has elapsed ...
1043            let current_timestamp = pending.details().timestamp_ms;
1044            let can_build = match last_timestamp {
1045                    Some(last_timestamp) => {
1046                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1047                    }
1048                    None => true,
1049                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
1050                //   should be written separately) ...
1051                } || checkpoints_iter
1052                    .peek()
1053                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1054                // - or, we have reached end of epoch.
1055                    || pending.details().last_of_epoch;
1056            grouped_pending_checkpoints.push(pending);
1057            if !can_build {
1058                debug!(
1059                    checkpoint_commit_height = height,
1060                    ?last_timestamp,
1061                    ?current_timestamp,
1062                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1063                );
1064                continue;
1065            }
1066
1067            // Min interval has elapsed, we can now coalesce and build a checkpoint.
1068            last_height = Some(height);
1069            last_timestamp = Some(current_timestamp);
1070            debug!(
1071                checkpoint_commit_height = height,
1072                "Making checkpoint at commit height"
1073            );
1074
1075            match self
1076                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1077                .await
1078            {
1079                Ok(seq) => {
1080                    self.last_built.send_if_modified(|cur| {
1081                        // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1082                        if seq > *cur {
1083                            *cur = seq;
1084                            true
1085                        } else {
1086                            false
1087                        }
1088                    });
1089                }
1090                Err(e) => {
1091                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
1092                    tokio::time::sleep(Duration::from_secs(1)).await;
1093                    self.metrics.checkpoint_errors.inc();
1094                    return;
1095                }
1096            }
1097            // Ensure that the task can be cancelled at end of epoch, even if no other await
1098            // yields execution.
1099            tokio::task::yield_now().await;
1100        }
1101        debug!(
1102            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1103            grouped_pending_checkpoints.len(),
1104        );
1105    }
1106
1107    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1108    async fn make_checkpoint(
1109        &self,
1110        pendings: Vec<PendingCheckpoint>,
1111    ) -> anyhow::Result<CheckpointSequenceNumber> {
1112        let _scope = monitored_scope("CheckpointBuilder::make_checkpoint");
1113        let last_details = pendings.last().unwrap().details().clone();
1114
1115        // Keeps track of the effects that are already included in the current
1116        // checkpoint. This is used when there are multiple pending checkpoints
1117        // to create a single checkpoint because in such scenarios, dependencies
1118        // of a transaction may in earlier created checkpoints, or in earlier
1119        // pending checkpoints.
1120        let mut effects_in_current_checkpoint = BTreeSet::new();
1121
1122        // Stores the transactions that should be included in the checkpoint.
1123        // Transactions will be recorded in the checkpoint in this order.
1124        let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
1125        let mut all_roots = HashSet::new();
1126        for pending_checkpoint in pendings.into_iter() {
1127            let pending = pending_checkpoint.into_v1();
1128            let (root_digests, txn_in_checkpoint) = self
1129                .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
1130                .await?;
1131            sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
1132            all_roots.extend(root_digests);
1133        }
1134        let new_checkpoints = self
1135            .create_checkpoints(
1136                sorted_tx_effects_included_in_checkpoint,
1137                &last_details,
1138                &all_roots,
1139            )
1140            .await?;
1141        let highest_sequence = *new_checkpoints.last().0.sequence_number();
1142        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1143            .await?;
1144        Ok(highest_sequence)
1145    }
1146
1147    // Given the root transactions of a pending checkpoint, resolve the transactions
1148    // should be included in the checkpoint, and return them in the order they
1149    // should be included in the checkpoint. `effects_in_current_checkpoint`
1150    // tracks the transactions that already exist in the current checkpoint.
1151    #[instrument(level = "debug", skip_all)]
1152    async fn resolve_checkpoint_transactions(
1153        &self,
1154        roots: Vec<TransactionKey>,
1155        effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
1156    ) -> IotaResult<(Vec<TransactionDigest>, Vec<TransactionEffects>)> {
1157        let _scope = monitored_scope("CheckpointBuilder::resolve_checkpoint_transactions");
1158
1159        self.metrics
1160            .checkpoint_roots_count
1161            .inc_by(roots.len() as u64);
1162
1163        let root_digests = self
1164            .epoch_store
1165            .notify_read_executed_digests(&roots)
1166            .in_monitored_scope("CheckpointNotifyDigests")
1167            .await?;
1168        let root_effects = self
1169            .effects_store
1170            .try_notify_read_executed_effects(&root_digests)
1171            .in_monitored_scope("CheckpointNotifyRead")
1172            .await?;
1173
1174        let consensus_commit_prologue = {
1175            // If the roots contains consensus commit prologue transaction, we want to
1176            // extract it, and put it to the front of the checkpoint.
1177
1178            let consensus_commit_prologue = self
1179                .extract_consensus_commit_prologue(&root_digests, &root_effects)
1180                .await?;
1181
1182            // Get the un-included dependencies of the consensus commit prologue. We should
1183            // expect no other dependencies that haven't been included in any
1184            // previous checkpoints.
1185            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
1186                let unsorted_ccp = self.complete_checkpoint_effects(
1187                    vec![ccp_effects.clone()],
1188                    effects_in_current_checkpoint,
1189                )?;
1190
1191                // No other dependencies of this consensus commit prologue that haven't been
1192                // included in any previous checkpoint.
1193                if unsorted_ccp.len() != 1 {
1194                    fatal!(
1195                        "Expected 1 consensus commit prologue, got {:?}",
1196                        unsorted_ccp
1197                            .iter()
1198                            .map(|e| e.transaction_digest())
1199                            .collect::<Vec<_>>()
1200                    );
1201                }
1202                assert_eq!(unsorted_ccp.len(), 1);
1203                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
1204            }
1205            consensus_commit_prologue
1206        };
1207
1208        let unsorted =
1209            self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;
1210
1211        let _scope = monitored_scope("CheckpointBuilder::causal_sort");
1212        let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
1213        if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
1214            #[cfg(debug_assertions)]
1215            {
1216                // When consensus_commit_prologue is extracted, it should not be included in the
1217                // `unsorted`.
1218                for tx in unsorted.iter() {
1219                    assert!(tx.transaction_digest() != &_ccp_digest);
1220                }
1221            }
1222            sorted.push(ccp_effects);
1223        }
1224        sorted.extend(CausalOrder::causal_sort(unsorted));
1225
1226        #[cfg(msim)]
1227        {
1228            // Check consensus commit prologue invariants in sim test.
1229            self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
1230        }
1231
1232        Ok((root_digests, sorted))
1233    }
1234
1235    // This function is used to extract the consensus commit prologue digest and
1236    // effects from the root transactions.
1237    // The consensus commit prologue is expected to be the first transaction in the
1238    // roots.
1239    async fn extract_consensus_commit_prologue(
1240        &self,
1241        root_digests: &[TransactionDigest],
1242        root_effects: &[TransactionEffects],
1243    ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1244        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1245        if root_digests.is_empty() {
1246            return Ok(None);
1247        }
1248
1249        // Reads the first transaction in the roots, and checks whether it is a
1250        // consensus commit prologue transaction.
1251        // The consensus commit prologue transaction should be the first transaction
1252        // in the roots written by the consensus handler.
1253        let first_tx = self
1254            .state
1255            .get_transaction_cache_reader()
1256            .try_get_transaction_block(&root_digests[0])?
1257            .expect("Transaction block must exist");
1258
1259        Ok(match first_tx.transaction_data().kind() {
1260            TransactionKind::ConsensusCommitPrologueV1(_) => {
1261                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1262                Some((*first_tx.digest(), root_effects[0].clone()))
1263            }
1264            _ => None,
1265        })
1266    }
1267
1268    /// Writes the new checkpoints to the DB storage and processes them.
1269    #[instrument(level = "debug", skip_all)]
1270    async fn write_checkpoints(
1271        &self,
1272        height: CheckpointHeight,
1273        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1274    ) -> IotaResult {
1275        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1276        let mut batch = self.store.tables.checkpoint_content.batch();
1277        let mut all_tx_digests =
1278            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1279
1280        for (summary, contents) in &new_checkpoints {
1281            debug!(
1282                checkpoint_commit_height = height,
1283                checkpoint_seq = summary.sequence_number,
1284                contents_digest = ?contents.digest(),
1285                "writing checkpoint",
1286            );
1287
1288            if let Some(previously_computed_summary) = self
1289                .store
1290                .tables
1291                .locally_computed_checkpoints
1292                .get(&summary.sequence_number)?
1293            {
1294                if previously_computed_summary != *summary {
1295                    // Panic so that we don't send out an equivocating checkpoint sig.
1296                    fatal!(
1297                        "Checkpoint {} was previously built with a different result: {previously_computed_summary:?} vs {summary:?}",
1298                        summary.sequence_number,
1299                    );
1300                }
1301            }
1302
1303            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1304
1305            self.metrics
1306                .transactions_included_in_checkpoint
1307                .inc_by(contents.size() as u64);
1308            let sequence_number = summary.sequence_number;
1309            self.metrics
1310                .last_constructed_checkpoint
1311                .set(sequence_number as i64);
1312
1313            batch.insert_batch(
1314                &self.store.tables.checkpoint_content,
1315                [(contents.digest(), contents)],
1316            )?;
1317
1318            batch.insert_batch(
1319                &self.store.tables.locally_computed_checkpoints,
1320                [(sequence_number, summary)],
1321            )?;
1322        }
1323
1324        batch.write()?;
1325
1326        // Send all checkpoint sigs to consensus. The messages including
1327        // MisbehaviorReports are also sent in this step.
1328        for (summary, contents) in &new_checkpoints {
1329            self.output
1330                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1331                .await?;
1332        }
1333
1334        for (local_checkpoint, _) in &new_checkpoints {
1335            if let Some(certified_checkpoint) = self
1336                .store
1337                .tables
1338                .certified_checkpoints
1339                .get(local_checkpoint.sequence_number())?
1340            {
1341                self.store
1342                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1343            }
1344        }
1345
1346        self.notify_aggregator.notify_one();
1347        self.epoch_store
1348            .process_constructed_checkpoint(height, new_checkpoints);
1349        Ok(())
1350    }
1351
1352    #[expect(clippy::type_complexity)]
1353    fn split_checkpoint_chunks(
1354        &self,
1355        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1356        signatures: Vec<Vec<GenericSignature>>,
1357    ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1358        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1359        let mut chunks = Vec::new();
1360        let mut chunk = Vec::new();
1361        let mut chunk_size: usize = 0;
1362        for ((effects, transaction_size), signatures) in
1363            effects_and_transaction_sizes.into_iter().zip(signatures)
1364        {
1365            // Roll over to a new chunk after either max count or max size is reached.
1366            // The size calculation here is intended to estimate the size of the
1367            // FullCheckpointContents struct. If this code is modified, that struct
1368            // should also be updated accordingly.
1369            let size = transaction_size
1370                + bcs::serialized_size(&effects)?
1371                + bcs::serialized_size(&signatures)?;
1372            if chunk.len() == self.max_transactions_per_checkpoint
1373                || (chunk_size + size) > self.max_checkpoint_size_bytes
1374            {
1375                if chunk.is_empty() {
1376                    // Always allow at least one tx in a checkpoint.
1377                    warn!(
1378                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1379                        self.max_checkpoint_size_bytes
1380                    );
1381                } else {
1382                    chunks.push(chunk);
1383                    chunk = Vec::new();
1384                    chunk_size = 0;
1385                }
1386            }
1387
1388            chunk.push((effects, signatures));
1389            chunk_size += size;
1390        }
1391
1392        if !chunk.is_empty() || chunks.is_empty() {
1393            // We intentionally create an empty checkpoint if there is no content provided
1394            // to make a 'heartbeat' checkpoint.
1395            // Important: if some conditions are added here later, we need to make sure we
1396            // always have at least one chunk if last_pending_of_epoch is set
1397            chunks.push(chunk);
1398            // Note: empty checkpoints are ok - they shouldn't happen at all on
1399            // a network with even modest load. Even if they do
1400            // happen, it is still useful as it allows fullnodes to
1401            // distinguish between "no transactions have happened" and "i am not
1402            // receiving new checkpoints".
1403        }
1404        Ok(chunks)
1405    }
1406
1407    /// Creates checkpoints using the provided transaction effects and pending
1408    /// checkpoint information.
1409    fn load_last_built_checkpoint_summary(
1410        epoch_store: &AuthorityPerEpochStore,
1411        store: &CheckpointStore,
1412    ) -> IotaResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1413        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1414        if last_checkpoint.is_none() {
1415            let epoch = epoch_store.epoch();
1416            if epoch > 0 {
1417                let previous_epoch = epoch - 1;
1418                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1419                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1420                if let Some((ref seq, _)) = last_checkpoint {
1421                    debug!(
1422                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1423                    );
1424                } else {
1425                    // This is some serious bug with when CheckpointBuilder started so surfacing it
1426                    // via panic
1427                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1428                }
1429            }
1430        }
1431        Ok(last_checkpoint)
1432    }
1433
1434    #[instrument(level = "debug", skip_all)]
1435    async fn create_checkpoints(
1436        &self,
1437        all_effects: Vec<TransactionEffects>,
1438        details: &PendingCheckpointInfo,
1439        all_roots: &HashSet<TransactionDigest>,
1440    ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1441        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1442
1443        let total = all_effects.len();
1444        let mut last_checkpoint =
1445            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1446        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1447        debug!(
1448            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1449            checkpoint_timestamp = details.timestamp_ms,
1450            "Creating checkpoint(s) for {} transactions",
1451            all_effects.len(),
1452        );
1453
1454        let all_digests: Vec<_> = all_effects
1455            .iter()
1456            .map(|effect| *effect.transaction_digest())
1457            .collect();
1458        let transactions_and_sizes = self
1459            .state
1460            .get_transaction_cache_reader()
1461            .try_get_transactions_and_serialized_sizes(&all_digests)?;
1462        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1463        let mut transactions = Vec::with_capacity(all_effects.len());
1464        let mut transaction_keys = Vec::with_capacity(all_effects.len());
1465        let mut randomness_rounds = BTreeMap::new();
1466        {
1467            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1468            debug!(
1469                ?last_checkpoint_seq,
1470                "Waiting for {:?} certificates to appear in consensus",
1471                all_effects.len()
1472            );
1473
1474            for (effects, transaction_and_size) in all_effects
1475                .into_iter()
1476                .zip(transactions_and_sizes.into_iter())
1477            {
1478                let (transaction, size) = transaction_and_size
1479                    .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
1480                match transaction.inner().transaction_data().kind() {
1481                    #[allow(deprecated)]
1482                    TransactionKind::ConsensusCommitPrologueV1(_)
1483                    | TransactionKind::AuthenticatorStateUpdateV1Deprecated => {
1484                        // ConsensusCommitPrologue is guaranteed to be
1485                        // processed before we reach here.
1486                        //
1487                        // Deprecated: Authenticator state (JWK) is deprecated
1488                        // and was never enabled.
1489                        // These transaction kinds are retained
1490                        // only for BCS enum variant compatibility.
1491                    }
1492                    TransactionKind::RandomnessStateUpdate(rsu) => {
1493                        randomness_rounds
1494                            .insert(*effects.transaction_digest(), rsu.randomness_round);
1495                    }
1496                    _ => {
1497                        // Only transactions that are not roots should be included in the call to
1498                        // `consensus_messages_processed_notify`. roots come directly from the
1499                        // consensus commit and so are known to be processed
1500                        // already.
1501                        let digest = *effects.transaction_digest();
1502                        if !all_roots.contains(&digest) {
1503                            transaction_keys.push(SequencedConsensusTransactionKey::External(
1504                                ConsensusTransactionKey::Certificate(digest),
1505                            ));
1506                        }
1507                    }
1508                }
1509                transactions.push(transaction);
1510                all_effects_and_transaction_sizes.push((effects, size));
1511            }
1512
1513            self.epoch_store
1514                .consensus_messages_processed_notify(transaction_keys)
1515                .await?;
1516        }
1517
1518        let signatures = self
1519            .epoch_store
1520            .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1521        debug!(
1522            ?last_checkpoint_seq,
1523            "Received {} checkpoint user signatures from consensus",
1524            signatures.len()
1525        );
1526
1527        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1528        let chunks_count = chunks.len();
1529
1530        let mut checkpoints = Vec::with_capacity(chunks_count);
1531        debug!(
1532            ?last_checkpoint_seq,
1533            "Creating {} checkpoints with {} transactions", chunks_count, total,
1534        );
1535
1536        let epoch = self.epoch_store.epoch();
1537        for (index, transactions) in chunks.into_iter().enumerate() {
1538            let first_checkpoint_of_epoch = index == 0
1539                && last_checkpoint
1540                    .as_ref()
1541                    .map(|(_, c)| c.epoch != epoch)
1542                    .unwrap_or(true);
1543            if first_checkpoint_of_epoch {
1544                self.epoch_store
1545                    .record_epoch_first_checkpoint_creation_time_metric();
1546            }
1547            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1548
1549            let sequence_number = last_checkpoint
1550                .as_ref()
1551                .map(|(_, c)| c.sequence_number + 1)
1552                .unwrap_or_default();
1553            let mut timestamp_ms = details.timestamp_ms;
1554            if let Some((_, last_checkpoint)) = &last_checkpoint {
1555                if last_checkpoint.timestamp_ms > timestamp_ms {
1556                    // The first consensus commit of an epoch can have zero timestamp.
1557                    debug!(
1558                        "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
1559                        sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
1560                    );
1561                    if self
1562                        .epoch_store
1563                        .protocol_config()
1564                        .consensus_median_timestamp_with_checkpoint_enforcement()
1565                    {
1566                        timestamp_ms = last_checkpoint.timestamp_ms;
1567                    }
1568                }
1569            }
1570
1571            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1572            let epoch_rolling_gas_cost_summary =
1573                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1574
1575            let end_of_epoch_data = if last_checkpoint_of_epoch {
1576                let scores: Vec<u64> = if self
1577                    .epoch_store
1578                    .protocol_config()
1579                    .pass_calculated_validator_scores_to_advance_epoch()
1580                {
1581                    self.epoch_store.scoreboard.current_scores()
1582                } else {
1583                    // Give everyone in the committee the max score
1584                    vec![MAX_SCORE; self.epoch_store.committee().num_members()]
1585                };
1586
1587                let (system_state_obj, system_epoch_info_event) = self
1588                    .augment_epoch_last_checkpoint(
1589                        &epoch_rolling_gas_cost_summary,
1590                        timestamp_ms,
1591                        &mut effects,
1592                        &mut signatures,
1593                        sequence_number,
1594                        scores,
1595                    )
1596                    .await?;
1597
1598                // The system epoch info event can be `None` in case if the `advance_epoch`
1599                // Move function call failed and was executed in the safe mode.
1600                // In this case, the tokens supply should be unchanged.
1601                //
1602                // SAFETY: The number of minted and burnt tokens easily fit into an i64 and due
1603                // to those small numbers, no overflows will occur during conversion or
1604                // subtraction.
1605                let epoch_supply_change =
1606                    system_epoch_info_event.map_or(0, |event| event.supply_change());
1607
1608                let committee = system_state_obj
1609                    .get_current_epoch_committee()
1610                    .committee()
1611                    .clone();
1612
1613                // This must happen after the call to augment_epoch_last_checkpoint,
1614                // otherwise we will not capture the change_epoch tx.
1615                let root_state_digest = {
1616                    let state_acc = self
1617                        .global_state_hasher
1618                        .upgrade()
1619                        .expect("No checkpoints should be getting built after local configuration");
1620                    let acc = state_acc.accumulate_checkpoint(
1621                        &effects,
1622                        sequence_number,
1623                        &self.epoch_store,
1624                    )?;
1625
1626                    state_acc
1627                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
1628                        .await?;
1629
1630                    state_acc.accumulate_running_root(
1631                        &self.epoch_store,
1632                        sequence_number,
1633                        Some(acc),
1634                    )?;
1635                    state_acc
1636                        .digest_epoch(self.epoch_store.clone(), sequence_number)
1637                        .await?
1638                };
1639                self.metrics.highest_accumulated_epoch.set(epoch as i64);
1640                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1641
1642                let epoch_commitments = vec![root_state_digest.into()];
1643
1644                Some(EndOfEpochData {
1645                    next_epoch_committee: committee.voting_rights,
1646                    next_epoch_protocol_version: ProtocolVersion::new(
1647                        system_state_obj.protocol_version(),
1648                    ),
1649                    epoch_commitments,
1650                    epoch_supply_change,
1651                })
1652            } else {
1653                self.send_to_hasher
1654                    .send((sequence_number, effects.clone()))
1655                    .await?;
1656                None
1657            };
1658            let contents = CheckpointContents::new_with_digests_and_signatures(
1659                effects.iter().map(TransactionEffects::execution_digests),
1660                signatures,
1661            );
1662
1663            let num_txns = contents.size() as u64;
1664
1665            let network_total_transactions = last_checkpoint
1666                .as_ref()
1667                .map(|(_, c)| c.network_total_transactions + num_txns)
1668                .unwrap_or(num_txns);
1669
1670            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1671
1672            let matching_randomness_rounds: Vec<_> = effects
1673                .iter()
1674                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1675                .copied()
1676                .collect();
1677
1678            let summary = CheckpointSummary::new(
1679                self.epoch_store.protocol_config(),
1680                epoch,
1681                sequence_number,
1682                network_total_transactions,
1683                &contents,
1684                previous_digest,
1685                epoch_rolling_gas_cost_summary,
1686                end_of_epoch_data,
1687                timestamp_ms,
1688                matching_randomness_rounds,
1689            );
1690            summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1691            if last_checkpoint_of_epoch {
1692                info!(
1693                    checkpoint_seq = sequence_number,
1694                    "creating last checkpoint of epoch {}", epoch
1695                );
1696                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
1697                    self.epoch_store
1698                        .report_epoch_metrics_at_last_checkpoint(stats);
1699                }
1700            }
1701            last_checkpoint = Some((sequence_number, summary.clone()));
1702            checkpoints.push((summary, contents));
1703        }
1704
1705        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1706    }
1707
1708    fn get_epoch_total_gas_cost(
1709        &self,
1710        last_checkpoint: Option<&CheckpointSummary>,
1711        cur_checkpoint_effects: &[TransactionEffects],
1712    ) -> GasCostSummary {
1713        let (previous_epoch, previous_gas_costs) = last_checkpoint
1714            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1715            .unwrap_or_default();
1716        let current_gas_costs =
1717            checked::new_gas_cost_summary_from_txn_effects(cur_checkpoint_effects.iter());
1718        if previous_epoch == self.epoch_store.epoch() {
1719            // sum only when we are within the same epoch
1720            GasCostSummary::new(
1721                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1722                previous_gas_costs.computation_cost_burned
1723                    + current_gas_costs.computation_cost_burned,
1724                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1725                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1726                previous_gas_costs.non_refundable_storage_fee
1727                    + current_gas_costs.non_refundable_storage_fee,
1728            )
1729        } else {
1730            current_gas_costs
1731        }
1732    }
1733
1734    /// Augments the last checkpoint of the epoch by creating and executing an
1735    /// advance epoch transaction.
1736    #[instrument(level = "error", skip_all)]
1737    async fn augment_epoch_last_checkpoint(
1738        &self,
1739        epoch_total_gas_cost: &GasCostSummary,
1740        epoch_start_timestamp_ms: CheckpointTimestamp,
1741        checkpoint_effects: &mut Vec<TransactionEffects>,
1742        signatures: &mut Vec<Vec<GenericSignature>>,
1743        checkpoint: CheckpointSequenceNumber,
1744        scores: Vec<u64>,
1745    ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1746        let (system_state, system_epoch_info_event, effects) = self
1747            .state
1748            .create_and_execute_advance_epoch_tx(
1749                &self.epoch_store,
1750                epoch_total_gas_cost,
1751                checkpoint,
1752                epoch_start_timestamp_ms,
1753                scores,
1754            )
1755            .await?;
1756        checkpoint_effects.push(effects);
1757        signatures.push(vec![]);
1758        Ok((system_state, system_epoch_info_event))
1759    }
1760
1761    /// For the given roots return complete list of effects to include in
1762    /// checkpoint This list includes the roots and all their dependencies,
1763    /// which are not part of checkpoint already. Note that this function
1764    /// may be called multiple times to construct the checkpoint.
1765    /// `existing_tx_digests_in_checkpoint` is used to track the transactions
1766    /// that are already included in the checkpoint. Txs in `roots` that
1767    /// need to be included in the checkpoint will be added to
1768    /// `existing_tx_digests_in_checkpoint` after the call of this function.
1769    #[instrument(level = "debug", skip_all)]
1770    fn complete_checkpoint_effects(
1771        &self,
1772        mut roots: Vec<TransactionEffects>,
1773        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
1774    ) -> IotaResult<Vec<TransactionEffects>> {
1775        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
1776        let mut results = vec![];
1777        let mut seen = HashSet::new();
1778        loop {
1779            let mut pending = HashSet::new();
1780
1781            let transactions_included = self
1782                .epoch_store
1783                .builder_included_transactions_in_checkpoint(
1784                    roots.iter().map(|e| e.transaction_digest()),
1785                )?;
1786
1787            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
1788                let digest = effect.transaction_digest();
1789                // Unnecessary to read effects of a dependency if the effect is already
1790                // processed.
1791                seen.insert(*digest);
1792
1793                // Skip roots that are already included in the checkpoint.
1794                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
1795                    continue;
1796                }
1797
1798                // Skip roots already included in checkpoints or roots from previous epochs
1799                if tx_included || effect.epoch() < self.epoch_store.epoch() {
1800                    continue;
1801                }
1802
1803                let existing_effects = self
1804                    .epoch_store
1805                    .transactions_executed_in_cur_epoch(effect.dependencies())?;
1806
1807                for (dependency, effects_signature_exists) in
1808                    effect.dependencies().iter().zip(existing_effects.iter())
1809                {
1810                    // Skip here if dependency not executed in the current epoch.
1811                    // Note that the existence of an effects signature in the
1812                    // epoch store for the given digest indicates that the transaction
1813                    // was locally executed in the current epoch
1814                    if !effects_signature_exists {
1815                        continue;
1816                    }
1817                    if seen.insert(*dependency) {
1818                        pending.insert(*dependency);
1819                    }
1820                }
1821                results.push(effect);
1822            }
1823            if pending.is_empty() {
1824                break;
1825            }
1826            let pending = pending.into_iter().collect::<Vec<_>>();
1827            let effects = self
1828                .effects_store
1829                .try_multi_get_executed_effects(&pending)?;
1830            let effects = effects
1831                .into_iter()
1832                .zip(pending)
1833                .map(|(opt, digest)| match opt {
1834                    Some(x) => x,
1835                    None => panic!(
1836                        "Can not find effect for transaction {digest}, however transaction that depend on it was already executed"
1837                    ),
1838                })
1839                .collect::<Vec<_>>();
1840            roots = effects;
1841        }
1842
1843        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
1844        Ok(results)
1845    }
1846
// Simtest-only check of the consensus commit prologue invariants of a
// checkpoint:
// - the roots contain at most one consensus commit prologue transaction;
// - if the roots contain none, the checkpoint contains none either;
// - if the roots contain one, it is the first transaction of the checkpoint
//   and no other transaction of the checkpoint is a prologue.
//
// Panics when one of the invariants is violated.
#[cfg(msim)]
fn expensive_consensus_commit_prologue_invariants_check(
    &self,
    root_digests: &[TransactionDigest],
    sorted: &[TransactionEffects],
) {
    // Collect the consensus commit prologue transactions among the roots.
    let root_txs = self
        .state
        .get_transaction_cache_reader()
        .multi_get_transaction_blocks(root_digests);
    let ccps: Vec<_> = root_txs
        .iter()
        .flatten()
        .filter(|tx| {
            matches!(
                tx.transaction_data().kind(),
                TransactionKind::ConsensusCommitPrologueV1(_)
            )
        })
        .collect();

    // There should be at most one consensus commit prologue transaction in
    // the roots.
    assert!(ccps.len() <= 1);

    // Load all the transactions of the checkpoint, in checkpoint order.
    let sorted_digests: Vec<_> = sorted
        .iter()
        .map(|effects| *effects.transaction_digest())
        .collect();
    let txs = self
        .state
        .get_transaction_cache_reader()
        .multi_get_transaction_blocks(&sorted_digests);

    // Number of leading checkpoint transactions that are allowed to be a
    // consensus commit prologue.
    let prologues_at_front = match ccps.first() {
        None => 0,
        Some(prologue) => {
            // The single prologue must be the first transaction of the
            // checkpoint.
            assert!(matches!(
                txs[0].as_ref().unwrap().transaction_data().kind(),
                TransactionKind::ConsensusCommitPrologueV1(_)
            ));

            assert_eq!(prologue.digest(), txs[0].as_ref().unwrap().digest());
            1
        }
    };

    // Every remaining transaction must not be a consensus commit prologue.
    for tx in txs.iter().skip(prologues_at_front).flatten() {
        assert!(!matches!(
            tx.transaction_data().kind(),
            TransactionKind::ConsensusCommitPrologueV1(_)
        ));
    }
}
1915}
1916
1917impl CheckpointAggregator {
1918    fn new(
1919        tables: Arc<CheckpointStore>,
1920        epoch_store: Arc<AuthorityPerEpochStore>,
1921        notify: Arc<Notify>,
1922        output: Box<dyn CertifiedCheckpointOutput>,
1923        state: Arc<AuthorityState>,
1924        metrics: Arc<CheckpointMetrics>,
1925    ) -> Self {
1926        let current = None;
1927        Self {
1928            store: tables,
1929            epoch_store,
1930            notify,
1931            current,
1932            output,
1933            state,
1934            metrics,
1935        }
1936    }
1937
1938    /// Runs the `CheckpointAggregator` in an asynchronous loop, managing the
1939    /// aggregation of checkpoints.
1940    /// The function ensures continuous aggregation of checkpoints, handling
1941    /// errors and retries gracefully, and allowing for proper shutdown on
1942    /// receiving an exit signal.
1943    async fn run(mut self) {
1944        info!("Starting CheckpointAggregator");
1945        loop {
1946            if let Err(e) = self.run_and_notify().await {
1947                error!(
1948                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
1949                    e
1950                );
1951                self.metrics.checkpoint_errors.inc();
1952                tokio::time::sleep(Duration::from_secs(1)).await;
1953                continue;
1954            }
1955
1956            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
1957        }
1958    }
1959
1960    async fn run_and_notify(&mut self) -> IotaResult {
1961        let summaries = self.run_inner()?;
1962        for summary in summaries {
1963            self.output.certified_checkpoint_created(&summary).await?;
1964        }
1965        Ok(())
1966    }
1967
1968    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
1969        let _scope = monitored_scope("CheckpointAggregator");
1970        let mut result = vec![];
1971        'outer: loop {
1972            let next_to_certify = self.next_checkpoint_to_certify()?;
1973            let current = if let Some(current) = &mut self.current {
1974                // It's possible that the checkpoint was already certified by
1975                // the rest of the network and we've already received the
1976                // certified checkpoint via StateSync. In this case, we reset
1977                // the current signature aggregator to the next checkpoint to
1978                // be certified
1979                if current.summary.sequence_number < next_to_certify {
1980                    self.current = None;
1981                    continue;
1982                }
1983                current
1984            } else {
1985                let Some(summary) = self
1986                    .epoch_store
1987                    .get_built_checkpoint_summary(next_to_certify)?
1988                else {
1989                    return Ok(result);
1990                };
1991                self.current = Some(CheckpointSignatureAggregator {
1992                    next_index: 0,
1993                    digest: summary.digest(),
1994                    summary,
1995                    signatures_by_digest: MultiStakeAggregator::new(
1996                        self.epoch_store.committee().clone(),
1997                    ),
1998                    store: self.store.clone(),
1999                    state: self.state.clone(),
2000                    metrics: self.metrics.clone(),
2001                });
2002                self.current.as_mut().unwrap()
2003            };
2004
2005            let epoch_tables = self
2006                .epoch_store
2007                .tables()
2008                .expect("should not run past end of epoch");
2009            let iter = epoch_tables
2010                .pending_checkpoint_signatures
2011                .safe_iter_with_bounds(
2012                    Some((current.summary.sequence_number, current.next_index)),
2013                    None,
2014                );
2015            for item in iter {
2016                let ((seq, index), data) = item?;
2017                if seq != current.summary.sequence_number {
2018                    trace!(
2019                        checkpoint_seq =? current.summary.sequence_number,
2020                        "Not enough checkpoint signatures",
2021                    );
2022                    // No more signatures (yet) for this checkpoint
2023                    return Ok(result);
2024                }
2025                trace!(
2026                    checkpoint_seq = current.summary.sequence_number,
2027                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
2028                    current.summary.digest(),
2029                    data.summary.auth_sig().authority.concise()
2030                );
2031                self.metrics
2032                    .checkpoint_participation
2033                    .with_label_values(&[&format!(
2034                        "{:?}",
2035                        data.summary.auth_sig().authority.concise()
2036                    )])
2037                    .inc();
2038                if let Ok(auth_signature) = current.try_aggregate(data) {
2039                    debug!(
2040                        checkpoint_seq = current.summary.sequence_number,
2041                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
2042                        current.summary.digest(),
2043                    );
2044                    let summary = VerifiedCheckpoint::new_unchecked(
2045                        CertifiedCheckpointSummary::new_from_data_and_sig(
2046                            current.summary.clone(),
2047                            auth_signature,
2048                        ),
2049                    );
2050
2051                    self.store.insert_certified_checkpoint(&summary)?;
2052                    self.metrics
2053                        .last_certified_checkpoint
2054                        .set(current.summary.sequence_number as i64);
2055                    current
2056                        .summary
2057                        .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
2058                    result.push(summary.into_inner());
2059                    self.current = None;
2060                    continue 'outer;
2061                } else {
2062                    current.next_index = index + 1;
2063                }
2064            }
2065            break;
2066        }
2067        Ok(result)
2068    }
2069
2070    fn next_checkpoint_to_certify(&self) -> IotaResult<CheckpointSequenceNumber> {
2071        Ok(self
2072            .store
2073            .tables
2074            .certified_checkpoints
2075            .reversed_safe_iter_with_bounds(None, None)?
2076            .next()
2077            .transpose()?
2078            .map(|(seq, _)| seq + 1)
2079            .unwrap_or_default())
2080    }
2081}
2082
2083impl CheckpointSignatureAggregator {
2084    #[expect(clippy::result_unit_err)]
2085    pub fn try_aggregate(
2086        &mut self,
2087        data: CheckpointSignatureMessage,
2088    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2089        let their_digest = *data.summary.digest();
2090        let (_, signature) = data.summary.into_data_and_sig();
2091        let author = signature.authority;
2092        let envelope =
2093            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2094        match self.signatures_by_digest.insert(their_digest, envelope) {
2095            // ignore repeated signatures
2096            InsertResult::Failed {
2097                error:
2098                    IotaError::StakeAggregatorRepeatedSigner {
2099                        conflicting_sig: false,
2100                        ..
2101                    },
2102            } => Err(()),
2103            InsertResult::Failed { error } => {
2104                warn!(
2105                    checkpoint_seq = self.summary.sequence_number,
2106                    "Failed to aggregate new signature from validator {:?}: {:?}",
2107                    author.concise(),
2108                    error
2109                );
2110                self.check_for_split_brain();
2111                Err(())
2112            }
2113            InsertResult::QuorumReached(cert) => {
2114                // It is not guaranteed that signature.authority == consensus_cert.author, but
2115                // we do verify the signature so we know that the author signed
2116                // the message at some point.
2117                if their_digest != self.digest {
2118                    self.metrics.remote_checkpoint_forks.inc();
2119                    warn!(
2120                        checkpoint_seq = self.summary.sequence_number,
2121                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2122                        author.concise(),
2123                        their_digest,
2124                        self.digest
2125                    );
2126                    return Err(());
2127                }
2128                Ok(cert)
2129            }
2130            InsertResult::NotEnoughVotes {
2131                bad_votes: _,
2132                bad_authorities: _,
2133            } => {
2134                self.check_for_split_brain();
2135                Err(())
2136            }
2137        }
2138    }
2139
2140    /// Check if there is a split brain condition in checkpoint signature
2141    /// aggregation, defined as any state wherein it is no longer possible
2142    /// to achieve quorum on a checkpoint proposal, irrespective of the
2143    /// outcome of any outstanding votes.
2144    fn check_for_split_brain(&self) {
2145        debug!(
2146            checkpoint_seq = self.summary.sequence_number,
2147            "Checking for split brain condition"
2148        );
2149        if self.signatures_by_digest.quorum_unreachable() {
2150            // TODO: at this point we should immediately halt processing
2151            // of new transaction certificates to avoid building on top of
2152            // forked output
2153            // self.halt_all_execution();
2154
2155            let digests_by_stake_messages = self
2156                .signatures_by_digest
2157                .get_all_unique_values()
2158                .into_iter()
2159                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2160                .map(|(digest, (_authorities, total_stake))| {
2161                    format!("{digest} (total stake: {total_stake})")
2162                })
2163                .collect::<Vec<String>>();
2164            error!(
2165                checkpoint_seq = self.summary.sequence_number,
2166                "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
2167                self.signatures_by_digest.uncommitted_stake(),
2168                digests_by_stake_messages,
2169            );
2170            self.metrics.split_brain_checkpoint_forks.inc();
2171
2172            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2173            let local_summary = self.summary.clone();
2174            let state = self.state.clone();
2175            let tables = self.store.clone();
2176
2177            tokio::spawn(async move {
2178                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2179            });
2180        }
2181    }
2182}
2183
2184/// Create data dump containing relevant data for diagnosing cause of the
2185/// split brain by querying one disagreeing validator for full checkpoint
2186/// contents. To minimize peer chatter, we only query one validator at random
2187/// from each disagreeing faction, as all honest validators that participated in
2188/// this round may inevitably run the same process.
2189async fn diagnose_split_brain(
2190    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2191    local_summary: CheckpointSummary,
2192    state: Arc<AuthorityState>,
2193    tables: Arc<CheckpointStore>,
2194) {
2195    debug!(
2196        checkpoint_seq = local_summary.sequence_number,
2197        "Running split brain diagnostics..."
2198    );
2199    let time = SystemTime::now();
2200    // collect one random disagreeing validator per differing digest
2201    let digest_to_validator = all_unique_values
2202        .iter()
2203        .filter_map(|(digest, (validators, _))| {
2204            if *digest != local_summary.digest() {
2205                let random_validator = validators.choose(&mut get_rng()).unwrap();
2206                Some((*digest, *random_validator))
2207            } else {
2208                None
2209            }
2210        })
2211        .collect::<HashMap<_, _>>();
2212    if digest_to_validator.is_empty() {
2213        panic!(
2214            "Given split brain condition, there should be at \
2215                least one validator that disagrees with local signature"
2216        );
2217    }
2218
2219    let epoch_store = state.load_epoch_store_one_call_per_task();
2220    let committee = epoch_store
2221        .epoch_start_state()
2222        .get_iota_committee_with_network_metadata();
2223    let network_config = default_iota_network_config();
2224    let network_clients =
2225        make_network_authority_clients_with_network_config(&committee, &network_config);
2226
2227    // Query all disagreeing validators
2228    let response_futures = digest_to_validator
2229        .values()
2230        .cloned()
2231        .map(|validator| {
2232            let client = network_clients
2233                .get(&validator)
2234                .expect("Failed to get network client");
2235            let request = CheckpointRequest {
2236                sequence_number: Some(local_summary.sequence_number),
2237                request_content: true,
2238                certified: false,
2239            };
2240            client.handle_checkpoint(request)
2241        })
2242        .collect::<Vec<_>>();
2243
2244    let digest_name_pair = digest_to_validator.iter();
2245    let response_data = futures::future::join_all(response_futures)
2246        .await
2247        .into_iter()
2248        .zip(digest_name_pair)
2249        .filter_map(|(response, (digest, name))| match response {
2250            Ok(response) => match response {
2251                CheckpointResponse {
2252                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2253                    contents: Some(contents),
2254                } => Some((*name, *digest, summary, contents)),
2255                CheckpointResponse {
2256                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2257                    contents: _,
2258                } => {
2259                    panic!("Expected pending checkpoint, but got certified checkpoint");
2260                }
2261                CheckpointResponse {
2262                    checkpoint: None,
2263                    contents: _,
2264                } => {
2265                    error!(
2266                        "Summary for checkpoint {:?} not found on validator {:?}",
2267                        local_summary.sequence_number, name
2268                    );
2269                    None
2270                }
2271                CheckpointResponse {
2272                    checkpoint: _,
2273                    contents: None,
2274                } => {
2275                    error!(
2276                        "Contents for checkpoint {:?} not found on validator {:?}",
2277                        local_summary.sequence_number, name
2278                    );
2279                    None
2280                }
2281            },
2282            Err(e) => {
2283                error!(
2284                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2285                    e
2286                );
2287                None
2288            }
2289        })
2290        .collect::<Vec<_>>();
2291
2292    let local_checkpoint_contents = tables
2293        .get_checkpoint_contents(&local_summary.content_digest)
2294        .unwrap_or_else(|_| {
2295            panic!(
2296                "Could not find checkpoint contents for digest {:?}",
2297                local_summary.digest()
2298            )
2299        })
2300        .unwrap_or_else(|| {
2301            panic!(
2302                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2303                local_summary.sequence_number,
2304                local_summary.digest()
2305            )
2306        });
2307    let local_contents_text = format!("{local_checkpoint_contents:?}");
2308
2309    let local_summary_text = format!("{local_summary:?}");
2310    let local_validator = state.name.concise();
2311    let diff_patches = response_data
2312        .iter()
2313        .map(|(name, other_digest, other_summary, contents)| {
2314            let other_contents_text = format!("{contents:?}");
2315            let other_summary_text = format!("{other_summary:?}");
2316            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2317                .enumerate_transactions(&local_summary)
2318                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2319                .unzip();
2320            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2321                .enumerate_transactions(other_summary)
2322                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2323                .unzip();
2324            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2325            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2326            let local_transactions_text = format!("{local_transactions:#?}");
2327            let other_transactions_text = format!("{other_transactions:#?}");
2328            let transactions_patch =
2329                create_patch(&local_transactions_text, &other_transactions_text);
2330            let local_effects_text = format!("{local_effects:#?}");
2331            let other_effects_text = format!("{other_effects:#?}");
2332            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2333            let seq_number = local_summary.sequence_number;
2334            let local_digest = local_summary.digest();
2335            let other_validator = name.concise();
2336            format!(
2337                "Checkpoint: {seq_number:?}\n\
2338                Local validator (original): {local_validator:?}, digest: {local_digest}\n\
2339                Other validator (modified): {other_validator:?}, digest: {other_digest}\n\n\
2340                Summary Diff: \n{summary_patch}\n\n\
2341                Contents Diff: \n{contents_patch}\n\n\
2342                Transactions Diff: \n{transactions_patch}\n\n\
2343                Effects Diff: \n{effects_patch}",
2344            )
2345        })
2346        .collect::<Vec<_>>()
2347        .join("\n\n\n");
2348
2349    let header = format!(
2350        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2351        Datetime: {time:?}"
2352    );
2353    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2354    let checkpoint_fork_dir = iota_common::tempdir().keep();
2355    let checkpoint_fork_file_path = checkpoint_fork_dir.join(Path::new("checkpoint_fork_dump.txt"));
2356    let mut file = File::create(checkpoint_fork_file_path).unwrap();
2357    write!(file, "{fork_logs_text}").unwrap();
2358    debug!("{}", fork_logs_text);
2359
2360    fail_point!("split_brain_reached");
2361}
2362
/// Notification interface used to signal the checkpoint service.
///
/// Two implementations live in this file: `CheckpointService`, which records
/// signatures and wakes its background tasks, and `CheckpointServiceNoop`,
/// which accepts every notification and does nothing.
pub trait CheckpointServiceNotify {
    /// Reports a checkpoint signature message.
    ///
    /// `epoch_store` is the per-epoch store the implementation may use to
    /// persist the signature; `info` carries the signed checkpoint summary.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    /// Signals that checkpoint work may be available (in `CheckpointService`
    /// this wakes the checkpoint builder).
    fn notify_checkpoint(&self) -> IotaResult;
}
2372
/// Start-up state of a `CheckpointService`: either it still owns its
/// not-yet-running components, or they have already been taken out.
enum CheckpointServiceState {
    /// The builder, aggregator, and state hasher have been constructed but
    /// not yet handed out by `take_unstarted`. They are boxed together as a
    /// single tuple.
    Unstarted(
        Box<(
            CheckpointBuilder,
            CheckpointAggregator,
            CheckpointStateHasher,
        )>,
    ),
    /// The components have been taken; `take_unstarted` panics in this state.
    Started,
}
2383
2384impl CheckpointServiceState {
2385    fn take_unstarted(
2386        &mut self,
2387    ) -> (
2388        CheckpointBuilder,
2389        CheckpointAggregator,
2390        CheckpointStateHasher,
2391    ) {
2392        let mut state = CheckpointServiceState::Started;
2393        std::mem::swap(self, &mut state);
2394
2395        match state {
2396            CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1, tup.2),
2397            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2398        }
2399    }
2400}
2401
/// Handle to the checkpoint builder, aggregator, and state hasher.
///
/// Created by `CheckpointService::build` in an un-started state; the
/// components begin running when `spawn` is called.
pub struct CheckpointService {
    /// Checkpoint store; consulted for the highest verified checkpoint when a
    /// signature notification arrives.
    tables: Arc<CheckpointStore>,
    /// Shared with the checkpoint builder; signalled by `notify_checkpoint`.
    notify_builder: Arc<Notify>,
    /// Shared with the checkpoint aggregator; signalled after a checkpoint
    /// signature has been inserted.
    notify_aggregator: Arc<Notify>,
    /// Index given to the most recently inserted checkpoint signature. It is
    /// initialised from the epoch store and incremented under this lock.
    last_signature_index: Mutex<u64>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed
    highest_previously_built_seq: CheckpointSequenceNumber,
    /// Checkpoint metrics updated as signature notifications are processed.
    metrics: Arc<CheckpointMetrics>,
    /// Holds the not-yet-started components until `spawn` takes them.
    state: Mutex<CheckpointServiceState>,
}
2415
2416impl CheckpointService {
2417    /// Spawns the checkpoint service, initializing and starting the checkpoint
2418    /// builder and aggregator tasks.
2419    /// Constructs a new CheckpointService in an un-started state.
2420    pub fn build(
2421        state: Arc<AuthorityState>,
2422        checkpoint_store: Arc<CheckpointStore>,
2423        epoch_store: Arc<AuthorityPerEpochStore>,
2424        effects_store: Arc<dyn TransactionCacheRead>,
2425        global_state_hasher: Weak<GlobalStateHasher>,
2426        checkpoint_output: Box<dyn CheckpointOutput>,
2427        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2428        metrics: Arc<CheckpointMetrics>,
2429        max_transactions_per_checkpoint: usize,
2430        max_checkpoint_size_bytes: usize,
2431    ) -> Arc<Self> {
2432        info!(
2433            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2434        );
2435        let notify_builder = Arc::new(Notify::new());
2436        let notify_aggregator = Arc::new(Notify::new());
2437
2438        // We may have built higher checkpoint numbers before restarting.
2439        let highest_previously_built_seq = checkpoint_store
2440            .get_latest_locally_computed_checkpoint()
2441            .expect("failed to get latest locally computed checkpoint")
2442            .map(|s| s.sequence_number)
2443            .unwrap_or(0);
2444
2445        let highest_currently_built_seq =
2446            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
2447                .expect("epoch should not have ended")
2448                .map(|(seq, _)| seq)
2449                .unwrap_or(0);
2450
2451        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
2452
2453        let aggregator = CheckpointAggregator::new(
2454            checkpoint_store.clone(),
2455            epoch_store.clone(),
2456            notify_aggregator.clone(),
2457            certified_checkpoint_output,
2458            state.clone(),
2459            metrics.clone(),
2460        );
2461
2462        let (send_to_hasher, receive_from_builder) = mpsc::channel(16);
2463
2464        let ckpt_state_hasher = CheckpointStateHasher::new(
2465            epoch_store.clone(),
2466            global_state_hasher.clone(),
2467            receive_from_builder,
2468        );
2469
2470        let builder = CheckpointBuilder::new(
2471            state,
2472            checkpoint_store.clone(),
2473            epoch_store.clone(),
2474            notify_builder.clone(),
2475            effects_store,
2476            global_state_hasher,
2477            send_to_hasher,
2478            checkpoint_output,
2479            notify_aggregator.clone(),
2480            highest_currently_built_seq_tx.clone(),
2481            metrics.clone(),
2482            max_transactions_per_checkpoint,
2483            max_checkpoint_size_bytes,
2484        );
2485
2486        let last_signature_index = epoch_store
2487            .get_last_checkpoint_signature_index()
2488            .expect("should not cross end of epoch");
2489        let last_signature_index = Mutex::new(last_signature_index);
2490
2491        Arc::new(Self {
2492            tables: checkpoint_store,
2493            notify_builder,
2494            notify_aggregator,
2495            last_signature_index,
2496            highest_currently_built_seq_tx,
2497            highest_previously_built_seq,
2498            metrics,
2499            state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
2500                builder,
2501                aggregator,
2502                ckpt_state_hasher,
2503            )))),
2504        })
2505    }
2506
2507    /// Starts the CheckpointService.
2508    ///
2509    /// This function blocks until the CheckpointBuilder re-builds all
2510    /// checkpoints that had been built before the most recent restart. You
2511    /// can think of this as a WAL replay operation. Upon startup, we may
2512    /// have a number of consensus commits and resulting checkpoints that
2513    /// were built but not committed to disk. We want to reprocess the
2514    /// commits and rebuild the checkpoints before starting normal operation.
2515    pub async fn spawn(&self) -> JoinSet<()> {
2516        let mut tasks = JoinSet::new();
2517
2518        let (builder, aggregator, state_hasher) = self.state.lock().take_unstarted();
2519        tasks.spawn(monitored_future!(builder.run()));
2520        tasks.spawn(monitored_future!(aggregator.run()));
2521        tasks.spawn(monitored_future!(state_hasher.run()));
2522
2523        // If this times out, the validator may still start up. The worst that can
2524        // happen is that we will crash later on instead of immediately. The eventual
2525        // crash would occur because we may be missing transactions that are below the
2526        // highest_synced_checkpoint watermark, which can cause a crash in
2527        // `CheckpointExecutor::extract_randomness_rounds`.
2528        if tokio::time::timeout(
2529            Duration::from_secs(120),
2530            self.wait_for_rebuilt_checkpoints(),
2531        )
2532        .await
2533        .is_err()
2534        {
2535            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
2536        }
2537
2538        tasks
2539    }
2540}
2541
2542impl CheckpointService {
2543    /// Waits until all checkpoints had been built before the node restarted
2544    /// are rebuilt. This is required to preserve the invariant that all
2545    /// checkpoints (and their transactions) below the
2546    /// highest_synced_checkpoint watermark are available. Once the
2547    /// checkpoints are constructed, we can be sure that the transactions
2548    /// have also been executed.
2549    pub async fn wait_for_rebuilt_checkpoints(&self) {
2550        let highest_previously_built_seq = self.highest_previously_built_seq;
2551        let mut rx = self.highest_currently_built_seq_tx.subscribe();
2552        let mut highest_currently_built_seq = *rx.borrow_and_update();
2553        info!(
2554            "Waiting for checkpoints to be rebuilt, previously built seq: \
2555            {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
2556        );
2557        loop {
2558            if highest_currently_built_seq >= highest_previously_built_seq {
2559                info!("Checkpoint rebuild complete");
2560                break;
2561            }
2562            rx.changed().await.unwrap();
2563            highest_currently_built_seq = *rx.borrow_and_update();
2564        }
2565    }
2566
2567    #[cfg(test)]
2568    fn write_and_notify_checkpoint_for_testing(
2569        &self,
2570        epoch_store: &AuthorityPerEpochStore,
2571        checkpoint: PendingCheckpoint,
2572    ) -> IotaResult {
2573        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
2574
2575        let mut output = ConsensusCommitOutput::new(0);
2576        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2577        output.set_default_commit_stats_for_testing();
2578        epoch_store.push_consensus_output_for_tests(output);
2579        self.notify_checkpoint()?;
2580        Ok(())
2581    }
2582}
2583
2584impl CheckpointServiceNotify for CheckpointService {
2585    fn notify_checkpoint_signature(
2586        &self,
2587        epoch_store: &AuthorityPerEpochStore,
2588        info: &CheckpointSignatureMessage,
2589    ) -> IotaResult {
2590        let sequence = info.summary.sequence_number;
2591        let signer = info.summary.auth_sig().authority.concise();
2592
2593        if let Some(highest_verified_checkpoint) = self
2594            .tables
2595            .get_highest_verified_checkpoint()?
2596            .map(|x| *x.sequence_number())
2597        {
2598            if sequence <= highest_verified_checkpoint {
2599                trace!(
2600                    checkpoint_seq = sequence,
2601                    "Ignore checkpoint signature from {} - already certified", signer,
2602                );
2603                self.metrics
2604                    .last_ignored_checkpoint_signature_received
2605                    .set(sequence as i64);
2606                return Ok(());
2607            }
2608        }
2609        trace!(
2610            checkpoint_seq = sequence,
2611            "Received checkpoint signature, digest {} from {}",
2612            info.summary.digest(),
2613            signer,
2614        );
2615        self.metrics
2616            .last_received_checkpoint_signatures
2617            .with_label_values(&[&signer.to_string()])
2618            .set(sequence as i64);
2619        // While it can be tempting to make last_signature_index into AtomicU64, this
2620        // won't work We need to make sure we write to `pending_signatures` and
2621        // trigger `notify_aggregator` without race conditions
2622        let mut index = self.last_signature_index.lock();
2623        *index += 1;
2624        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2625        self.notify_aggregator.notify_one();
2626        Ok(())
2627    }
2628
2629    fn notify_checkpoint(&self) -> IotaResult {
2630        self.notify_builder.notify_one();
2631        Ok(())
2632    }
2633}
2634
2635#[iota_macros::with_checked_arithmetic]
2636mod checked {
2637    use iota_sdk_types::GasCostSummary;
2638    use iota_types::effects::{TransactionEffects, TransactionEffectsAPI};
2639    use itertools::MultiUnzip;
2640
2641    #[expect(clippy::type_complexity)]
2642    pub fn new_gas_cost_summary_from_txn_effects<'a>(
2643        transactions: impl Iterator<Item = &'a TransactionEffects>,
2644    ) -> GasCostSummary {
2645        let (
2646            storage_costs,
2647            computation_costs,
2648            computation_costs_burned,
2649            storage_rebates,
2650            non_refundable_storage_fee,
2651        ): (Vec<u64>, Vec<u64>, Vec<u64>, Vec<u64>, Vec<u64>) = transactions
2652            .map(|e| {
2653                (
2654                    e.gas_cost_summary().storage_cost,
2655                    e.gas_cost_summary().computation_cost,
2656                    e.gas_cost_summary().computation_cost_burned,
2657                    e.gas_cost_summary().storage_rebate,
2658                    e.gas_cost_summary().non_refundable_storage_fee,
2659                )
2660            })
2661            .multiunzip();
2662
2663        GasCostSummary::new(
2664            computation_costs.iter().sum(),
2665            computation_costs_burned.iter().sum(),
2666            storage_costs.iter().sum(),
2667            storage_rebates.iter().sum(),
2668            non_refundable_storage_fee.iter().sum(),
2669        )
2670    }
2671}
2672// test helper
2673pub struct CheckpointServiceNoop {}
2674impl CheckpointServiceNotify for CheckpointServiceNoop {
2675    fn notify_checkpoint_signature(
2676        &self,
2677        _: &AuthorityPerEpochStore,
2678        _: &CheckpointSignatureMessage,
2679    ) -> IotaResult {
2680        Ok(())
2681    }
2682
2683    fn notify_checkpoint(&self) -> IotaResult {
2684        Ok(())
2685    }
2686}
2687
2688#[cfg(test)]
2689mod tests {
2690    use std::{
2691        collections::{BTreeMap, HashMap},
2692        ops::Deref,
2693    };
2694
2695    use futures::{FutureExt as _, future::BoxFuture};
2696    use iota_macros::sim_test;
2697    use iota_protocol_config::{Chain, ProtocolConfig};
2698    use iota_sdk_types::{Identifier, ObjectId, Owner, move_package::MovePackage};
2699    use iota_types::{
2700        base_types::{SequenceNumber, TransactionEffectsDigest},
2701        crypto::Signature,
2702        effects::{
2703            TransactionEffects, TransactionEffectsAPIForTesting, TransactionEffectsExt,
2704            TransactionEvents,
2705        },
2706        messages_checkpoint::SignedCheckpointSummary,
2707        object,
2708        transaction::{GenesisObject, VerifiedTransaction},
2709    };
2710    use tokio::sync::mpsc;
2711
2712    use super::*;
2713    use crate::authority::test_authority_builder::TestAuthorityBuilder;
2714
2715    #[sim_test]
2716    pub async fn checkpoint_builder_test() {
2717        telemetry_subscribers::init_for_testing();
2718
2719        let mut protocol_config =
2720            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
2721        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
2722        let state = TestAuthorityBuilder::new()
2723            .with_protocol_config(protocol_config)
2724            .build()
2725            .await;
2726
2727        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
2728        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
2729            vec![GenesisObject::new(
2730                object::Data::Package(
2731                    MovePackage::new(
2732                        ObjectId::random(),
2733                        SequenceNumber::default(),
2734                        BTreeMap::from([(Identifier::new_unchecked("m"), vec![0u8; 40000])]),
2735                        100_000,
2736                        // no modules so empty type_origin_table as no types are defined in this
2737                        // package
2738                        Vec::new(),
2739                        // no modules so empty linkage_table as no dependencies of this package
2740                        // exist
2741                        BTreeMap::new(),
2742                    )
2743                    .unwrap(),
2744                ),
2745                Owner::Immutable,
2746            )],
2747            vec![],
2748        );
2749        for i in 0..15 {
2750            state
2751                .database_for_testing()
2752                .perpetual_tables
2753                .transactions
2754                .insert(&d(i), dummy_tx.serializable_ref())
2755                .unwrap();
2756        }
2757        for i in 15..20 {
2758            state
2759                .database_for_testing()
2760                .perpetual_tables
2761                .transactions
2762                .insert(&d(i), dummy_tx_with_data.serializable_ref())
2763                .unwrap();
2764        }
2765
2766        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
2767        commit_cert_for_test(
2768            &mut store,
2769            state.clone(),
2770            d(1),
2771            vec![d(2), d(3)],
2772            GasCostSummary::new(11, 11, 12, 11, 1),
2773        );
2774        commit_cert_for_test(
2775            &mut store,
2776            state.clone(),
2777            d(2),
2778            vec![d(3), d(4)],
2779            GasCostSummary::new(21, 21, 22, 21, 1),
2780        );
2781        commit_cert_for_test(
2782            &mut store,
2783            state.clone(),
2784            d(3),
2785            vec![],
2786            GasCostSummary::new(31, 31, 32, 31, 1),
2787        );
2788        commit_cert_for_test(
2789            &mut store,
2790            state.clone(),
2791            d(4),
2792            vec![],
2793            GasCostSummary::new(41, 41, 42, 41, 1),
2794        );
2795        for i in [5, 6, 7, 10, 11, 12, 13] {
2796            commit_cert_for_test(
2797                &mut store,
2798                state.clone(),
2799                d(i),
2800                vec![],
2801                GasCostSummary::new(41, 41, 42, 41, 1),
2802            );
2803        }
2804        for i in [15, 16, 17] {
2805            commit_cert_for_test(
2806                &mut store,
2807                state.clone(),
2808                d(i),
2809                vec![],
2810                GasCostSummary::new(51, 51, 52, 51, 1),
2811            );
2812        }
2813        let all_digests: Vec<_> = store.keys().copied().collect();
2814        for digest in all_digests {
2815            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
2816            state
2817                .epoch_store_for_testing()
2818                .test_insert_user_signature(digest, vec![signature]);
2819        }
2820
2821        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
2822        let (certified_output, mut certified_result) =
2823            mpsc::channel::<CertifiedCheckpointSummary>(10);
2824        let store = Arc::new(store);
2825
2826        let tmp_dir = iota_common::tempdir();
2827        let checkpoint_store = CheckpointStore::new(tmp_dir.path());
2828        let epoch_store = state.epoch_store_for_testing();
2829
2830        let global_state_hasher = Arc::new(GlobalStateHasher::new_for_tests(
2831            state.get_global_state_hash_store().clone(),
2832        ));
2833
2834        let checkpoint_service = CheckpointService::build(
2835            state.clone(),
2836            checkpoint_store,
2837            epoch_store.clone(),
2838            store,
2839            Arc::downgrade(&global_state_hasher),
2840            Box::new(output),
2841            Box::new(certified_output),
2842            CheckpointMetrics::new_for_tests(),
2843            3,
2844            100_000,
2845        );
2846        let _tasks = checkpoint_service.spawn().await;
2847
2848        checkpoint_service
2849            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
2850            .unwrap();
2851        checkpoint_service
2852            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
2853            .unwrap();
2854        checkpoint_service
2855            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
2856            .unwrap();
2857        checkpoint_service
2858            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
2859            .unwrap();
2860        checkpoint_service
2861            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
2862            .unwrap();
2863        checkpoint_service
2864            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
2865            .unwrap();
2866
2867        let (c1c, c1s) = result.recv().await.unwrap();
2868        let (c2c, c2s) = result.recv().await.unwrap();
2869
2870        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2871        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2872        assert_eq!(c1t, vec![d(4)]);
2873        assert_eq!(c1s.previous_digest, None);
2874        assert_eq!(c1s.sequence_number, 0);
2875        assert_eq!(
2876            c1s.epoch_rolling_gas_cost_summary,
2877            GasCostSummary::new(41, 41, 42, 41, 1)
2878        );
2879
2880        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
2881        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
2882        assert_eq!(c2s.sequence_number, 1);
2883        assert_eq!(
2884            c2s.epoch_rolling_gas_cost_summary,
2885            GasCostSummary::new(104, 104, 108, 104, 4)
2886        );
2887
2888        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
2889        // Verify that we split into 2 checkpoints.
2890        let (c3c, c3s) = result.recv().await.unwrap();
2891        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2892        let (c4c, c4s) = result.recv().await.unwrap();
2893        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2894        assert_eq!(c3s.sequence_number, 2);
2895        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
2896        assert_eq!(c4s.sequence_number, 3);
2897        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
2898        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
2899        assert_eq!(c4t, vec![d(13)]);
2900
2901        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K
2902        // max. Verify that we split into 2 checkpoints.
2903        let (c5c, c5s) = result.recv().await.unwrap();
2904        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2905        let (c6c, c6s) = result.recv().await.unwrap();
2906        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2907        assert_eq!(c5s.sequence_number, 4);
2908        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
2909        assert_eq!(c6s.sequence_number, 5);
2910        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
2911        assert_eq!(c5t, vec![d(15), d(16)]);
2912        assert_eq!(c6t, vec![d(17)]);
2913
2914        // Pending at index 4 was too soon after the prior one and should be coalesced
2915        // into the next one.
2916        let (c7c, c7s) = result.recv().await.unwrap();
2917        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
2918        assert_eq!(c7t, vec![d(5), d(6)]);
2919        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
2920        assert_eq!(c7s.sequence_number, 6);
2921
2922        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
2923        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);
2924
2925        checkpoint_service
2926            .notify_checkpoint_signature(
2927                &epoch_store,
2928                &CheckpointSignatureMessage { summary: c2ss },
2929            )
2930            .unwrap();
2931        checkpoint_service
2932            .notify_checkpoint_signature(
2933                &epoch_store,
2934                &CheckpointSignatureMessage { summary: c1ss },
2935            )
2936            .unwrap();
2937
2938        let c1sc = certified_result.recv().await.unwrap();
2939        let c2sc = certified_result.recv().await.unwrap();
2940        assert_eq!(c1sc.sequence_number, 0);
2941        assert_eq!(c2sc.sequence_number, 1);
2942    }
2943
2944    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
2945        fn try_notify_read_executed_effects(
2946            &self,
2947            digests: &[TransactionDigest],
2948        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
2949            std::future::ready(Ok(digests
2950                .iter()
2951                .map(|d| self.get(d).expect("effects not found").clone())
2952                .collect()))
2953            .boxed()
2954        }
2955
2956        fn try_notify_read_executed_effects_digests(
2957            &self,
2958            digests: &[TransactionDigest],
2959        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
2960            std::future::ready(Ok(digests
2961                .iter()
2962                .map(|d| {
2963                    self.get(d)
2964                        .map(|fx| fx.digest())
2965                        .expect("effects not found")
2966                })
2967                .collect()))
2968            .boxed()
2969        }
2970
2971        fn try_multi_get_executed_effects(
2972            &self,
2973            digests: &[TransactionDigest],
2974        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2975            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
2976        }
2977
2978        // Unimplemented methods - its unfortunate to have this big blob of useless
2979        // code, but it wasn't worth it to keep EffectsNotifyRead around just
2980        // for these tests, as it caused a ton of complication in non-test code.
2981        // (e.g. had to implement EFfectsNotifyRead for all ExecutionCacheRead
2982        // implementors).
2983
2984        fn try_multi_get_transaction_blocks(
2985            &self,
2986            _: &[TransactionDigest],
2987        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
2988            unimplemented!()
2989        }
2990
2991        fn try_multi_get_executed_effects_digests(
2992            &self,
2993            _: &[TransactionDigest],
2994        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
2995            unimplemented!()
2996        }
2997
2998        fn try_multi_get_effects(
2999            &self,
3000            _: &[TransactionEffectsDigest],
3001        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
3002            unimplemented!()
3003        }
3004
3005        fn try_multi_get_events(
3006            &self,
3007            _: &[TransactionDigest],
3008        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
3009            unimplemented!()
3010        }
3011    }
3012
3013    #[async_trait::async_trait]
3014    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
3015        async fn checkpoint_created(
3016            &self,
3017            summary: &CheckpointSummary,
3018            contents: &CheckpointContents,
3019            _epoch_store: &Arc<AuthorityPerEpochStore>,
3020            _checkpoint_store: &Arc<CheckpointStore>,
3021        ) -> IotaResult {
3022            self.try_send((contents.clone(), summary.clone())).unwrap();
3023            Ok(())
3024        }
3025    }
3026
3027    #[async_trait::async_trait]
3028    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
3029        async fn certified_checkpoint_created(
3030            &self,
3031            summary: &CertifiedCheckpointSummary,
3032        ) -> IotaResult {
3033            self.try_send(summary.clone()).unwrap();
3034            Ok(())
3035        }
3036    }
3037
3038    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3039        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
3040            roots: t
3041                .into_iter()
3042                .map(|t| TransactionKey::Digest(d(t)))
3043                .collect(),
3044            details: PendingCheckpointInfo {
3045                timestamp_ms,
3046                last_of_epoch: false,
3047                checkpoint_height: i,
3048            },
3049        })
3050    }
3051
3052    fn d(i: u8) -> TransactionDigest {
3053        let mut bytes: [u8; 32] = Default::default();
3054        bytes[0] = i;
3055        TransactionDigest::new(bytes)
3056    }
3057
3058    fn e(
3059        transaction_digest: TransactionDigest,
3060        dependencies: Vec<TransactionDigest>,
3061        gas_cost_summary: GasCostSummary,
3062    ) -> TransactionEffects {
3063        let mut effects = TransactionEffects::new_empty_v1(transaction_digest);
3064        *effects.dependencies_mut_for_testing() = dependencies;
3065        *effects.gas_cost_summary_mut_for_testing() = gas_cost_summary;
3066        effects
3067    }
3068
3069    fn commit_cert_for_test(
3070        store: &mut HashMap<TransactionDigest, TransactionEffects>,
3071        state: Arc<AuthorityState>,
3072        digest: TransactionDigest,
3073        dependencies: Vec<TransactionDigest>,
3074        gas_cost_summary: GasCostSummary,
3075    ) {
3076        let epoch_store = state.epoch_store_for_testing();
3077        let effects = e(digest, dependencies, gas_cost_summary);
3078        store.insert(digest, effects);
3079        epoch_store
3080            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
3081            .expect("Inserting cert fx and sigs should not fail");
3082    }
3083}