iota_core/checkpoints/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5mod causal_order;
6pub mod checkpoint_executor;
7mod checkpoint_output;
8mod metrics;
9
10use std::{
11    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
12    fs::File,
13    io::Write,
14    path::Path,
15    sync::{Arc, Weak},
16    time::{Duration, SystemTime},
17};
18
19use diffy::create_patch;
20use iota_common::{debug_fatal, fatal, sync::notify_read::NotifyRead};
21use iota_macros::fail_point;
22use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
23use iota_network::default_iota_network_config;
24use iota_protocol_config::ProtocolVersion;
25use iota_types::{
26    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
27    committee::StakeUnit,
28    crypto::AuthorityStrongQuorumSignInfo,
29    digests::{CheckpointContentsDigest, CheckpointDigest},
30    effects::{TransactionEffects, TransactionEffectsAPI},
31    error::{IotaError, IotaResult},
32    event::SystemEpochInfoEvent,
33    executable_transaction::VerifiedExecutableTransaction,
34    gas::GasCostSummary,
35    iota_system_state::{
36        IotaSystemState, IotaSystemStateTrait,
37        epoch_start_iota_system_state::EpochStartSystemStateTrait,
38    },
39    message_envelope::Message,
40    messages_checkpoint::{
41        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
42        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
43        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
44        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
45        VerifiedCheckpointContents,
46    },
47    messages_consensus::ConsensusTransactionKey,
48    signature::GenericSignature,
49    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
50};
51use itertools::Itertools;
52use nonempty::NonEmpty;
53use parking_lot::Mutex;
54use rand::{rngs::OsRng, seq::SliceRandom};
55use serde::{Deserialize, Serialize};
56use tokio::{
57    sync::{Notify, watch},
58    task::JoinSet,
59    time::timeout,
60};
61use tracing::{debug, error, info, instrument, trace, warn};
62use typed_store::{
63    DBMapUtils, Map, TypedStoreError,
64    rocks::{DBMap, MetricConf},
65    traits::{TableSummary, TypedStoreDebug},
66};
67
68pub use crate::checkpoints::{
69    checkpoint_output::{
70        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
71    },
72    metrics::CheckpointMetrics,
73};
74use crate::{
75    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
76    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
77    checkpoints::{
78        causal_order::CausalOrder,
79        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
80    },
81    consensus_handler::SequencedConsensusTransactionKey,
82    execution_cache::TransactionCacheRead,
83    stake_aggregator::{InsertResult, MultiStakeAggregator},
84    state_accumulator::StateAccumulator,
85};
86
/// Height value attached to a pending checkpoint (see
/// `PendingCheckpointInfo::checkpoint_height`) and to the summary built from
/// it (see `BuilderCheckpointSummary::checkpoint_height`).
pub type CheckpointHeight = u64;
88
/// Per-epoch statistics as produced by `CheckpointStore::get_epoch_stats`.
pub struct EpochStats {
    /// Number of checkpoints in the epoch (last sequence number of the epoch
    /// minus the first one, plus one).
    pub checkpoint_count: u64,
    /// Growth of `network_total_transactions` between the last checkpoint of
    /// the previous epoch and the last checkpoint of this epoch.
    pub transaction_count: u64,
    /// Taken from the `computation_cost` of the last checkpoint's
    /// `epoch_rolling_gas_cost_summary`.
    pub total_gas_reward: u64,
}
94
/// Metadata carried alongside the roots of a pending checkpoint.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
    /// Timestamp of the pending checkpoint (presumably in milliseconds, per
    /// the field name — confirm against the producer).
    pub timestamp_ms: CheckpointTimestamp,
    /// NOTE(review): presumably set when this is the final checkpoint of its
    /// epoch — confirm against the code that constructs this value.
    pub last_of_epoch: bool,
    /// Height of this pending checkpoint; exposed via
    /// `PendingCheckpoint::height`.
    pub checkpoint_height: CheckpointHeight,
}
101
/// Versioned, serializable description of a checkpoint that is still pending.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PendingCheckpoint {
    // This is an enum for future upgradability, though at the moment there is only one variant.
    /// First (and currently only) version of the pending checkpoint payload.
    V1(PendingCheckpointContentsV1),
}
107
/// Payload of `PendingCheckpoint::V1`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointContentsV1 {
    /// Transaction keys serving as the roots of this pending checkpoint.
    pub roots: Vec<TransactionKey>,
    /// Timestamp, end-of-epoch flag and height for this pending checkpoint.
    pub details: PendingCheckpointInfo,
}
113
114impl PendingCheckpoint {
115    pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
116        match self {
117            PendingCheckpoint::V1(contents) => contents,
118        }
119    }
120
121    pub fn into_v1(self) -> PendingCheckpointContentsV1 {
122        match self {
123            PendingCheckpoint::V1(contents) => contents,
124        }
125    }
126
127    pub fn roots(&self) -> &Vec<TransactionKey> {
128        &self.as_v1().roots
129    }
130
131    pub fn details(&self) -> &PendingCheckpointInfo {
132        &self.as_v1().details
133    }
134
135    pub fn height(&self) -> CheckpointHeight {
136        self.details().checkpoint_height
137    }
138}
139
/// A checkpoint summary together with bookkeeping about where it was built.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    /// The checkpoint summary itself.
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    /// NOTE(review): presumably the index of this checkpoint among the
    /// checkpoints produced from the same commit — confirm against the
    /// builder.
    pub position_in_commit: usize,
}
147
/// Typed database tables backing `CheckpointStore`.
///
/// The open/read-only helpers used by the `impl` block
/// (`open_tables_read_write`, `get_read_only_handle`) are presumably generated
/// by the `DBMapUtils` derive — confirm against the macro.
#[derive(DBMapUtils)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence
    /// number, for efficient reads of full checkpoints. Entries from this
    /// table are deleted after state accumulation has completed.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks
    /// and log useful information. Can be pruned as soon as we verify that
    /// we are in agreement with the latest certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in
    /// that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}
180
181impl CheckpointStoreTables {
182    pub fn new(path: &Path, metric_name: &'static str) -> Self {
183        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
184    }
185    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
186        Self::get_read_only_handle(
187            path.to_path_buf(),
188            None,
189            None,
190            MetricConf::new("checkpoint_readonly"),
191        )
192    }
193}
194
/// Persistent store for checkpoints, their contents and the checkpoint
/// watermarks, plus in-process notification of watermark advances.
pub struct CheckpointStore {
    /// The underlying typed database tables.
    pub(crate) tables: CheckpointStoreTables,
    /// Notified by `update_highest_synced_checkpoint`; awaited by
    /// `notify_read_synced_checkpoint`.
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    /// Notified by `update_highest_executed_checkpoint`; awaited by
    /// `notify_read_executed_checkpoint`.
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}
200
201impl CheckpointStore {
202    pub fn new(path: &Path) -> Arc<Self> {
203        let tables = CheckpointStoreTables::new(path, "checkpoint");
204        Arc::new(Self {
205            tables,
206            synced_checkpoint_notify_read: NotifyRead::new(),
207            executed_checkpoint_notify_read: NotifyRead::new(),
208        })
209    }
210
211    pub fn new_for_tests() -> Arc<Self> {
212        let ckpt_dir = tempfile::tempdir().unwrap();
213        CheckpointStore::new(ckpt_dir.path())
214    }
215
216    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
217        let tables = CheckpointStoreTables::new(path, "db_checkpoint");
218        Arc::new(Self {
219            tables,
220            synced_checkpoint_notify_read: NotifyRead::new(),
221            executed_checkpoint_notify_read: NotifyRead::new(),
222        })
223    }
224
225    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
226        CheckpointStoreTables::open_readonly(path)
227    }
228
229    #[instrument(level = "info", skip_all)]
230    pub fn insert_genesis_checkpoint(
231        &self,
232        checkpoint: VerifiedCheckpoint,
233        contents: CheckpointContents,
234        epoch_store: &AuthorityPerEpochStore,
235    ) {
236        assert_eq!(
237            checkpoint.epoch(),
238            0,
239            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
240        );
241        assert_eq!(
242            *checkpoint.sequence_number(),
243            0,
244            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
245        );
246
247        // Only insert the genesis checkpoint if the DB is empty and doesn't have it
248        // already
249        if self
250            .get_checkpoint_by_digest(checkpoint.digest())
251            .unwrap()
252            .is_none()
253        {
254            if epoch_store.epoch() == checkpoint.epoch {
255                epoch_store
256                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
257                    .unwrap();
258            } else {
259                debug!(
260                    validator_epoch =% epoch_store.epoch(),
261                    genesis_epoch =% checkpoint.epoch(),
262                    "Not inserting checkpoint builder data for genesis checkpoint",
263                );
264            }
265            self.insert_checkpoint_contents(contents).unwrap();
266            self.insert_verified_checkpoint(&checkpoint).unwrap();
267            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
268        }
269    }
270
271    pub fn get_checkpoint_by_digest(
272        &self,
273        digest: &CheckpointDigest,
274    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
275        self.tables
276            .checkpoint_by_digest
277            .get(digest)
278            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
279    }
280
281    pub fn get_checkpoint_by_sequence_number(
282        &self,
283        sequence_number: CheckpointSequenceNumber,
284    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
285        self.tables
286            .certified_checkpoints
287            .get(&sequence_number)
288            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
289    }
290
291    pub fn get_locally_computed_checkpoint(
292        &self,
293        sequence_number: CheckpointSequenceNumber,
294    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
295        self.tables
296            .locally_computed_checkpoints
297            .get(&sequence_number)
298    }
299
300    pub fn get_sequence_number_by_contents_digest(
301        &self,
302        digest: &CheckpointContentsDigest,
303    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
304        self.tables
305            .checkpoint_sequence_by_contents_digest
306            .get(digest)
307    }
308
309    pub fn delete_contents_digest_sequence_number_mapping(
310        &self,
311        digest: &CheckpointContentsDigest,
312    ) -> Result<(), TypedStoreError> {
313        self.tables
314            .checkpoint_sequence_by_contents_digest
315            .remove(digest)
316    }
317
318    pub fn get_latest_certified_checkpoint(
319        &self,
320    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
321        Ok(self
322            .tables
323            .certified_checkpoints
324            .reversed_safe_iter_with_bounds(None, None)?
325            .next()
326            .transpose()?
327            .map(|(_, v)| v.into()))
328    }
329
330    pub fn get_latest_locally_computed_checkpoint(
331        &self,
332    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
333        Ok(self
334            .tables
335            .locally_computed_checkpoints
336            .reversed_safe_iter_with_bounds(None, None)?
337            .next()
338            .transpose()?
339            .map(|(_, v)| v))
340    }
341
342    pub fn multi_get_checkpoint_by_sequence_number(
343        &self,
344        sequence_numbers: &[CheckpointSequenceNumber],
345    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
346        let checkpoints = self
347            .tables
348            .certified_checkpoints
349            .multi_get(sequence_numbers)?
350            .into_iter()
351            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
352            .collect();
353
354        Ok(checkpoints)
355    }
356
357    pub fn multi_get_checkpoint_content(
358        &self,
359        contents_digest: &[CheckpointContentsDigest],
360    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
361        self.tables.checkpoint_content.multi_get(contents_digest)
362    }
363
364    pub fn get_highest_verified_checkpoint(
365        &self,
366    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
367        let highest_verified = if let Some(highest_verified) = self
368            .tables
369            .watermarks
370            .get(&CheckpointWatermark::HighestVerified)?
371        {
372            highest_verified
373        } else {
374            return Ok(None);
375        };
376        self.get_checkpoint_by_digest(&highest_verified.1)
377    }
378
379    pub fn get_highest_synced_checkpoint(
380        &self,
381    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
382        let highest_synced = if let Some(highest_synced) = self
383            .tables
384            .watermarks
385            .get(&CheckpointWatermark::HighestSynced)?
386        {
387            highest_synced
388        } else {
389            return Ok(None);
390        };
391        self.get_checkpoint_by_digest(&highest_synced.1)
392    }
393
394    pub fn get_highest_synced_checkpoint_seq_number(
395        &self,
396    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
397        if let Some(highest_synced) = self
398            .tables
399            .watermarks
400            .get(&CheckpointWatermark::HighestSynced)?
401        {
402            Ok(Some(highest_synced.0))
403        } else {
404            Ok(None)
405        }
406    }
407
408    pub fn get_highest_executed_checkpoint_seq_number(
409        &self,
410    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
411        if let Some(highest_executed) = self
412            .tables
413            .watermarks
414            .get(&CheckpointWatermark::HighestExecuted)?
415        {
416            Ok(Some(highest_executed.0))
417        } else {
418            Ok(None)
419        }
420    }
421
422    pub fn get_highest_executed_checkpoint(
423        &self,
424    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
425        let highest_executed = if let Some(highest_executed) = self
426            .tables
427            .watermarks
428            .get(&CheckpointWatermark::HighestExecuted)?
429        {
430            highest_executed
431        } else {
432            return Ok(None);
433        };
434        self.get_checkpoint_by_digest(&highest_executed.1)
435    }
436
437    pub fn get_highest_pruned_checkpoint_seq_number(
438        &self,
439    ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
440        Ok(self
441            .tables
442            .watermarks
443            .get(&CheckpointWatermark::HighestPruned)?
444            .unwrap_or_default()
445            .0)
446    }
447
448    pub fn get_checkpoint_contents(
449        &self,
450        digest: &CheckpointContentsDigest,
451    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
452        self.tables.checkpoint_content.get(digest)
453    }
454
455    pub fn get_full_checkpoint_contents_by_sequence_number(
456        &self,
457        seq: CheckpointSequenceNumber,
458    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
459        self.tables.full_checkpoint_content.get(&seq)
460    }
461
462    fn prune_local_summaries(&self) -> IotaResult {
463        if let Some((last_local_summary, _)) = self
464            .tables
465            .locally_computed_checkpoints
466            .reversed_safe_iter_with_bounds(None, None)?
467            .next()
468            .transpose()?
469        {
470            let mut batch = self.tables.locally_computed_checkpoints.batch();
471            batch.schedule_delete_range(
472                &self.tables.locally_computed_checkpoints,
473                &0,
474                &last_local_summary,
475            )?;
476            batch.write()?;
477            info!("Pruned local summaries up to {:?}", last_local_summary);
478        }
479        Ok(())
480    }
481
482    #[instrument(level = "trace", skip_all)]
483    fn check_for_checkpoint_fork(
484        &self,
485        local_checkpoint: &CheckpointSummary,
486        verified_checkpoint: &VerifiedCheckpoint,
487    ) {
488        if local_checkpoint != verified_checkpoint.data() {
489            let verified_contents = self
490                .get_checkpoint_contents(&verified_checkpoint.content_digest)
491                .map(|opt_contents| {
492                    opt_contents
493                        .map(|contents| format!("{contents:?}"))
494                        .unwrap_or_else(|| {
495                            format!(
496                                "Verified checkpoint contents not found, digest: {:?}",
497                                verified_checkpoint.content_digest,
498                            )
499                        })
500                })
501                .map_err(|e| {
502                    format!(
503                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
504                        verified_checkpoint.content_digest, e
505                    )
506                })
507                .unwrap_or_else(|err_msg| err_msg);
508
509            let local_contents = self
510                .get_checkpoint_contents(&local_checkpoint.content_digest)
511                .map(|opt_contents| {
512                    opt_contents
513                        .map(|contents| format!("{contents:?}"))
514                        .unwrap_or_else(|| {
515                            format!(
516                                "Local checkpoint contents not found, digest: {:?}",
517                                local_checkpoint.content_digest
518                            )
519                        })
520                })
521                .map_err(|e| {
522                    format!(
523                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
524                        local_checkpoint.content_digest, e
525                    )
526                })
527                .unwrap_or_else(|err_msg| err_msg);
528
529            // checkpoint contents may be too large for panic message.
530            error!(
531                verified_checkpoint = ?verified_checkpoint.data(),
532                ?verified_contents,
533                ?local_checkpoint,
534                ?local_contents,
535                "Local checkpoint fork detected!",
536            );
537            fatal!(
538                "Local checkpoint fork detected for sequence number: {}",
539                local_checkpoint.sequence_number()
540            );
541        }
542    }
543
544    // Called by consensus (ConsensusAggregator).
545    // Different from `insert_verified_checkpoint`, it does not touch
546    // the highest_verified_checkpoint watermark such that state sync
547    // will have a chance to process this checkpoint and perform some
548    // state-sync only things.
549    pub fn insert_certified_checkpoint(
550        &self,
551        checkpoint: &VerifiedCheckpoint,
552    ) -> Result<(), TypedStoreError> {
553        debug!(
554            checkpoint_seq = checkpoint.sequence_number(),
555            "Inserting certified checkpoint",
556        );
557        let mut batch = self.tables.certified_checkpoints.batch();
558        batch
559            .insert_batch(
560                &self.tables.certified_checkpoints,
561                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
562            )?
563            .insert_batch(
564                &self.tables.checkpoint_by_digest,
565                [(checkpoint.digest(), checkpoint.serializable_ref())],
566            )?;
567        if checkpoint.next_epoch_committee().is_some() {
568            batch.insert_batch(
569                &self.tables.epoch_last_checkpoint_map,
570                [(&checkpoint.epoch(), checkpoint.sequence_number())],
571            )?;
572        }
573        batch.write()?;
574
575        if let Some(local_checkpoint) = self
576            .tables
577            .locally_computed_checkpoints
578            .get(checkpoint.sequence_number())?
579        {
580            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
581        }
582
583        Ok(())
584    }
585
586    // Called by state sync, apart from inserting the checkpoint and updating
587    // related tables, it also bumps the highest_verified_checkpoint watermark.
588    #[instrument(level = "trace", skip_all)]
589    pub fn insert_verified_checkpoint(
590        &self,
591        checkpoint: &VerifiedCheckpoint,
592    ) -> Result<(), TypedStoreError> {
593        self.insert_certified_checkpoint(checkpoint)?;
594        self.update_highest_verified_checkpoint(checkpoint)
595    }
596
597    pub fn update_highest_verified_checkpoint(
598        &self,
599        checkpoint: &VerifiedCheckpoint,
600    ) -> Result<(), TypedStoreError> {
601        if Some(*checkpoint.sequence_number())
602            > self
603                .get_highest_verified_checkpoint()?
604                .map(|x| *x.sequence_number())
605        {
606            debug!(
607                checkpoint_seq = checkpoint.sequence_number(),
608                "Updating highest verified checkpoint",
609            );
610            self.tables.watermarks.insert(
611                &CheckpointWatermark::HighestVerified,
612                &(*checkpoint.sequence_number(), *checkpoint.digest()),
613            )?;
614        }
615
616        Ok(())
617    }
618
619    pub fn update_highest_synced_checkpoint(
620        &self,
621        checkpoint: &VerifiedCheckpoint,
622    ) -> Result<(), TypedStoreError> {
623        let seq = *checkpoint.sequence_number();
624        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
625        self.tables.watermarks.insert(
626            &CheckpointWatermark::HighestSynced,
627            &(seq, *checkpoint.digest()),
628        )?;
629        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
630        Ok(())
631    }
632
633    async fn notify_read_checkpoint_watermark<F>(
634        &self,
635        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
636        seq: CheckpointSequenceNumber,
637        get_watermark: F,
638    ) -> VerifiedCheckpoint
639    where
640        F: Fn() -> Option<CheckpointSequenceNumber>,
641    {
642        type ReadResult = Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError>;
643
644        notify_read
645            .read(&[seq], |seqs| {
646                let seq = seqs[0];
647                let Some(highest) = get_watermark() else {
648                    return Ok(vec![None]) as ReadResult;
649                };
650                if highest < seq {
651                    return Ok(vec![None]) as ReadResult;
652                }
653                let checkpoint = self
654                    .get_checkpoint_by_sequence_number(seq)
655                    .expect("db error")
656                    .expect("checkpoint not found");
657                Ok(vec![Some(checkpoint)]) as ReadResult
658            })
659            .await
660            .unwrap()
661            .into_iter()
662            .next()
663            .unwrap()
664    }
665
666    pub async fn notify_read_synced_checkpoint(
667        &self,
668        seq: CheckpointSequenceNumber,
669    ) -> VerifiedCheckpoint {
670        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
671            self.get_highest_synced_checkpoint_seq_number()
672                .expect("db error")
673        })
674        .await
675    }
676
677    pub async fn notify_read_executed_checkpoint(
678        &self,
679        seq: CheckpointSequenceNumber,
680    ) -> VerifiedCheckpoint {
681        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
682            self.get_highest_executed_checkpoint_seq_number()
683                .expect("db error")
684        })
685        .await
686    }
687
688    pub fn update_highest_executed_checkpoint(
689        &self,
690        checkpoint: &VerifiedCheckpoint,
691    ) -> Result<(), TypedStoreError> {
692        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
693            if seq_number >= *checkpoint.sequence_number() {
694                return Ok(());
695            }
696            assert_eq!(
697                seq_number + 1,
698                *checkpoint.sequence_number(),
699                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
700                checkpoint.sequence_number(),
701                seq_number
702            );
703        }
704        let seq = *checkpoint.sequence_number();
705        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
706        self.tables.watermarks.insert(
707            &CheckpointWatermark::HighestExecuted,
708            &(seq, *checkpoint.digest()),
709        )?;
710        self.executed_checkpoint_notify_read
711            .notify(&seq, checkpoint);
712        Ok(())
713    }
714
715    pub fn update_highest_pruned_checkpoint(
716        &self,
717        checkpoint: &VerifiedCheckpoint,
718    ) -> Result<(), TypedStoreError> {
719        self.tables.watermarks.insert(
720            &CheckpointWatermark::HighestPruned,
721            &(*checkpoint.sequence_number(), *checkpoint.digest()),
722        )
723    }
724
725    /// Sets highest executed checkpoint to any value.
726    ///
727    /// WARNING: This method is very subtle and can corrupt the database if used
728    /// incorrectly. It should only be used in one-off cases or tests after
729    /// fully understanding the risk.
730    pub fn set_highest_executed_checkpoint_subtle(
731        &self,
732        checkpoint: &VerifiedCheckpoint,
733    ) -> Result<(), TypedStoreError> {
734        self.tables.watermarks.insert(
735            &CheckpointWatermark::HighestExecuted,
736            &(*checkpoint.sequence_number(), *checkpoint.digest()),
737        )
738    }
739
740    pub fn insert_checkpoint_contents(
741        &self,
742        contents: CheckpointContents,
743    ) -> Result<(), TypedStoreError> {
744        debug!(
745            checkpoint_seq = ?contents.digest(),
746            "Inserting checkpoint contents",
747        );
748        self.tables
749            .checkpoint_content
750            .insert(contents.digest(), &contents)
751    }
752
753    pub fn insert_verified_checkpoint_contents(
754        &self,
755        checkpoint: &VerifiedCheckpoint,
756        full_contents: VerifiedCheckpointContents,
757    ) -> Result<(), TypedStoreError> {
758        let mut batch = self.tables.full_checkpoint_content.batch();
759        batch.insert_batch(
760            &self.tables.checkpoint_sequence_by_contents_digest,
761            [(&checkpoint.content_digest, checkpoint.sequence_number())],
762        )?;
763        let full_contents = full_contents.into_inner();
764        batch.insert_batch(
765            &self.tables.full_checkpoint_content,
766            [(checkpoint.sequence_number(), &full_contents)],
767        )?;
768
769        let contents = full_contents.into_checkpoint_contents();
770        assert_eq!(&checkpoint.content_digest, contents.digest());
771
772        batch.insert_batch(
773            &self.tables.checkpoint_content,
774            [(contents.digest(), &contents)],
775        )?;
776
777        batch.write()
778    }
779
780    pub fn delete_full_checkpoint_contents(
781        &self,
782        seq: CheckpointSequenceNumber,
783    ) -> Result<(), TypedStoreError> {
784        self.tables.full_checkpoint_content.remove(&seq)
785    }
786
787    pub fn get_epoch_last_checkpoint(
788        &self,
789        epoch_id: EpochId,
790    ) -> IotaResult<Option<VerifiedCheckpoint>> {
791        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
792        let checkpoint = match seq {
793            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
794            None => None,
795        };
796        Ok(checkpoint)
797    }
798
799    pub fn insert_epoch_last_checkpoint(
800        &self,
801        epoch_id: EpochId,
802        checkpoint: &VerifiedCheckpoint,
803    ) -> IotaResult {
804        self.tables
805            .epoch_last_checkpoint_map
806            .insert(&epoch_id, checkpoint.sequence_number())?;
807        Ok(())
808    }
809
810    pub fn get_epoch_state_commitments(
811        &self,
812        epoch: EpochId,
813    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
814        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
815            checkpoint
816                .end_of_epoch_data
817                .as_ref()
818                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
819                .epoch_commitments
820                .clone()
821        });
822        Ok(commitments)
823    }
824
825    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few
826    /// statistics of the epoch.
827    pub fn get_epoch_stats(
828        &self,
829        epoch: EpochId,
830        last_checkpoint: &CheckpointSummary,
831    ) -> Option<EpochStats> {
832        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
833            (0, 0)
834        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
835            (
836                checkpoint.sequence_number + 1,
837                checkpoint.network_total_transactions,
838            )
839        } else {
840            return None;
841        };
842        Some(EpochStats {
843            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
844            transaction_count: last_checkpoint.network_total_transactions
845                - prev_epoch_network_transactions,
846            total_gas_reward: last_checkpoint
847                .epoch_rolling_gas_cost_summary
848                .computation_cost,
849        })
850    }
851
852    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
853        // This checkpoints the entire db and not one column family
854        self.tables
855            .checkpoint_content
856            .checkpoint_db(path)
857            .map_err(Into::into)
858    }
859
860    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
861        let mut wb = self.tables.watermarks.batch();
862        wb.delete_batch(
863            &self.tables.watermarks,
864            std::iter::once(CheckpointWatermark::HighestExecuted),
865        )?;
866        wb.write()?;
867        Ok(())
868    }
869
870    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
871        self.delete_highest_executed_checkpoint_test_only()?;
872        self.tables.watermarks.rocksdb.flush()?;
873        Ok(())
874    }
875
    /// TODO: this is only needed while upgrading from non-dataquarantine to
    /// dataquarantine. After that it can be deleted.
    ///
    /// Re-executes all transactions from all local, uncertified checkpoints for
    /// crash recovery. All transactions thus re-executed are guaranteed to
    /// not have any missing dependencies, because we start from the highest
    /// executed checkpoint, and proceed through checkpoints in order.
    ///
    /// The range processed is `highest_executed + 1 ..= highest_built`, where
    /// `highest_executed` falls back to `0` when no watermark is stored. The
    /// function returns early (after logging) when the latest locally computed
    /// checkpoint is absent or cannot be read.
    ///
    /// # Panics
    ///
    /// Panics if a store/cache read fails, if a locally computed checkpoint in
    /// the range or its contents is missing, or (per the comment on the
    /// enqueue call below) if re-execution yields different effects.
    // tracking issue: https://github.com/iotaledger/iota/issues/8290
    #[instrument(level = "debug", skip_all)]
    pub async fn reexecute_local_checkpoints(
        &self,
        state: &AuthorityState,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        let epoch = epoch_store.epoch();
        // No stored watermark is treated as 0, so the loop below starts at 1.
        let highest_executed = self
            .get_highest_executed_checkpoint_seq_number()
            .expect("get_highest_executed_checkpoint_seq_number should not fail")
            .unwrap_or(0);

        // Both `Err(_)` and `Ok(None)` take the early-return path here.
        let Ok(Some(highest_built)) = self.get_latest_locally_computed_checkpoint() else {
            info!("no locally built checkpoints to verify");
            return;
        };

        info!(
            "rexecuting locally computed checkpoints for crash recovery from {} to {}",
            highest_executed, highest_built
        );

        for seq in highest_executed + 1..=*highest_built.sequence_number() {
            info!(?seq, "Re-executing locally computed checkpoint");
            let Some(checkpoint) = self
                .get_locally_computed_checkpoint(seq)
                .expect("get_locally_computed_checkpoint should not fail")
            else {
                panic!("locally computed checkpoint {seq:?} not found");
            };

            let Some(contents) = self
                .get_checkpoint_contents(&checkpoint.content_digest)
                .expect("get_checkpoint_contents should not fail")
            else {
                panic!(
                    "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
                    seq, checkpoint.content_digest
                );
            };

            let cache = state.get_transaction_cache_reader();

            // Parallel vectors: transaction digests and their recorded effects
            // digests, in checkpoint-contents order.
            let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
            let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
            let txns = cache
                .try_multi_get_transaction_blocks(&tx_digests)
                .expect("try_multi_get_transaction_blocks should not fail");

            let txns: Vec<_> = itertools::izip!(txns, tx_digests, fx_digests)
                // Transactions missing from the cache are logged and skipped
                // rather than treated as an error.
                .filter_map(|(tx, digest, fx)| {
                    if let Some(tx) = tx {
                        Some((tx, fx))
                    } else {
                        info!(
                            "transaction {:?} not found during checkpoint re-execution",
                            digest
                        );
                        None
                    }
                })
                // end of epoch transaction can only be executed by CheckpointExecutor
                .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
                // Pair each remaining transaction, wrapped as an executable
                // transaction originating from checkpoint `seq`, with the
                // effects digest it is expected to produce.
                .map(|(tx, fx)| {
                    (
                        VerifiedExecutableTransaction::new_from_checkpoint(
                            (*tx).clone(),
                            epoch,
                            seq,
                        ),
                        fx,
                    )
                })
                .collect();

            // Shadows the earlier `tx_digests`: only the transactions that
            // survived the filters above are waited on.
            let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();

            info!(
                ?seq,
                ?tx_digests,
                "Re-executing transactions for locally built checkpoint"
            );
            // this will panic if any re-execution diverges from the previously recorded
            // effects digest
            state.enqueue_with_expected_effects_digest(txns, epoch_store);

            // a task that logs every so often until it is cancelled
            // This should normally finish very quickly, so seeing this log more than once
            // or twice is likely a sign of a problem.
            // NOTE(review): tokio intervals normally complete their first tick
            // immediately, so one warning is presumably emitted right away — confirm.
            let waiting_logger = tokio::task::spawn(async move {
                let mut interval = tokio::time::interval(Duration::from_secs(1));
                loop {
                    interval.tick().await;
                    warn!(?seq, "Still waiting for re-execution to complete");
                }
            });

            // Block this checkpoint's iteration until every enqueued
            // transaction has executed effects available.
            cache
                .try_notify_read_executed_effects_digests(&tx_digests)
                .await
                .expect("notify_read_executed_effects_digests should not fail");

            // Stop the logger and wait for it to wind down; the join result
            // (a cancellation error) is deliberately discarded.
            waiting_logger.abort();
            waiting_logger.await.ok();
            info!(?seq, "Re-execution completed for locally built checkpoint");
        }

        info!("Re-execution of locally built checkpoints completed");
    }
993}
994
/// Key identifying a single entry of the checkpoint store's `watermarks`
/// table (see `delete_highest_executed_checkpoint_test_only`, which deletes
/// the `HighestExecuted` entry).
///
/// NOTE(review): the type derives `Serialize`/`Deserialize` and is used as a
/// table key, so reordering or renaming variants presumably changes the
/// on-disk encoding — confirm before touching the variant list.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Presumably the highest checkpoint that has been verified — confirm
    /// against the code that writes this entry.
    HighestVerified,
    /// Presumably the highest checkpoint that has been synced — confirm
    /// against the code that writes this entry.
    HighestSynced,
    /// Watermark of the highest executed checkpoint; read by
    /// `reexecute_local_checkpoints` (through
    /// `get_highest_executed_checkpoint_seq_number`) as its starting point.
    HighestExecuted,
    /// Presumably the highest checkpoint that has been pruned — confirm
    /// against the code that writes this entry.
    HighestPruned,
}
1002
/// Builds checkpoints out of the pending checkpoints recorded in the epoch
/// store and writes them to the checkpoint store (see
/// `maybe_build_checkpoints`, `make_checkpoint` and `write_checkpoints`).
pub struct CheckpointBuilder {
    /// Authority state; used to obtain the transaction cache reader.
    state: Arc<AuthorityState>,
    /// Checkpoint store whose tables receive the built checkpoints.
    store: Arc<CheckpointStore>,
    /// Per-epoch store providing pending checkpoints, protocol config and
    /// the summary of the last built checkpoint.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Awaited by `run` between build attempts.
    notify: Arc<Notify>,
    /// Signalled (`notify_one`) after new checkpoints have been written.
    notify_aggregator: Arc<Notify>,
    /// Watch channel updated with the sequence number of the newest built
    /// checkpoint; only ever moved forward.
    last_built: watch::Sender<CheckpointSequenceNumber>,
    /// Cache used to wait for and read executed transaction effects.
    effects_store: Arc<dyn TransactionCacheRead>,
    /// Weak handle to the state accumulator.
    // NOTE(review): its use is outside the visible part of this file.
    accumulator: Weak<StateAccumulator>,
    /// Sink invoked via `checkpoint_created` for every newly written
    /// checkpoint.
    output: Box<dyn CheckpointOutput>,
    /// Metrics updated while building checkpoints.
    metrics: Arc<CheckpointMetrics>,
    /// Transaction-count limit at which `split_checkpoint_chunks` starts a
    /// new chunk.
    max_transactions_per_checkpoint: usize,
    /// Estimated-size limit (in bytes) at which `split_checkpoint_chunks`
    /// starts a new chunk.
    max_checkpoint_size_bytes: usize,
}
1017
/// State held by the checkpoint aggregator.
// NOTE(review): the methods of this type are outside the visible part of the
// file; the field notes below are based on types and names only — confirm.
pub struct CheckpointAggregator {
    /// Checkpoint store handle.
    store: Arc<CheckpointStore>,
    /// Per-epoch store handle.
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Notification handle; presumably the one `CheckpointBuilder` signals
    /// through its `notify_aggregator` field — confirm at the construction
    /// site.
    notify: Arc<Notify>,
    /// Signature aggregator for the checkpoint currently being processed, if
    /// any.
    current: Option<CheckpointSignatureAggregator>,
    /// Sink for certified checkpoints.
    output: Box<dyn CertifiedCheckpointOutput>,
    /// Authority state handle.
    state: Arc<AuthorityState>,
    /// Checkpoint metrics handle.
    metrics: Arc<CheckpointMetrics>,
}
1027
/// This holds information to aggregate signatures for one checkpoint.
// NOTE(review): the methods of this type are outside the visible part of the
// file; the field notes below are based on types and names only — confirm.
pub struct CheckpointSignatureAggregator {
    /// Presumably the index of the next signature message to consume —
    /// confirm against the aggregation loop.
    next_index: u64,
    /// Summary of the checkpoint being aggregated.
    summary: CheckpointSummary,
    /// Digest associated with `summary`.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    /// Checkpoint store handle.
    store: Arc<CheckpointStore>,
    /// Authority state handle.
    state: Arc<AuthorityState>,
    /// Checkpoint metrics handle.
    metrics: Arc<CheckpointMetrics>,
}
1039
1040impl CheckpointBuilder {
1041    fn new(
1042        state: Arc<AuthorityState>,
1043        store: Arc<CheckpointStore>,
1044        epoch_store: Arc<AuthorityPerEpochStore>,
1045        notify: Arc<Notify>,
1046        effects_store: Arc<dyn TransactionCacheRead>,
1047        accumulator: Weak<StateAccumulator>,
1048        output: Box<dyn CheckpointOutput>,
1049        notify_aggregator: Arc<Notify>,
1050        last_built: watch::Sender<CheckpointSequenceNumber>,
1051        metrics: Arc<CheckpointMetrics>,
1052        max_transactions_per_checkpoint: usize,
1053        max_checkpoint_size_bytes: usize,
1054    ) -> Self {
1055        Self {
1056            state,
1057            store,
1058            epoch_store,
1059            notify,
1060            effects_store,
1061            accumulator,
1062            output,
1063            notify_aggregator,
1064            last_built,
1065            metrics,
1066            max_transactions_per_checkpoint,
1067            max_checkpoint_size_bytes,
1068        }
1069    }
1070
    /// Runs the `CheckpointBuilder` in an asynchronous loop, managing the
    /// creation of checkpoints.
    ///
    /// Each iteration first tries to build checkpoints from whatever is
    /// already pending and only then waits on `self.notify`, so work that is
    /// available at start-up is processed without needing a notification.
    /// The loop has no exit condition of its own; the future only stops when
    /// it is dropped or cancelled by its owner.
    async fn run(mut self) {
        info!("Starting CheckpointBuilder");
        loop {
            self.maybe_build_checkpoints().await;

            // Sleep until someone signals that there may be new work.
            self.notify.notified().await;
        }
    }
1081
1082    async fn maybe_build_checkpoints(&mut self) {
1083        let _scope = monitored_scope("BuildCheckpoints");
1084
1085        // Collect info about the most recently built checkpoint.
1086        let summary = self
1087            .epoch_store
1088            .last_built_checkpoint_builder_summary()
1089            .expect("epoch should not have ended");
1090        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
1091        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);
1092
1093        let min_checkpoint_interval_ms = self
1094            .epoch_store
1095            .protocol_config()
1096            .min_checkpoint_interval_ms_as_option()
1097            .unwrap_or_default();
1098        let mut grouped_pending_checkpoints = Vec::new();
1099        let mut checkpoints_iter = self
1100            .epoch_store
1101            .get_pending_checkpoints(last_height)
1102            .expect("unexpected epoch store error")
1103            .into_iter()
1104            .peekable();
1105        while let Some((height, pending)) = checkpoints_iter.next() {
1106            // Group PendingCheckpoints until:
1107            // - minimum interval has elapsed ...
1108            let current_timestamp = pending.details().timestamp_ms;
1109            let can_build = match last_timestamp {
1110                    Some(last_timestamp) => {
1111                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
1112                    }
1113                    None => true,
1114                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
1115                //   should be written separately) ...
1116                } || checkpoints_iter
1117                    .peek()
1118                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
1119                // - or, we have reached end of epoch.
1120                    || pending.details().last_of_epoch;
1121            grouped_pending_checkpoints.push(pending);
1122            if !can_build {
1123                debug!(
1124                    checkpoint_commit_height = height,
1125                    ?last_timestamp,
1126                    ?current_timestamp,
1127                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
1128                );
1129                continue;
1130            }
1131
1132            // Min interval has elapsed, we can now coalesce and build a checkpoint.
1133            last_height = Some(height);
1134            last_timestamp = Some(current_timestamp);
1135            debug!(
1136                checkpoint_commit_height = height,
1137                "Making checkpoint at commit height"
1138            );
1139
1140            match self
1141                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
1142                .await
1143            {
1144                Ok(seq) => {
1145                    self.last_built.send_if_modified(|cur| {
1146                        // when rebuilding checkpoints at startup, seq can be for an old checkpoint
1147                        if seq > *cur {
1148                            *cur = seq;
1149                            true
1150                        } else {
1151                            false
1152                        }
1153                    });
1154                }
1155                Err(e) => {
1156                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
1157                    tokio::time::sleep(Duration::from_secs(1)).await;
1158                    self.metrics.checkpoint_errors.inc();
1159                    return;
1160                }
1161            }
1162            // Ensure that the task can be cancelled at end of epoch, even if no other await
1163            // yields execution.
1164            tokio::task::yield_now().await;
1165        }
1166        debug!(
1167            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
1168            grouped_pending_checkpoints.len(),
1169        );
1170    }
1171
1172    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
1173    async fn make_checkpoint(
1174        &self,
1175        pendings: Vec<PendingCheckpoint>,
1176    ) -> anyhow::Result<CheckpointSequenceNumber> {
1177        let last_details = pendings.last().unwrap().details().clone();
1178
1179        // Keeps track of the effects that are already included in the current
1180        // checkpoint. This is used when there are multiple pending checkpoints
1181        // to create a single checkpoint because in such scenarios, dependencies
1182        // of a transaction may in earlier created checkpoints, or in earlier
1183        // pending checkpoints.
1184        let mut effects_in_current_checkpoint = BTreeSet::new();
1185
1186        // Stores the transactions that should be included in the checkpoint.
1187        // Transactions will be recorded in the checkpoint in this order.
1188        let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
1189        for pending_checkpoint in pendings.into_iter() {
1190            let pending = pending_checkpoint.into_v1();
1191            let txn_in_checkpoint = self
1192                .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
1193                .await?;
1194            sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
1195        }
1196        let new_checkpoints = self
1197            .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
1198            .await?;
1199        let highest_sequence = *new_checkpoints.last().0.sequence_number();
1200        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
1201            .await?;
1202        Ok(highest_sequence)
1203    }
1204
    // Given the root transactions of a pending checkpoint, resolve the
    // transactions that should be included in the checkpoint, and return their
    // effects in the order they should be included in the checkpoint.
    // `effects_in_current_checkpoint` tracks the transactions that already
    // exist in the current checkpoint; it is passed mutably to
    // `complete_checkpoint_effects`.
    //
    // The returned vector starts with the consensus commit prologue (if the
    // roots contain one), followed by the remaining effects in causal order.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        roots: Vec<TransactionKey>,
        effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        self.metrics
            .checkpoint_roots_count
            .inc_by(roots.len() as u64);

        // Wait until every root has an executed digest, then until the
        // effects for those digests are available.
        let root_digests = self
            .epoch_store
            .notify_read_executed_digests(&roots)
            .in_monitored_scope("CheckpointNotifyDigests")
            .await?;
        let root_effects = self
            .effects_store
            .try_notify_read_executed_effects(&root_digests)
            .in_monitored_scope("CheckpointNotifyRead")
            .await?;

        let _scope = monitored_scope("CheckpointBuilder");

        let consensus_commit_prologue = {
            // If the roots contains consensus commit prologue transaction, we want to
            // extract it, and put it to the front of the checkpoint.

            let consensus_commit_prologue = self
                .extract_consensus_commit_prologue(&root_digests, &root_effects)
                .await?;

            // Get the un-included dependencies of the consensus commit prologue. We should
            // expect no other dependencies that haven't been included in any
            // previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been
                // included in any previous checkpoint.
                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                // NOTE(review): the length assertion repeats the check just
                // above; it only matters if `fatal!` can return — confirm.
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }
            consensus_commit_prologue
        };

        // Complete the full set of root effects. This runs after the prologue
        // has been passed through `complete_checkpoint_effects` with the same
        // mutable set; the debug assertion below expects the prologue not to
        // appear in `unsorted` again.
        let unsorted =
            self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;

        // Shadowing does not drop the previous `_scope`; both guards live
        // until the function returns.
        let _scope = monitored_scope("CheckpointBuilder::causal_sort");
        // `+ 1` leaves room for the prologue pushed in front.
        let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
        if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
            #[cfg(debug_assertions)]
            {
                // When consensus_commit_prologue is extracted, it should not be included in the
                // `unsorted`.
                for tx in unsorted.iter() {
                    assert!(tx.transaction_digest() != &_ccp_digest);
                }
            }
            sorted.push(ccp_effects);
        }
        sorted.extend(CausalOrder::causal_sort(unsorted));

        #[cfg(msim)]
        {
            // Check consensus commit prologue invariants in sim test.
            self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
        }

        Ok(sorted)
    }
1292
1293    // This function is used to extract the consensus commit prologue digest and
1294    // effects from the root transactions.
1295    // The consensus commit prologue is expected to be the first transaction in the
1296    // roots.
1297    async fn extract_consensus_commit_prologue(
1298        &self,
1299        root_digests: &[TransactionDigest],
1300        root_effects: &[TransactionEffects],
1301    ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
1302        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
1303        if root_digests.is_empty() {
1304            return Ok(None);
1305        }
1306
1307        // Reads the first transaction in the roots, and checks whether it is a
1308        // consensus commit prologue transaction.
1309        // The consensus commit prologue transaction should be the first transaction
1310        // in the roots written by the consensus handler.
1311        let first_tx = self
1312            .state
1313            .get_transaction_cache_reader()
1314            .try_get_transaction_block(&root_digests[0])?
1315            .expect("Transaction block must exist");
1316
1317        Ok(match first_tx.transaction_data().kind() {
1318            TransactionKind::ConsensusCommitPrologueV1(_) => {
1319                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
1320                Some((*first_tx.digest(), root_effects[0].clone()))
1321            }
1322            _ => None,
1323        })
1324    }
1325
1326    /// Writes the new checkpoints to the DB storage and processes them.
1327    #[instrument(level = "debug", skip_all)]
1328    async fn write_checkpoints(
1329        &self,
1330        height: CheckpointHeight,
1331        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
1332    ) -> IotaResult {
1333        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
1334        let mut batch = self.store.tables.checkpoint_content.batch();
1335        let mut all_tx_digests =
1336            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());
1337
1338        for (summary, contents) in &new_checkpoints {
1339            debug!(
1340                checkpoint_commit_height = height,
1341                checkpoint_seq = summary.sequence_number,
1342                contents_digest = ?contents.digest(),
1343                "writing checkpoint",
1344            );
1345
1346            if let Some(previously_computed_summary) = self
1347                .store
1348                .tables
1349                .locally_computed_checkpoints
1350                .get(&summary.sequence_number)?
1351            {
1352                if previously_computed_summary != *summary {
1353                    // Panic so that we don't send out an equivocating checkpoint sig.
1354                    fatal!(
1355                        "Checkpoint {} was previously built with a different result: {previously_computed_summary:?} vs {summary:?}",
1356                        summary.sequence_number,
1357                    );
1358                }
1359            }
1360
1361            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));
1362
1363            self.metrics
1364                .transactions_included_in_checkpoint
1365                .inc_by(contents.size() as u64);
1366            let sequence_number = summary.sequence_number;
1367            self.metrics
1368                .last_constructed_checkpoint
1369                .set(sequence_number as i64);
1370
1371            batch.insert_batch(
1372                &self.store.tables.checkpoint_content,
1373                [(contents.digest(), contents)],
1374            )?;
1375
1376            batch.insert_batch(
1377                &self.store.tables.locally_computed_checkpoints,
1378                [(sequence_number, summary)],
1379            )?;
1380        }
1381
1382        batch.write()?;
1383
1384        // Send all checkpoint sigs to consensus.
1385        for (summary, contents) in &new_checkpoints {
1386            self.output
1387                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
1388                .await?;
1389        }
1390
1391        for (local_checkpoint, _) in &new_checkpoints {
1392            if let Some(certified_checkpoint) = self
1393                .store
1394                .tables
1395                .certified_checkpoints
1396                .get(local_checkpoint.sequence_number())?
1397            {
1398                self.store
1399                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
1400            }
1401        }
1402
1403        self.notify_aggregator.notify_one();
1404        self.epoch_store
1405            .process_constructed_checkpoint(height, new_checkpoints);
1406        Ok(())
1407    }
1408
1409    #[expect(clippy::type_complexity)]
1410    fn split_checkpoint_chunks(
1411        &self,
1412        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1413        signatures: Vec<Vec<GenericSignature>>,
1414    ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1415        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1416        let mut chunks = Vec::new();
1417        let mut chunk = Vec::new();
1418        let mut chunk_size: usize = 0;
1419        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1420            .into_iter()
1421            .zip(signatures.into_iter())
1422        {
1423            // Roll over to a new chunk after either max count or max size is reached.
1424            // The size calculation here is intended to estimate the size of the
1425            // FullCheckpointContents struct. If this code is modified, that struct
1426            // should also be updated accordingly.
1427            let size = transaction_size
1428                + bcs::serialized_size(&effects)?
1429                + bcs::serialized_size(&signatures)?;
1430            if chunk.len() == self.max_transactions_per_checkpoint
1431                || (chunk_size + size) > self.max_checkpoint_size_bytes
1432            {
1433                if chunk.is_empty() {
1434                    // Always allow at least one tx in a checkpoint.
1435                    warn!(
1436                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1437                        self.max_checkpoint_size_bytes
1438                    );
1439                } else {
1440                    chunks.push(chunk);
1441                    chunk = Vec::new();
1442                    chunk_size = 0;
1443                }
1444            }
1445
1446            chunk.push((effects, signatures));
1447            chunk_size += size;
1448        }
1449
1450        if !chunk.is_empty() || chunks.is_empty() {
1451            // We intentionally create an empty checkpoint if there is no content provided
1452            // to make a 'heartbeat' checkpoint.
1453            // Important: if some conditions are added here later, we need to make sure we
1454            // always have at least one chunk if last_pending_of_epoch is set
1455            chunks.push(chunk);
1456            // Note: empty checkpoints are ok - they shouldn't happen at all on
1457            // a network with even modest load. Even if they do
1458            // happen, it is still useful as it allows fullnodes to
1459            // distinguish between "no transactions have happened" and "i am not
1460            // receiving new checkpoints".
1461        }
1462        Ok(chunks)
1463    }
1464
1465    /// Creates checkpoints using the provided transaction effects and pending
1466    /// checkpoint information.
1467    fn load_last_built_checkpoint_summary(
1468        epoch_store: &AuthorityPerEpochStore,
1469        store: &CheckpointStore,
1470    ) -> IotaResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1471        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1472        if last_checkpoint.is_none() {
1473            let epoch = epoch_store.epoch();
1474            if epoch > 0 {
1475                let previous_epoch = epoch - 1;
1476                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1477                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1478                if let Some((ref seq, _)) = last_checkpoint {
1479                    debug!(
1480                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1481                    );
1482                } else {
1483                    // This is some serious bug with when CheckpointBuilder started so surfacing it
1484                    // via panic
1485                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
1486                }
1487            }
1488        }
1489        Ok(last_checkpoint)
1490    }
1491
1492    #[instrument(level = "debug", skip_all)]
1493    async fn create_checkpoints(
1494        &self,
1495        all_effects: Vec<TransactionEffects>,
1496        details: &PendingCheckpointInfo,
1497    ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1498        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1499        let total = all_effects.len();
1500        let mut last_checkpoint =
1501            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1502        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1503        debug!(
1504            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1505            checkpoint_timestamp = details.timestamp_ms,
1506            "Creating checkpoint(s) for {} transactions",
1507            all_effects.len(),
1508        );
1509
1510        let all_digests: Vec<_> = all_effects
1511            .iter()
1512            .map(|effect| *effect.transaction_digest())
1513            .collect();
1514        let transactions_and_sizes = self
1515            .state
1516            .get_transaction_cache_reader()
1517            .try_get_transactions_and_serialized_sizes(&all_digests)?;
1518        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1519        let mut transactions = Vec::with_capacity(all_effects.len());
1520        let mut transaction_keys = Vec::with_capacity(all_effects.len());
1521        let mut randomness_rounds = BTreeMap::new();
1522        {
1523            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1524            debug!(
1525                ?last_checkpoint_seq,
1526                "Waiting for {:?} certificates to appear in consensus",
1527                all_effects.len()
1528            );
1529
1530            for (effects, transaction_and_size) in all_effects
1531                .into_iter()
1532                .zip(transactions_and_sizes.into_iter())
1533            {
1534                let (transaction, size) = transaction_and_size
1535                    .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
1536                match transaction.inner().transaction_data().kind() {
1537                    TransactionKind::ConsensusCommitPrologueV1(_)
1538                    | TransactionKind::AuthenticatorStateUpdateV1(_) => {
1539                        // ConsensusCommitPrologue and
1540                        // AuthenticatorStateUpdateV1
1541                        // are guaranteed to be
1542                        // processed before we reach here.
1543                    }
1544                    TransactionKind::RandomnessStateUpdate(rsu) => {
1545                        randomness_rounds
1546                            .insert(*effects.transaction_digest(), rsu.randomness_round);
1547                    }
1548                    _ => {
1549                        // All other tx should be included in the call to
1550                        // `consensus_messages_processed_notify`.
1551                        transaction_keys.push(SequencedConsensusTransactionKey::External(
1552                            ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
1553                        ));
1554                    }
1555                }
1556                transactions.push(transaction);
1557                all_effects_and_transaction_sizes.push((effects, size));
1558            }
1559
1560            self.epoch_store
1561                .consensus_messages_processed_notify(transaction_keys)
1562                .await?;
1563        }
1564
1565        let signatures = self
1566            .epoch_store
1567            .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1568        debug!(
1569            ?last_checkpoint_seq,
1570            "Received {} checkpoint user signatures from consensus",
1571            signatures.len()
1572        );
1573
1574        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1575        let chunks_count = chunks.len();
1576
1577        let mut checkpoints = Vec::with_capacity(chunks_count);
1578        debug!(
1579            ?last_checkpoint_seq,
1580            "Creating {} checkpoints with {} transactions", chunks_count, total,
1581        );
1582
1583        let epoch = self.epoch_store.epoch();
1584        for (index, transactions) in chunks.into_iter().enumerate() {
1585            let first_checkpoint_of_epoch = index == 0
1586                && last_checkpoint
1587                    .as_ref()
1588                    .map(|(_, c)| c.epoch != epoch)
1589                    .unwrap_or(true);
1590            if first_checkpoint_of_epoch {
1591                self.epoch_store
1592                    .record_epoch_first_checkpoint_creation_time_metric();
1593            }
1594            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1595
1596            let sequence_number = last_checkpoint
1597                .as_ref()
1598                .map(|(_, c)| c.sequence_number + 1)
1599                .unwrap_or_default();
1600            let mut timestamp_ms = details.timestamp_ms;
1601            if let Some((_, last_checkpoint)) = &last_checkpoint {
1602                if last_checkpoint.timestamp_ms > timestamp_ms {
1603                    // The first consensus commit of an epoch can have zero timestamp.
1604                    debug!(
1605                        "Decrease of checkpoint timestamp, possibly due to epoch change. Sequence: {}, previous: {}, current: {}",
1606                        sequence_number, last_checkpoint.timestamp_ms, timestamp_ms,
1607                    );
1608                    if self
1609                        .epoch_store
1610                        .protocol_config()
1611                        .consensus_median_timestamp_with_checkpoint_enforcement()
1612                    {
1613                        timestamp_ms = last_checkpoint.timestamp_ms;
1614                    }
1615                }
1616            }
1617
1618            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1619            let epoch_rolling_gas_cost_summary =
1620                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1621
1622            let end_of_epoch_data = if last_checkpoint_of_epoch {
1623                let (system_state_obj, system_epoch_info_event) = self
1624                    .augment_epoch_last_checkpoint(
1625                        &epoch_rolling_gas_cost_summary,
1626                        timestamp_ms,
1627                        &mut effects,
1628                        &mut signatures,
1629                        sequence_number,
1630                    )
1631                    .await?;
1632
1633                // The system epoch info event can be `None` in case if the `advance_epoch`
1634                // Move function call failed and was executed in the safe mode.
1635                // In this case, the tokens supply should be unchanged.
1636                //
1637                // SAFETY: The number of minted and burnt tokens easily fit into an i64 and due
1638                // to those small numbers, no overflows will occur during conversion or
1639                // subtraction.
1640                let epoch_supply_change =
1641                    system_epoch_info_event.map_or(0, |event| event.supply_change());
1642
1643                let committee = system_state_obj
1644                    .get_current_epoch_committee()
1645                    .committee()
1646                    .clone();
1647
1648                // This must happen after the call to augment_epoch_last_checkpoint,
1649                // otherwise we will not capture the change_epoch tx.
1650                let root_state_digest = {
1651                    let state_acc = self
1652                        .accumulator
1653                        .upgrade()
1654                        .expect("No checkpoints should be getting built after local configuration");
1655                    let acc = state_acc.accumulate_checkpoint(
1656                        &effects,
1657                        sequence_number,
1658                        &self.epoch_store,
1659                    )?;
1660
1661                    state_acc
1662                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
1663                        .await?;
1664
1665                    state_acc.accumulate_running_root(
1666                        &self.epoch_store,
1667                        sequence_number,
1668                        Some(acc),
1669                    )?;
1670                    state_acc
1671                        .digest_epoch(self.epoch_store.clone(), sequence_number)
1672                        .await?
1673                };
1674                self.metrics.highest_accumulated_epoch.set(epoch as i64);
1675                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1676
1677                let epoch_commitments = vec![root_state_digest.into()];
1678
1679                Some(EndOfEpochData {
1680                    next_epoch_committee: committee.voting_rights,
1681                    next_epoch_protocol_version: ProtocolVersion::new(
1682                        system_state_obj.protocol_version(),
1683                    ),
1684                    epoch_commitments,
1685                    epoch_supply_change,
1686                })
1687            } else {
1688                None
1689            };
1690            let contents = CheckpointContents::new_with_digests_and_signatures(
1691                effects.iter().map(TransactionEffects::execution_digests),
1692                signatures,
1693            );
1694
1695            let num_txns = contents.size() as u64;
1696
1697            let network_total_transactions = last_checkpoint
1698                .as_ref()
1699                .map(|(_, c)| c.network_total_transactions + num_txns)
1700                .unwrap_or(num_txns);
1701
1702            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1703
1704            let matching_randomness_rounds: Vec<_> = effects
1705                .iter()
1706                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1707                .copied()
1708                .collect();
1709
1710            let summary = CheckpointSummary::new(
1711                self.epoch_store.protocol_config(),
1712                epoch,
1713                sequence_number,
1714                network_total_transactions,
1715                &contents,
1716                previous_digest,
1717                epoch_rolling_gas_cost_summary,
1718                end_of_epoch_data,
1719                timestamp_ms,
1720                matching_randomness_rounds,
1721            );
1722            summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1723            if last_checkpoint_of_epoch {
1724                info!(
1725                    checkpoint_seq = sequence_number,
1726                    "creating last checkpoint of epoch {}", epoch
1727                );
1728                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
1729                    self.epoch_store
1730                        .report_epoch_metrics_at_last_checkpoint(stats);
1731                }
1732            }
1733            last_checkpoint = Some((sequence_number, summary.clone()));
1734            checkpoints.push((summary, contents));
1735        }
1736
1737        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1738    }
1739
1740    fn get_epoch_total_gas_cost(
1741        &self,
1742        last_checkpoint: Option<&CheckpointSummary>,
1743        cur_checkpoint_effects: &[TransactionEffects],
1744    ) -> GasCostSummary {
1745        let (previous_epoch, previous_gas_costs) = last_checkpoint
1746            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1747            .unwrap_or_default();
1748        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1749        if previous_epoch == self.epoch_store.epoch() {
1750            // sum only when we are within the same epoch
1751            GasCostSummary::new(
1752                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1753                previous_gas_costs.computation_cost_burned
1754                    + current_gas_costs.computation_cost_burned,
1755                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1756                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1757                previous_gas_costs.non_refundable_storage_fee
1758                    + current_gas_costs.non_refundable_storage_fee,
1759            )
1760        } else {
1761            current_gas_costs
1762        }
1763    }
1764
1765    /// Augments the last checkpoint of the epoch by creating and executing an
1766    /// advance epoch transaction.
1767    #[instrument(level = "error", skip_all)]
1768    async fn augment_epoch_last_checkpoint(
1769        &self,
1770        epoch_total_gas_cost: &GasCostSummary,
1771        epoch_start_timestamp_ms: CheckpointTimestamp,
1772        checkpoint_effects: &mut Vec<TransactionEffects>,
1773        signatures: &mut Vec<Vec<GenericSignature>>,
1774        checkpoint: CheckpointSequenceNumber,
1775    ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1776        let (system_state, system_epoch_info_event, effects) = self
1777            .state
1778            .create_and_execute_advance_epoch_tx(
1779                &self.epoch_store,
1780                epoch_total_gas_cost,
1781                checkpoint,
1782                epoch_start_timestamp_ms,
1783            )
1784            .await?;
1785        checkpoint_effects.push(effects);
1786        signatures.push(vec![]);
1787        Ok((system_state, system_epoch_info_event))
1788    }
1789
1790    /// For the given roots return complete list of effects to include in
1791    /// checkpoint This list includes the roots and all their dependencies,
1792    /// which are not part of checkpoint already. Note that this function
1793    /// may be called multiple times to construct the checkpoint.
1794    /// `existing_tx_digests_in_checkpoint` is used to track the transactions
1795    /// that are already included in the checkpoint. Txs in `roots` that
1796    /// need to be included in the checkpoint will be added to
1797    /// `existing_tx_digests_in_checkpoint` after the call of this function.
1798    #[instrument(level = "debug", skip_all)]
1799    fn complete_checkpoint_effects(
1800        &self,
1801        mut roots: Vec<TransactionEffects>,
1802        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
1803    ) -> IotaResult<Vec<TransactionEffects>> {
1804        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
1805        let mut results = vec![];
1806        let mut seen = HashSet::new();
1807        loop {
1808            let mut pending = HashSet::new();
1809
1810            let transactions_included = self
1811                .epoch_store
1812                .builder_included_transactions_in_checkpoint(
1813                    roots.iter().map(|e| e.transaction_digest()),
1814                )?;
1815
1816            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
1817                let digest = effect.transaction_digest();
1818                // Unnecessary to read effects of a dependency if the effect is already
1819                // processed.
1820                seen.insert(*digest);
1821
1822                // Skip roots that are already included in the checkpoint.
1823                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
1824                    continue;
1825                }
1826
1827                // Skip roots already included in checkpoints or roots from previous epochs
1828                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
1829                    continue;
1830                }
1831
1832                let existing_effects = self
1833                    .epoch_store
1834                    .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;
1835
1836                for (dependency, effects_signature_exists) in
1837                    effect.dependencies().iter().zip(existing_effects.iter())
1838                {
1839                    // Skip here if dependency not executed in the current epoch.
1840                    // Note that the existence of an effects signature in the
1841                    // epoch store for the given digest indicates that the transaction
1842                    // was locally executed in the current epoch
1843                    if !effects_signature_exists {
1844                        continue;
1845                    }
1846                    if seen.insert(*dependency) {
1847                        pending.insert(*dependency);
1848                    }
1849                }
1850                results.push(effect);
1851            }
1852            if pending.is_empty() {
1853                break;
1854            }
1855            let pending = pending.into_iter().collect::<Vec<_>>();
1856            let effects = self
1857                .effects_store
1858                .try_multi_get_executed_effects(&pending)?;
1859            let effects = effects
1860                .into_iter()
1861                .zip(pending)
1862                .map(|(opt, digest)| match opt {
1863                    Some(x) => x,
1864                    None => panic!(
1865                        "Can not find effect for transaction {digest:?}, however transaction that depend on it was already executed"
1866                    ),
1867                })
1868                .collect::<Vec<_>>();
1869            roots = effects;
1870        }
1871
1872        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
1873        Ok(results)
1874    }
1875
// Simtest-only check of the consensus commit prologue invariants of a
// checkpoint:
//   * the roots contain at most one consensus commit prologue transaction;
//   * if the roots contain none, the checkpoint contains none either;
//   * if the roots contain one, it is the first transaction of the
//     checkpoint and no other prologue follows it.
// Any violation trips an assertion.
#[cfg(msim)]
fn expensive_consensus_commit_prologue_invariants_check(
    &self,
    root_digests: &[TransactionDigest],
    sorted: &[TransactionEffects],
) {
    // Consensus commit prologue transactions found among the roots.
    let root_txs = self
        .state
        .get_transaction_cache_reader()
        .multi_get_transaction_blocks(root_digests);
    let ccps: Vec<_> = root_txs
        .iter()
        .flatten()
        .filter(|tx| {
            matches!(
                tx.transaction_data().kind(),
                TransactionKind::ConsensusCommitPrologueV1(_)
            )
        })
        .collect();

    assert!(ccps.len() <= 1);

    // Every transaction of the checkpoint, in checkpoint order.
    let sorted_digests: Vec<_> = sorted.iter().map(|tx| *tx.transaction_digest()).collect();
    let txs = self
        .state
        .get_transaction_cache_reader()
        .multi_get_transaction_blocks(&sorted_digests);

    // Index of the first checkpoint transaction that must NOT be a prologue.
    let prologue_free_from = match ccps.first() {
        None => 0,
        Some(root_prologue) => {
            // The single prologue has to lead the checkpoint.
            assert!(matches!(
                txs[0].as_ref().unwrap().transaction_data().kind(),
                TransactionKind::ConsensusCommitPrologueV1(_)
            ));
            assert_eq!(root_prologue.digest(), txs[0].as_ref().unwrap().digest());
            1
        }
    };

    for tx in txs.iter().skip(prologue_free_from).flatten() {
        assert!(!matches!(
            tx.transaction_data().kind(),
            TransactionKind::ConsensusCommitPrologueV1(_)
        ));
    }
}
1944}
1945
1946impl CheckpointAggregator {
1947    fn new(
1948        tables: Arc<CheckpointStore>,
1949        epoch_store: Arc<AuthorityPerEpochStore>,
1950        notify: Arc<Notify>,
1951        output: Box<dyn CertifiedCheckpointOutput>,
1952        state: Arc<AuthorityState>,
1953        metrics: Arc<CheckpointMetrics>,
1954    ) -> Self {
1955        let current = None;
1956        Self {
1957            store: tables,
1958            epoch_store,
1959            notify,
1960            current,
1961            output,
1962            state,
1963            metrics,
1964        }
1965    }
1966
1967    /// Runs the `CheckpointAggregator` in an asynchronous loop, managing the
1968    /// aggregation of checkpoints.
1969    /// The function ensures continuous aggregation of checkpoints, handling
1970    /// errors and retries gracefully, and allowing for proper shutdown on
1971    /// receiving an exit signal.
1972    async fn run(mut self) {
1973        info!("Starting CheckpointAggregator");
1974        loop {
1975            if let Err(e) = self.run_and_notify().await {
1976                error!(
1977                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
1978                    e
1979                );
1980                self.metrics.checkpoint_errors.inc();
1981                tokio::time::sleep(Duration::from_secs(1)).await;
1982                continue;
1983            }
1984
1985            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
1986        }
1987    }
1988
1989    async fn run_and_notify(&mut self) -> IotaResult {
1990        let summaries = self.run_inner()?;
1991        for summary in summaries {
1992            self.output.certified_checkpoint_created(&summary).await?;
1993        }
1994        Ok(())
1995    }
1996
1997    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
1998        let _scope = monitored_scope("CheckpointAggregator");
1999        let mut result = vec![];
2000        'outer: loop {
2001            let next_to_certify = self.next_checkpoint_to_certify()?;
2002            let current = if let Some(current) = &mut self.current {
2003                // It's possible that the checkpoint was already certified by
2004                // the rest of the network and we've already received the
2005                // certified checkpoint via StateSync. In this case, we reset
2006                // the current signature aggregator to the next checkpoint to
2007                // be certified
2008                if current.summary.sequence_number < next_to_certify {
2009                    self.current = None;
2010                    continue;
2011                }
2012                current
2013            } else {
2014                let Some(summary) = self
2015                    .epoch_store
2016                    .get_built_checkpoint_summary(next_to_certify)?
2017                else {
2018                    return Ok(result);
2019                };
2020                self.current = Some(CheckpointSignatureAggregator {
2021                    next_index: 0,
2022                    digest: summary.digest(),
2023                    summary,
2024                    signatures_by_digest: MultiStakeAggregator::new(
2025                        self.epoch_store.committee().clone(),
2026                    ),
2027                    store: self.store.clone(),
2028                    state: self.state.clone(),
2029                    metrics: self.metrics.clone(),
2030                });
2031                self.current.as_mut().unwrap()
2032            };
2033
2034            let epoch_tables = self
2035                .epoch_store
2036                .tables()
2037                .expect("should not run past end of epoch");
2038            let iter = epoch_tables
2039                .pending_checkpoint_signatures
2040                .safe_iter_with_bounds(
2041                    Some((current.summary.sequence_number, current.next_index)),
2042                    None,
2043                );
2044            for item in iter {
2045                let ((seq, index), data) = item?;
2046                if seq != current.summary.sequence_number {
2047                    trace!(
2048                        checkpoint_seq =? current.summary.sequence_number,
2049                        "Not enough checkpoint signatures",
2050                    );
2051                    // No more signatures (yet) for this checkpoint
2052                    return Ok(result);
2053                }
2054                trace!(
2055                    checkpoint_seq = current.summary.sequence_number,
2056                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
2057                    current.summary.digest(),
2058                    data.summary.auth_sig().authority.concise()
2059                );
2060                self.metrics
2061                    .checkpoint_participation
2062                    .with_label_values(&[&format!(
2063                        "{:?}",
2064                        data.summary.auth_sig().authority.concise()
2065                    )])
2066                    .inc();
2067                if let Ok(auth_signature) = current.try_aggregate(data) {
2068                    debug!(
2069                        checkpoint_seq = current.summary.sequence_number,
2070                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
2071                        current.summary.digest(),
2072                    );
2073                    let summary = VerifiedCheckpoint::new_unchecked(
2074                        CertifiedCheckpointSummary::new_from_data_and_sig(
2075                            current.summary.clone(),
2076                            auth_signature,
2077                        ),
2078                    );
2079
2080                    self.store.insert_certified_checkpoint(&summary)?;
2081                    self.metrics
2082                        .last_certified_checkpoint
2083                        .set(current.summary.sequence_number as i64);
2084                    current
2085                        .summary
2086                        .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
2087                    result.push(summary.into_inner());
2088                    self.current = None;
2089                    continue 'outer;
2090                } else {
2091                    current.next_index = index + 1;
2092                }
2093            }
2094            break;
2095        }
2096        Ok(result)
2097    }
2098
2099    fn next_checkpoint_to_certify(&self) -> IotaResult<CheckpointSequenceNumber> {
2100        Ok(self
2101            .store
2102            .tables
2103            .certified_checkpoints
2104            .reversed_safe_iter_with_bounds(None, None)?
2105            .next()
2106            .transpose()?
2107            .map(|(seq, _)| seq + 1)
2108            .unwrap_or_default())
2109    }
2110}
2111
2112impl CheckpointSignatureAggregator {
2113    #[expect(clippy::result_unit_err)]
2114    pub fn try_aggregate(
2115        &mut self,
2116        data: CheckpointSignatureMessage,
2117    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2118        let their_digest = *data.summary.digest();
2119        let (_, signature) = data.summary.into_data_and_sig();
2120        let author = signature.authority;
2121        let envelope =
2122            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2123        match self.signatures_by_digest.insert(their_digest, envelope) {
2124            // ignore repeated signatures
2125            InsertResult::Failed {
2126                error:
2127                    IotaError::StakeAggregatorRepeatedSigner {
2128                        conflicting_sig: false,
2129                        ..
2130                    },
2131            } => Err(()),
2132            InsertResult::Failed { error } => {
2133                warn!(
2134                    checkpoint_seq = self.summary.sequence_number,
2135                    "Failed to aggregate new signature from validator {:?}: {:?}",
2136                    author.concise(),
2137                    error
2138                );
2139                self.check_for_split_brain();
2140                Err(())
2141            }
2142            InsertResult::QuorumReached(cert) => {
2143                // It is not guaranteed that signature.authority == consensus_cert.author, but
2144                // we do verify the signature so we know that the author signed
2145                // the message at some point.
2146                if their_digest != self.digest {
2147                    self.metrics.remote_checkpoint_forks.inc();
2148                    warn!(
2149                        checkpoint_seq = self.summary.sequence_number,
2150                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2151                        author.concise(),
2152                        their_digest,
2153                        self.digest
2154                    );
2155                    return Err(());
2156                }
2157                Ok(cert)
2158            }
2159            InsertResult::NotEnoughVotes {
2160                bad_votes: _,
2161                bad_authorities: _,
2162            } => {
2163                self.check_for_split_brain();
2164                Err(())
2165            }
2166        }
2167    }
2168
2169    /// Check if there is a split brain condition in checkpoint signature
2170    /// aggregation, defined as any state wherein it is no longer possible
2171    /// to achieve quorum on a checkpoint proposal, irrespective of the
2172    /// outcome of any outstanding votes.
2173    fn check_for_split_brain(&self) {
2174        debug!(
2175            checkpoint_seq = self.summary.sequence_number,
2176            "Checking for split brain condition"
2177        );
2178        if self.signatures_by_digest.quorum_unreachable() {
2179            // TODO: at this point we should immediately halt processing
2180            // of new transaction certificates to avoid building on top of
2181            // forked output
2182            // self.halt_all_execution();
2183
2184            let digests_by_stake_messages = self
2185                .signatures_by_digest
2186                .get_all_unique_values()
2187                .into_iter()
2188                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2189                .map(|(digest, (_authorities, total_stake))| {
2190                    format!("{digest:?} (total stake: {total_stake})")
2191                })
2192                .collect::<Vec<String>>();
2193            error!(
2194                checkpoint_seq = self.summary.sequence_number,
2195                "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
2196                self.signatures_by_digest.uncommitted_stake(),
2197                digests_by_stake_messages,
2198            );
2199            self.metrics.split_brain_checkpoint_forks.inc();
2200
2201            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2202            let local_summary = self.summary.clone();
2203            let state = self.state.clone();
2204            let tables = self.store.clone();
2205
2206            tokio::spawn(async move {
2207                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2208            });
2209        }
2210    }
2211}
2212
2213/// Create data dump containing relevant data for diagnosing cause of the
2214/// split brain by querying one disagreeing validator for full checkpoint
2215/// contents. To minimize peer chatter, we only query one validator at random
2216/// from each disagreeing faction, as all honest validators that participated in
2217/// this round may inevitably run the same process.
2218async fn diagnose_split_brain(
2219    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2220    local_summary: CheckpointSummary,
2221    state: Arc<AuthorityState>,
2222    tables: Arc<CheckpointStore>,
2223) {
2224    debug!(
2225        checkpoint_seq = local_summary.sequence_number,
2226        "Running split brain diagnostics..."
2227    );
2228    let time = SystemTime::now();
2229    // collect one random disagreeing validator per differing digest
2230    let digest_to_validator = all_unique_values
2231        .iter()
2232        .filter_map(|(digest, (validators, _))| {
2233            if *digest != local_summary.digest() {
2234                let random_validator = validators.choose(&mut OsRng).unwrap();
2235                Some((*digest, *random_validator))
2236            } else {
2237                None
2238            }
2239        })
2240        .collect::<HashMap<_, _>>();
2241    if digest_to_validator.is_empty() {
2242        panic!(
2243            "Given split brain condition, there should be at \
2244                least one validator that disagrees with local signature"
2245        );
2246    }
2247
2248    let epoch_store = state.load_epoch_store_one_call_per_task();
2249    let committee = epoch_store
2250        .epoch_start_state()
2251        .get_iota_committee_with_network_metadata();
2252    let network_config = default_iota_network_config();
2253    let network_clients =
2254        make_network_authority_clients_with_network_config(&committee, &network_config);
2255
2256    // Query all disagreeing validators
2257    let response_futures = digest_to_validator
2258        .values()
2259        .cloned()
2260        .map(|validator| {
2261            let client = network_clients
2262                .get(&validator)
2263                .expect("Failed to get network client");
2264            let request = CheckpointRequest {
2265                sequence_number: Some(local_summary.sequence_number),
2266                request_content: true,
2267                certified: false,
2268            };
2269            client.handle_checkpoint(request)
2270        })
2271        .collect::<Vec<_>>();
2272
2273    let digest_name_pair = digest_to_validator.iter();
2274    let response_data = futures::future::join_all(response_futures)
2275        .await
2276        .into_iter()
2277        .zip(digest_name_pair)
2278        .filter_map(|(response, (digest, name))| match response {
2279            Ok(response) => match response {
2280                CheckpointResponse {
2281                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2282                    contents: Some(contents),
2283                } => Some((*name, *digest, summary, contents)),
2284                CheckpointResponse {
2285                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2286                    contents: _,
2287                } => {
2288                    panic!("Expected pending checkpoint, but got certified checkpoint");
2289                }
2290                CheckpointResponse {
2291                    checkpoint: None,
2292                    contents: _,
2293                } => {
2294                    error!(
2295                        "Summary for checkpoint {:?} not found on validator {:?}",
2296                        local_summary.sequence_number, name
2297                    );
2298                    None
2299                }
2300                CheckpointResponse {
2301                    checkpoint: _,
2302                    contents: None,
2303                } => {
2304                    error!(
2305                        "Contents for checkpoint {:?} not found on validator {:?}",
2306                        local_summary.sequence_number, name
2307                    );
2308                    None
2309                }
2310            },
2311            Err(e) => {
2312                error!(
2313                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2314                    e
2315                );
2316                None
2317            }
2318        })
2319        .collect::<Vec<_>>();
2320
2321    let local_checkpoint_contents = tables
2322        .get_checkpoint_contents(&local_summary.content_digest)
2323        .unwrap_or_else(|_| {
2324            panic!(
2325                "Could not find checkpoint contents for digest {:?}",
2326                local_summary.digest()
2327            )
2328        })
2329        .unwrap_or_else(|| {
2330            panic!(
2331                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2332                local_summary.sequence_number,
2333                local_summary.digest()
2334            )
2335        });
2336    let local_contents_text = format!("{local_checkpoint_contents:?}");
2337
2338    let local_summary_text = format!("{local_summary:?}");
2339    let local_validator = state.name.concise();
2340    let diff_patches = response_data
2341        .iter()
2342        .map(|(name, other_digest, other_summary, contents)| {
2343            let other_contents_text = format!("{contents:?}");
2344            let other_summary_text = format!("{other_summary:?}");
2345            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2346                .enumerate_transactions(&local_summary)
2347                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2348                .unzip();
2349            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2350                .enumerate_transactions(other_summary)
2351                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2352                .unzip();
2353            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2354            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2355            let local_transactions_text = format!("{local_transactions:#?}");
2356            let other_transactions_text = format!("{other_transactions:#?}");
2357            let transactions_patch =
2358                create_patch(&local_transactions_text, &other_transactions_text);
2359            let local_effects_text = format!("{local_effects:#?}");
2360            let other_effects_text = format!("{other_effects:#?}");
2361            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2362            let seq_number = local_summary.sequence_number;
2363            let local_digest = local_summary.digest();
2364            let other_validator = name.concise();
2365            format!(
2366                "Checkpoint: {seq_number:?}\n\
2367                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2368                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2369                Summary Diff: \n{summary_patch}\n\n\
2370                Contents Diff: \n{contents_patch}\n\n\
2371                Transactions Diff: \n{transactions_patch}\n\n\
2372                Effects Diff: \n{effects_patch}",
2373            )
2374        })
2375        .collect::<Vec<_>>()
2376        .join("\n\n\n");
2377
2378    let header = format!(
2379        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2380        Datetime: {time:?}"
2381    );
2382    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2383    let path = tempfile::tempdir()
2384        .expect("Failed to create tempdir")
2385        .keep()
2386        .join(Path::new("checkpoint_fork_dump.txt"));
2387    let mut file = File::create(path).unwrap();
2388    write!(file, "{fork_logs_text}").unwrap();
2389    debug!("{}", fork_logs_text);
2390
2391    fail_point!("split_brain_reached");
2392}
2393
/// Notification interface of the checkpoint service.
///
/// Implemented in this file by `CheckpointService` (the real service) and by
/// `CheckpointServiceNoop` (a helper that ignores every notification).
pub trait CheckpointServiceNotify {
    /// Reports a checkpoint signature message `info` to the service.
    ///
    /// The `CheckpointService` implementation ignores signatures for
    /// checkpoints at or below the highest verified checkpoint; otherwise it
    /// stores the signature through `epoch_store` and wakes the aggregator.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    /// Signals the service that there is checkpoint work to pick up.
    ///
    /// The `CheckpointService` implementation wakes the checkpoint builder.
    fn notify_checkpoint(&self) -> IotaResult;
}
2403
/// Start-up state of a `CheckpointService`.
enum CheckpointServiceState {
    /// Not started yet: holds the builder and aggregator until they are taken
    /// out (see `take_unstarted`) to be run as tasks.
    Unstarted(Box<(CheckpointBuilder, CheckpointAggregator)>),
    /// The builder and aggregator have already been taken out.
    Started,
}
2408
2409impl CheckpointServiceState {
2410    fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
2411        let mut state = CheckpointServiceState::Started;
2412        std::mem::swap(self, &mut state);
2413
2414        match state {
2415            CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1),
2416            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2417        }
2418    }
2419}
2420
/// Handle to the checkpoint builder and aggregator.
///
/// Created un-started by `CheckpointService::build`; the builder and
/// aggregator tasks are launched by `CheckpointService::spawn`.
pub struct CheckpointService {
    // Checkpoint store; read to find the highest verified checkpoint when a
    // signature notification arrives.
    tables: Arc<CheckpointStore>,
    // Notified by `notify_checkpoint` to wake the checkpoint builder.
    notify_builder: Arc<Notify>,
    // Notified after a checkpoint signature has been stored, to wake the
    // aggregator.
    notify_aggregator: Arc<Notify>,
    // Index assigned to the most recently stored checkpoint signature. Kept
    // under a mutex so that storing a signature and notifying the aggregator
    // happen without races.
    last_signature_index: Mutex<u64>,
    // A notification for the current highest built sequence number.
    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
    // The highest sequence number that had already been built at the time CheckpointService
    // was constructed
    highest_previously_built_seq: CheckpointSequenceNumber,
    // Checkpoint metrics updated when signature notifications are processed.
    metrics: Arc<CheckpointMetrics>,
    // Holds the builder and aggregator until `spawn` takes them out.
    state: Mutex<CheckpointServiceState>,
}
2434
2435impl CheckpointService {
2436    /// Spawns the checkpoint service, initializing and starting the checkpoint
2437    /// builder and aggregator tasks.
2438    /// Constructs a new CheckpointService in an un-started state.
2439    pub fn build(
2440        state: Arc<AuthorityState>,
2441        checkpoint_store: Arc<CheckpointStore>,
2442        epoch_store: Arc<AuthorityPerEpochStore>,
2443        effects_store: Arc<dyn TransactionCacheRead>,
2444        accumulator: Weak<StateAccumulator>,
2445        checkpoint_output: Box<dyn CheckpointOutput>,
2446        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2447        metrics: Arc<CheckpointMetrics>,
2448        max_transactions_per_checkpoint: usize,
2449        max_checkpoint_size_bytes: usize,
2450    ) -> Arc<Self> {
2451        info!(
2452            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2453        );
2454        let notify_builder = Arc::new(Notify::new());
2455        let notify_aggregator = Arc::new(Notify::new());
2456
2457        // We may have built higher checkpoint numbers before restarting.
2458        let highest_previously_built_seq = checkpoint_store
2459            .get_latest_locally_computed_checkpoint()
2460            .expect("failed to get latest locally computed checkpoint")
2461            .map(|s| s.sequence_number)
2462            .unwrap_or(0);
2463
2464        let highest_currently_built_seq =
2465            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
2466                .expect("epoch should not have ended")
2467                .map(|(seq, _)| seq)
2468                .unwrap_or(0);
2469
2470        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
2471
2472        let aggregator = CheckpointAggregator::new(
2473            checkpoint_store.clone(),
2474            epoch_store.clone(),
2475            notify_aggregator.clone(),
2476            certified_checkpoint_output,
2477            state.clone(),
2478            metrics.clone(),
2479        );
2480
2481        let builder = CheckpointBuilder::new(
2482            state.clone(),
2483            checkpoint_store.clone(),
2484            epoch_store.clone(),
2485            notify_builder.clone(),
2486            effects_store,
2487            accumulator,
2488            checkpoint_output,
2489            notify_aggregator.clone(),
2490            highest_currently_built_seq_tx.clone(),
2491            metrics.clone(),
2492            max_transactions_per_checkpoint,
2493            max_checkpoint_size_bytes,
2494        );
2495
2496        let last_signature_index = epoch_store
2497            .get_last_checkpoint_signature_index()
2498            .expect("should not cross end of epoch");
2499        let last_signature_index = Mutex::new(last_signature_index);
2500
2501        Arc::new(Self {
2502            tables: checkpoint_store,
2503            notify_builder,
2504            notify_aggregator,
2505            last_signature_index,
2506            highest_currently_built_seq_tx,
2507            highest_previously_built_seq,
2508            metrics,
2509            state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
2510                builder, aggregator,
2511            )))),
2512        })
2513    }
2514
2515    /// Starts the CheckpointService.
2516    ///
2517    /// This function blocks until the CheckpointBuilder re-builds all
2518    /// checkpoints that had been built before the most recent restart. You
2519    /// can think of this as a WAL replay operation. Upon startup, we may
2520    /// have a number of consensus commits and resulting checkpoints that
2521    /// were built but not committed to disk. We want to reprocess the
2522    /// commits and rebuild the checkpoints before starting normal operation.
2523    pub async fn spawn(&self) -> JoinSet<()> {
2524        let mut tasks = JoinSet::new();
2525
2526        let (builder, aggregator) = self.state.lock().take_unstarted();
2527        tasks.spawn(monitored_future!(builder.run()));
2528        tasks.spawn(monitored_future!(aggregator.run()));
2529
2530        // If this times out, the validator may still start up. The worst that can
2531        // happen is that we will crash later on instead of immediately. The eventual
2532        // crash would occur because we may be missing transactions that are below the
2533        // highest_synced_checkpoint watermark, which can cause a crash in
2534        // `CheckpointExecutor::extract_randomness_rounds`.
2535        if tokio::time::timeout(
2536            Duration::from_secs(120),
2537            self.wait_for_rebuilt_checkpoints(),
2538        )
2539        .await
2540        .is_err()
2541        {
2542            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
2543        }
2544
2545        tasks
2546    }
2547}
2548
2549impl CheckpointService {
2550    /// Waits until all checkpoints had been built before the node restarted
2551    /// are rebuilt. This is required to preserve the invariant that all
2552    /// checkpoints (and their transactions) below the
2553    /// highest_synced_checkpoint watermark are available. Once the
2554    /// checkpoints are constructed, we can be sure that the transactions
2555    /// have also been executed.
2556    pub async fn wait_for_rebuilt_checkpoints(&self) {
2557        let highest_previously_built_seq = self.highest_previously_built_seq;
2558        let mut rx = self.highest_currently_built_seq_tx.subscribe();
2559        let mut highest_currently_built_seq = *rx.borrow_and_update();
2560        info!(
2561            "Waiting for checkpoints to be rebuilt, previously built seq: \
2562            {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
2563        );
2564        loop {
2565            if highest_currently_built_seq >= highest_previously_built_seq {
2566                info!("Checkpoint rebuild complete");
2567                break;
2568            }
2569            rx.changed().await.unwrap();
2570            highest_currently_built_seq = *rx.borrow_and_update();
2571        }
2572    }
2573
2574    #[cfg(test)]
2575    fn write_and_notify_checkpoint_for_testing(
2576        &self,
2577        epoch_store: &AuthorityPerEpochStore,
2578        checkpoint: PendingCheckpoint,
2579    ) -> IotaResult {
2580        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
2581
2582        let mut output = ConsensusCommitOutput::new(0);
2583        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2584        output.set_default_commit_stats_for_testing();
2585        epoch_store.push_consensus_output_for_tests(output);
2586        self.notify_checkpoint()?;
2587        Ok(())
2588    }
2589}
2590
2591impl CheckpointServiceNotify for CheckpointService {
2592    fn notify_checkpoint_signature(
2593        &self,
2594        epoch_store: &AuthorityPerEpochStore,
2595        info: &CheckpointSignatureMessage,
2596    ) -> IotaResult {
2597        let sequence = info.summary.sequence_number;
2598        let signer = info.summary.auth_sig().authority.concise();
2599
2600        if let Some(highest_verified_checkpoint) = self
2601            .tables
2602            .get_highest_verified_checkpoint()?
2603            .map(|x| *x.sequence_number())
2604        {
2605            if sequence <= highest_verified_checkpoint {
2606                trace!(
2607                    checkpoint_seq = sequence,
2608                    "Ignore checkpoint signature from {} - already certified", signer,
2609                );
2610                self.metrics
2611                    .last_ignored_checkpoint_signature_received
2612                    .set(sequence as i64);
2613                return Ok(());
2614            }
2615        }
2616        trace!(
2617            checkpoint_seq = sequence,
2618            "Received checkpoint signature, digest {} from {}",
2619            info.summary.digest(),
2620            signer,
2621        );
2622        self.metrics
2623            .last_received_checkpoint_signatures
2624            .with_label_values(&[&signer.to_string()])
2625            .set(sequence as i64);
2626        // While it can be tempting to make last_signature_index into AtomicU64, this
2627        // won't work We need to make sure we write to `pending_signatures` and
2628        // trigger `notify_aggregator` without race conditions
2629        let mut index = self.last_signature_index.lock();
2630        *index += 1;
2631        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2632        self.notify_aggregator.notify_one();
2633        Ok(())
2634    }
2635
2636    fn notify_checkpoint(&self) -> IotaResult {
2637        self.notify_builder.notify_one();
2638        Ok(())
2639    }
2640}
2641
2642// test helper
2643pub struct CheckpointServiceNoop {}
2644impl CheckpointServiceNotify for CheckpointServiceNoop {
2645    fn notify_checkpoint_signature(
2646        &self,
2647        _: &AuthorityPerEpochStore,
2648        _: &CheckpointSignatureMessage,
2649    ) -> IotaResult {
2650        Ok(())
2651    }
2652
2653    fn notify_checkpoint(&self) -> IotaResult {
2654        Ok(())
2655    }
2656}
2657
2658#[cfg(test)]
2659mod tests {
2660    use std::{
2661        collections::{BTreeMap, HashMap},
2662        ops::Deref,
2663    };
2664
2665    use futures::{FutureExt as _, future::BoxFuture};
2666    use iota_macros::sim_test;
2667    use iota_protocol_config::{Chain, ProtocolConfig};
2668    use iota_types::{
2669        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
2670        crypto::Signature,
2671        digests::TransactionEventsDigest,
2672        effects::{TransactionEffects, TransactionEvents},
2673        messages_checkpoint::SignedCheckpointSummary,
2674        move_package::MovePackage,
2675        object,
2676        transaction::{GenesisObject, VerifiedTransaction},
2677    };
2678    use tokio::sync::mpsc;
2679
2680    use super::*;
2681    use crate::authority::test_authority_builder::TestAuthorityBuilder;
2682
    // End-to-end test of the checkpoint builder and aggregator.
    //
    // Feeds six pending checkpoints into a `CheckpointService` configured with
    // at most 3 transactions and 100_000 bytes per checkpoint, then checks the
    // built checkpoints (contents, chaining via `previous_digest`, sequence
    // numbers, rolling gas summary) and finally the certified checkpoints
    // produced after submitting signatures.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // Two kinds of dummy transactions: an empty genesis transaction, and one
        // that carries a package object whose single module name is 40000
        // characters long (used below to exercise the checkpoint size limit).
        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
            vec![GenesisObject::RawObject {
                data: object::Data::Package(
                    MovePackage::new(
                        ObjectID::random(),
                        SequenceNumber::new(),
                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
                        100_000,
                        // no modules so empty type_origin_table as no types are defined in this
                        // package
                        Vec::new(),
                        // no modules so empty linkage_table as no dependencies of this package
                        // exist
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        // Digests d(0)..d(14) map to the small transaction, d(15)..d(19) to the
        // large one.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Fake effects store: digest -> effects, including the dependency edges
        // d(1) -> {d(2), d(3)} and d(2) -> {d(3), d(4)}.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        // Register a default user signature for every transaction in the store.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        // The channel senders act as the checkpoint outputs; the receivers are
        // read below to observe what was built and certified.
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        // Last two arguments: max_transactions_per_checkpoint = 3,
        // max_checkpoint_size_bytes = 100_000.
        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        let _tasks = checkpoint_service.spawn().await;

        // Pending checkpoints: p(height, root digests, timestamp_ms).
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        // Roots were d(1) and d(3); d(2) is expected as well because d(1)
        // depends on it. The rolling gas summary is the sum over d(1)..d(4).
        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K
        // max. Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 was too soon after the prior one and should be coalesced
        // into the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        // Signatures are submitted out of order (sequence 1 before sequence 0);
        // the certified checkpoints are still expected in sequence order.
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
2911
2912    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
2913        fn try_notify_read_executed_effects(
2914            &self,
2915            digests: &[TransactionDigest],
2916        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
2917            std::future::ready(Ok(digests
2918                .iter()
2919                .map(|d| self.get(d).expect("effects not found").clone())
2920                .collect()))
2921            .boxed()
2922        }
2923
2924        fn try_notify_read_executed_effects_digests(
2925            &self,
2926            digests: &[TransactionDigest],
2927        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
2928            std::future::ready(Ok(digests
2929                .iter()
2930                .map(|d| {
2931                    self.get(d)
2932                        .map(|fx| fx.digest())
2933                        .expect("effects not found")
2934                })
2935                .collect()))
2936            .boxed()
2937        }
2938
2939        fn try_multi_get_executed_effects(
2940            &self,
2941            digests: &[TransactionDigest],
2942        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2943            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
2944        }
2945
2946        // Unimplemented methods - its unfortunate to have this big blob of useless
2947        // code, but it wasn't worth it to keep EffectsNotifyRead around just
2948        // for these tests, as it caused a ton of complication in non-test code.
2949        // (e.g. had to implement EFfectsNotifyRead for all ExecutionCacheRead
2950        // implementors).
2951
2952        fn try_multi_get_transaction_blocks(
2953            &self,
2954            _: &[TransactionDigest],
2955        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
2956            unimplemented!()
2957        }
2958
2959        fn try_multi_get_executed_effects_digests(
2960            &self,
2961            _: &[TransactionDigest],
2962        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
2963            unimplemented!()
2964        }
2965
2966        fn try_multi_get_effects(
2967            &self,
2968            _: &[TransactionEffectsDigest],
2969        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2970            unimplemented!()
2971        }
2972
2973        fn try_multi_get_events(
2974            &self,
2975            _: &[TransactionEventsDigest],
2976        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
2977            unimplemented!()
2978        }
2979    }
2980
2981    #[async_trait::async_trait]
2982    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
2983        async fn checkpoint_created(
2984            &self,
2985            summary: &CheckpointSummary,
2986            contents: &CheckpointContents,
2987            _epoch_store: &Arc<AuthorityPerEpochStore>,
2988            _checkpoint_store: &Arc<CheckpointStore>,
2989        ) -> IotaResult {
2990            self.try_send((contents.clone(), summary.clone())).unwrap();
2991            Ok(())
2992        }
2993    }
2994
2995    #[async_trait::async_trait]
2996    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
2997        async fn certified_checkpoint_created(
2998            &self,
2999            summary: &CertifiedCheckpointSummary,
3000        ) -> IotaResult {
3001            self.try_send(summary.clone()).unwrap();
3002            Ok(())
3003        }
3004    }
3005
3006    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
3007        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
3008            roots: t
3009                .into_iter()
3010                .map(|t| TransactionKey::Digest(d(t)))
3011                .collect(),
3012            details: PendingCheckpointInfo {
3013                timestamp_ms,
3014                last_of_epoch: false,
3015                checkpoint_height: i,
3016            },
3017        })
3018    }
3019
3020    fn d(i: u8) -> TransactionDigest {
3021        let mut bytes: [u8; 32] = Default::default();
3022        bytes[0] = i;
3023        TransactionDigest::new(bytes)
3024    }
3025
3026    fn e(
3027        transaction_digest: TransactionDigest,
3028        dependencies: Vec<TransactionDigest>,
3029        gas_used: GasCostSummary,
3030    ) -> TransactionEffects {
3031        let mut effects = TransactionEffects::default();
3032        *effects.transaction_digest_mut_for_testing() = transaction_digest;
3033        *effects.dependencies_mut_for_testing() = dependencies;
3034        *effects.gas_cost_summary_mut_for_testing() = gas_used;
3035        effects
3036    }
3037
3038    fn commit_cert_for_test(
3039        store: &mut HashMap<TransactionDigest, TransactionEffects>,
3040        state: Arc<AuthorityState>,
3041        digest: TransactionDigest,
3042        dependencies: Vec<TransactionDigest>,
3043        gas_used: GasCostSummary,
3044    ) {
3045        let epoch_store = state.epoch_store_for_testing();
3046        let effects = e(digest, dependencies, gas_used);
3047        store.insert(digest, effects.clone());
3048        epoch_store
3049            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
3050            .expect("Inserting cert fx and sigs should not fail");
3051    }
3052}