iota_core/checkpoints/mod.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

mod causal_order;
pub mod checkpoint_executor;
mod checkpoint_output;
mod metrics;

use std::{
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    fs::File,
    io::Write,
    path::Path,
    sync::{Arc, Weak},
    time::{Duration, SystemTime},
};

use diffy::create_patch;
use iota_common::{debug_fatal, fatal, sync::notify_read::NotifyRead};
use iota_macros::fail_point;
use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
use iota_network::default_iota_network_config;
use iota_protocol_config::ProtocolVersion;
use iota_types::{
    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
    committee::StakeUnit,
    crypto::AuthorityStrongQuorumSignInfo,
    digests::{CheckpointContentsDigest, CheckpointDigest},
    effects::{TransactionEffects, TransactionEffectsAPI},
    error::{IotaError, IotaResult},
    event::SystemEpochInfoEvent,
    executable_transaction::VerifiedExecutableTransaction,
    gas::GasCostSummary,
    iota_system_state::{
        IotaSystemState, IotaSystemStateTrait,
        epoch_start_iota_system_state::EpochStartSystemStateTrait,
    },
    message_envelope::Message,
    messages_checkpoint::{
        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
        VerifiedCheckpointContents,
    },
    messages_consensus::ConsensusTransactionKey,
    signature::GenericSignature,
    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
};
use itertools::Itertools;
use nonempty::NonEmpty;
use parking_lot::Mutex;
use rand::{rngs::OsRng, seq::SliceRandom};
use serde::{Deserialize, Serialize};
use tokio::{
    sync::{Notify, watch},
    task::JoinSet,
    time::timeout,
};
use tracing::{debug, error, info, instrument, trace, warn};
use typed_store::{
    DBMapUtils, Map, TypedStoreError,
    rocks::{DBMap, MetricConf},
    traits::{TableSummary, TypedStoreDebug},
};

pub use crate::checkpoints::{
    checkpoint_output::{
        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
    },
    metrics::CheckpointMetrics,
};
use crate::{
    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
    checkpoints::{
        causal_order::CausalOrder,
        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
    },
    consensus_handler::SequencedConsensusTransactionKey,
    execution_cache::TransactionCacheRead,
    stake_aggregator::{InsertResult, MultiStakeAggregator},
    state_accumulator::StateAccumulator,
};

pub type CheckpointHeight = u64;

pub struct EpochStats {
    pub checkpoint_count: u64,
    pub transaction_count: u64,
    pub total_gas_reward: u64,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
    pub timestamp_ms: CheckpointTimestamp,
    pub last_of_epoch: bool,
    pub checkpoint_height: CheckpointHeight,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PendingCheckpoint {
    // This is an enum for future upgradability, though at the moment there is only one variant.
    V1(PendingCheckpointContentsV1),
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointContentsV1 {
    pub roots: Vec<TransactionKey>,
    pub details: PendingCheckpointInfo,
}

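// Illustrative only: constructing a `PendingCheckpoint` by hand, e.g. in a
// test. The field values below are placeholders, not meaningful data.
//
//     let pending = PendingCheckpoint::V1(PendingCheckpointContentsV1 {
//         roots: vec![],
//         details: PendingCheckpointInfo {
//             timestamp_ms: 0,
//             last_of_epoch: false,
//             checkpoint_height: 0,
//         },
//     });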
impl PendingCheckpoint {
    pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
        match self {
            PendingCheckpoint::V1(contents) => contents,
        }
    }

    pub fn into_v1(self) -> PendingCheckpointContentsV1 {
        match self {
            PendingCheckpoint::V1(contents) => contents,
        }
    }

    pub fn roots(&self) -> &Vec<TransactionKey> {
        &self.as_v1().roots
    }

    pub fn details(&self) -> &PendingCheckpointInfo {
        &self.as_v1().details
    }

    pub fn height(&self) -> CheckpointHeight {
        self.details().checkpoint_height
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for the genesis checkpoint.
    pub checkpoint_height: Option<CheckpointHeight>,
    pub position_in_commit: usize,
}

#[derive(DBMapUtils)]
pub struct CheckpointStoreTables {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence
    /// number, for efficient reads of full checkpoints. Entries from this
    /// table are deleted after state accumulation has completed.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks
    /// and log useful information. Can be pruned as soon as we verify that
    /// we are in agreement with the latest certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in
    /// that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}

impl CheckpointStoreTables {
    pub fn new(path: &Path, metric_name: &'static str) -> Self {
        Self::open_tables_read_write(path.to_path_buf(), MetricConf::new(metric_name), None, None)
    }

    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        Self::get_read_only_handle(
            path.to_path_buf(),
            None,
            None,
            MetricConf::new("checkpoint_readonly"),
        )
    }
}

pub struct CheckpointStore {
    pub(crate) tables: CheckpointStoreTables,
    synced_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
    executed_checkpoint_notify_read: NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
}

impl CheckpointStore {
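    /// Opens (or creates) a `CheckpointStore` at `path`.
    ///
    /// A minimal usage sketch (illustrative, not part of the production flow);
    /// assumes the `tempfile` crate for the scratch directory:
    ///
    /// ```ignore
    /// let dir = tempfile::tempdir().unwrap();
    /// let store = CheckpointStore::new(dir.path());
    /// // A fresh store has no certified checkpoints yet.
    /// assert!(store.get_latest_certified_checkpoint().unwrap().is_none());
    /// ```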
    pub fn new(path: &Path) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(path, "checkpoint");
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

    pub fn new_for_tests() -> Arc<Self> {
        let ckpt_dir = tempfile::tempdir().unwrap();
        CheckpointStore::new(ckpt_dir.path())
    }

    pub fn new_for_db_checkpoint_handler(path: &Path) -> Arc<Self> {
        let tables = CheckpointStoreTables::new(path, "db_checkpoint");
        Arc::new(Self {
            tables,
            synced_checkpoint_notify_read: NotifyRead::new(),
            executed_checkpoint_notify_read: NotifyRead::new(),
        })
    }

    pub fn open_readonly(path: &Path) -> CheckpointStoreTablesReadOnly {
        CheckpointStoreTables::open_readonly(path)
    }

    #[instrument(level = "info", skip_all)]
    pub fn insert_genesis_checkpoint(
        &self,
        checkpoint: VerifiedCheckpoint,
        contents: CheckpointContents,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        assert_eq!(
            checkpoint.epoch(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
        );
        assert_eq!(
            *checkpoint.sequence_number(),
            0,
            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
        );

        // Only insert the genesis checkpoint if the DB is empty and doesn't have it
        // already
        if self
            .get_checkpoint_by_digest(checkpoint.digest())
            .unwrap()
            .is_none()
        {
            if epoch_store.epoch() == checkpoint.epoch {
                epoch_store
                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
                    .unwrap();
            } else {
                debug!(
                    validator_epoch =% epoch_store.epoch(),
                    genesis_epoch =% checkpoint.epoch(),
                    "Not inserting checkpoint builder data for genesis checkpoint",
                );
            }
            self.insert_checkpoint_contents(contents).unwrap();
            self.insert_verified_checkpoint(&checkpoint).unwrap();
            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
        }
    }

    pub fn get_checkpoint_by_digest(
        &self,
        digest: &CheckpointDigest,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .checkpoint_by_digest
            .get(digest)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_checkpoint_by_sequence_number(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        self.tables
            .certified_checkpoints
            .get(&sequence_number)
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
    }

    pub fn get_locally_computed_checkpoint(
        &self,
        sequence_number: CheckpointSequenceNumber,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        self.tables
            .locally_computed_checkpoints
            .get(&sequence_number)
    }

    pub fn get_sequence_number_by_contents_digest(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .get(digest)
    }

    pub fn delete_contents_digest_sequence_number_mapping(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<(), TypedStoreError> {
        self.tables
            .checkpoint_sequence_by_contents_digest
            .remove(digest)
    }

    pub fn get_latest_certified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        Ok(self
            .tables
            .certified_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v.into()))
    }

    pub fn get_latest_locally_computed_checkpoint(
        &self,
    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
        Ok(self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
            .map(|(_, v)| v))
    }

    pub fn multi_get_checkpoint_by_sequence_number(
        &self,
        sequence_numbers: &[CheckpointSequenceNumber],
    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
        let checkpoints = self
            .tables
            .certified_checkpoints
            .multi_get(sequence_numbers)?
            .into_iter()
            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
            .collect();

        Ok(checkpoints)
    }

    pub fn multi_get_checkpoint_content(
        &self,
        contents_digest: &[CheckpointContentsDigest],
    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
        self.tables.checkpoint_content.multi_get(contents_digest)
    }

    pub fn get_highest_verified_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_verified = if let Some(highest_verified) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestVerified)?
        {
            highest_verified
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_verified.1)
    }

    pub fn get_highest_synced_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_synced = if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            highest_synced
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_synced.1)
    }

    pub fn get_highest_synced_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_synced) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestSynced)?
        {
            Ok(Some(highest_synced.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint_seq_number(
        &self,
    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
        if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            Ok(Some(highest_executed.0))
        } else {
            Ok(None)
        }
    }

    pub fn get_highest_executed_checkpoint(
        &self,
    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
        let highest_executed = if let Some(highest_executed) = self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestExecuted)?
        {
            highest_executed
        } else {
            return Ok(None);
        };
        self.get_checkpoint_by_digest(&highest_executed.1)
    }

    pub fn get_highest_pruned_checkpoint_seq_number(
        &self,
    ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
        Ok(self
            .tables
            .watermarks
            .get(&CheckpointWatermark::HighestPruned)?
            .unwrap_or_default()
            .0)
    }

    pub fn get_checkpoint_contents(
        &self,
        digest: &CheckpointContentsDigest,
    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
        self.tables.checkpoint_content.get(digest)
    }

    pub fn get_full_checkpoint_contents_by_sequence_number(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
        self.tables.full_checkpoint_content.get(&seq)
    }

    fn prune_local_summaries(&self) -> IotaResult {
        if let Some((last_local_summary, _)) = self
            .tables
            .locally_computed_checkpoints
            .reversed_safe_iter_with_bounds(None, None)?
            .next()
            .transpose()?
        {
            let mut batch = self.tables.locally_computed_checkpoints.batch();
            batch.schedule_delete_range(
                &self.tables.locally_computed_checkpoints,
                &0,
                &last_local_summary,
            )?;
            batch.write()?;
            info!("Pruned local summaries up to {:?}", last_local_summary);
        }
        Ok(())
    }

    fn check_for_checkpoint_fork(
        &self,
        local_checkpoint: &CheckpointSummary,
        verified_checkpoint: &VerifiedCheckpoint,
    ) {
        if local_checkpoint != verified_checkpoint.data() {
            let verified_contents = self
                .get_checkpoint_contents(&verified_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Verified checkpoint contents not found, digest: {:?}",
                                verified_checkpoint.content_digest,
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
                        verified_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            let local_contents = self
                .get_checkpoint_contents(&local_checkpoint.content_digest)
                .map(|opt_contents| {
                    opt_contents
                        .map(|contents| format!("{contents:?}"))
                        .unwrap_or_else(|| {
                            format!(
                                "Local checkpoint contents not found, digest: {:?}",
                                local_checkpoint.content_digest
                            )
                        })
                })
                .map_err(|e| {
                    format!(
                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
                        local_checkpoint.content_digest, e
                    )
                })
                .unwrap_or_else(|err_msg| err_msg);

            // Checkpoint contents may be too large for the panic message.
            error!(
                verified_checkpoint = ?verified_checkpoint.data(),
                ?verified_contents,
                ?local_checkpoint,
                ?local_contents,
                "Local checkpoint fork detected!",
            );
            fatal!(
                "Local checkpoint fork detected for sequence number: {}",
                local_checkpoint.sequence_number()
            );
        }
    }

    // Called by consensus (ConsensusAggregator).
    // Different from `insert_verified_checkpoint`, it does not touch
    // the highest_verified_checkpoint watermark such that state sync
    // will have a chance to process this checkpoint and perform some
    // state-sync only things.
    pub fn insert_certified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = checkpoint.sequence_number(),
            "Inserting certified checkpoint",
        );
        let mut batch = self.tables.certified_checkpoints.batch();
        batch
            .insert_batch(
                &self.tables.certified_checkpoints,
                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
            )?
            .insert_batch(
                &self.tables.checkpoint_by_digest,
                [(checkpoint.digest(), checkpoint.serializable_ref())],
            )?;
        if checkpoint.next_epoch_committee().is_some() {
            batch.insert_batch(
                &self.tables.epoch_last_checkpoint_map,
                [(&checkpoint.epoch(), checkpoint.sequence_number())],
            )?;
        }
        batch.write()?;

        if let Some(local_checkpoint) = self
            .tables
            .locally_computed_checkpoints
            .get(checkpoint.sequence_number())?
        {
            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
        }

        Ok(())
    }

    // Called by state sync. Apart from inserting the checkpoint and updating
    // related tables, it also bumps the highest_verified_checkpoint watermark.
    #[instrument(level = "debug", skip_all)]
    pub fn insert_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.insert_certified_checkpoint(checkpoint)?;
        self.update_highest_verified_checkpoint(checkpoint)
    }

    pub fn update_highest_verified_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if Some(*checkpoint.sequence_number())
            > self
                .get_highest_verified_checkpoint()?
                .map(|x| *x.sequence_number())
        {
            debug!(
                checkpoint_seq = checkpoint.sequence_number(),
                "Updating highest verified checkpoint",
            );
            self.tables.watermarks.insert(
                &CheckpointWatermark::HighestVerified,
                &(*checkpoint.sequence_number(), *checkpoint.digest()),
            )?;
        }

        Ok(())
    }

    pub fn update_highest_synced_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest synced checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestSynced,
            &(seq, *checkpoint.digest()),
        )?;
        self.synced_checkpoint_notify_read.notify(&seq, checkpoint);
        Ok(())
    }

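    /// Shared helper behind the `notify_read_*` methods below: it returns
    /// immediately if `get_watermark()` already covers `seq`, and otherwise
    /// registers with `notify_read` and waits for the corresponding
    /// `notify(..)` call once the watermark advances.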
    async fn notify_read_checkpoint_watermark<F>(
        &self,
        notify_read: &NotifyRead<CheckpointSequenceNumber, VerifiedCheckpoint>,
        seq: CheckpointSequenceNumber,
        get_watermark: F,
    ) -> VerifiedCheckpoint
    where
        F: Fn() -> Option<CheckpointSequenceNumber>,
    {
        type ReadResult = Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError>;

        notify_read
            .read(&[seq], |seqs| -> ReadResult {
                let seq = seqs[0];
                let Some(highest) = get_watermark() else {
                    return Ok(vec![None]);
                };
                if highest < seq {
                    return Ok(vec![None]);
                }
                let checkpoint = self
                    .get_checkpoint_by_sequence_number(seq)
                    .expect("db error")
                    .expect("checkpoint not found");
                Ok(vec![Some(checkpoint)])
            })
            .await
            .unwrap()
            .into_iter()
            .next()
            .unwrap()
    }

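    /// Waits until the checkpoint with sequence number `seq` has been synced,
    /// then returns it. Illustrative sketch (assumes a tokio runtime and an
    /// `Arc<CheckpointStore>` named `store`):
    ///
    /// ```ignore
    /// let cp = store.notify_read_synced_checkpoint(42).await;
    /// assert_eq!(*cp.sequence_number(), 42);
    /// ```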
    pub async fn notify_read_synced_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.synced_checkpoint_notify_read, seq, || {
            self.get_highest_synced_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

    pub async fn notify_read_executed_checkpoint(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> VerifiedCheckpoint {
        self.notify_read_checkpoint_watermark(&self.executed_checkpoint_notify_read, seq, || {
            self.get_highest_executed_checkpoint_seq_number()
                .expect("db error")
        })
        .await
    }

    pub fn update_highest_executed_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
            if seq_number >= *checkpoint.sequence_number() {
                return Ok(());
            }
            assert_eq!(
                seq_number + 1,
                *checkpoint.sequence_number(),
                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
                checkpoint.sequence_number(),
                seq_number
            );
        }
        let seq = *checkpoint.sequence_number();
        debug!(checkpoint_seq = seq, "Updating highest executed checkpoint",);
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(seq, *checkpoint.digest()),
        )?;
        self.executed_checkpoint_notify_read
            .notify(&seq, checkpoint);
        Ok(())
    }

    pub fn update_highest_pruned_checkpoint(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestPruned,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    /// Sets highest executed checkpoint to any value.
    ///
    /// WARNING: This method is very subtle and can corrupt the database if used
    /// incorrectly. It should only be used in one-off cases or tests after
    /// fully understanding the risk.
    pub fn set_highest_executed_checkpoint_subtle(
        &self,
        checkpoint: &VerifiedCheckpoint,
    ) -> Result<(), TypedStoreError> {
        self.tables.watermarks.insert(
            &CheckpointWatermark::HighestExecuted,
            &(*checkpoint.sequence_number(), *checkpoint.digest()),
        )
    }

    pub fn insert_checkpoint_contents(
        &self,
        contents: CheckpointContents,
    ) -> Result<(), TypedStoreError> {
        debug!(
            checkpoint_seq = ?contents.digest(),
            "Inserting checkpoint contents",
        );
        self.tables
            .checkpoint_content
            .insert(contents.digest(), &contents)
    }

    pub fn insert_verified_checkpoint_contents(
        &self,
        checkpoint: &VerifiedCheckpoint,
        full_contents: VerifiedCheckpointContents,
    ) -> Result<(), TypedStoreError> {
        let mut batch = self.tables.full_checkpoint_content.batch();
        batch.insert_batch(
            &self.tables.checkpoint_sequence_by_contents_digest,
            [(&checkpoint.content_digest, checkpoint.sequence_number())],
        )?;
        let full_contents = full_contents.into_inner();
        batch.insert_batch(
            &self.tables.full_checkpoint_content,
            [(checkpoint.sequence_number(), &full_contents)],
        )?;

        let contents = full_contents.into_checkpoint_contents();
        assert_eq!(&checkpoint.content_digest, contents.digest());

        batch.insert_batch(
            &self.tables.checkpoint_content,
            [(contents.digest(), &contents)],
        )?;

        batch.write()
    }

    pub fn delete_full_checkpoint_contents(
        &self,
        seq: CheckpointSequenceNumber,
    ) -> Result<(), TypedStoreError> {
        self.tables.full_checkpoint_content.remove(&seq)
    }

    pub fn get_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
    ) -> IotaResult<Option<VerifiedCheckpoint>> {
        let seq = self.tables.epoch_last_checkpoint_map.get(&epoch_id)?;
        let checkpoint = match seq {
            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
            None => None,
        };
        Ok(checkpoint)
    }

    pub fn insert_epoch_last_checkpoint(
        &self,
        epoch_id: EpochId,
        checkpoint: &VerifiedCheckpoint,
    ) -> IotaResult {
        self.tables
            .epoch_last_checkpoint_map
            .insert(&epoch_id, checkpoint.sequence_number())?;
        Ok(())
    }

    pub fn get_epoch_state_commitments(
        &self,
        epoch: EpochId,
    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
            checkpoint
                .end_of_epoch_data
                .as_ref()
                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
                .epoch_commitments
                .clone()
        });
        Ok(commitments)
    }

    /// Given an epoch ID and the last checkpoint of that epoch, derives a few
    /// statistics for the epoch.
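    ///
    /// Illustrative figures: if epoch 4 ended at checkpoint 1000 with a
    /// network total of 50_000 transactions, and epoch 5 ends at checkpoint
    /// 1100 with a total of 60_000, then for epoch 5 this returns
    /// `checkpoint_count == 100` and `transaction_count == 10_000`.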
    pub fn get_epoch_stats(
        &self,
        epoch: EpochId,
        last_checkpoint: &CheckpointSummary,
    ) -> Option<EpochStats> {
        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
            (0, 0)
        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
            (
                checkpoint.sequence_number + 1,
                checkpoint.network_total_transactions,
            )
        } else {
            return None;
        };
        Some(EpochStats {
            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
            transaction_count: last_checkpoint.network_total_transactions
                - prev_epoch_network_transactions,
            total_gas_reward: last_checkpoint
                .epoch_rolling_gas_cost_summary
                .computation_cost,
        })
    }

    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
        // This checkpoints the entire db and not one column family
        self.tables
            .checkpoint_content
            .checkpoint_db(path)
            .map_err(Into::into)
    }

    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
        let mut wb = self.tables.watermarks.batch();
        wb.delete_batch(
            &self.tables.watermarks,
            std::iter::once(CheckpointWatermark::HighestExecuted),
        )?;
        wb.write()?;
        Ok(())
    }

    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
        self.delete_highest_executed_checkpoint_test_only()?;
        self.tables.watermarks.rocksdb.flush()?;
        Ok(())
    }

    /// TODO: this is only needed while upgrading from non-dataquarantine to
    /// dataquarantine. After that it can be deleted.
    ///
    /// Re-executes all transactions from all local, uncertified checkpoints for
    /// crash recovery. All transactions thus re-executed are guaranteed to
    /// not have any missing dependencies, because we start from the highest
    /// executed checkpoint, and proceed through checkpoints in order.
    // tracking issue: https://github.com/iotaledger/iota/issues/8290
    #[instrument(level = "debug", skip_all)]
    pub async fn reexecute_local_checkpoints(
        &self,
        state: &AuthorityState,
        epoch_store: &AuthorityPerEpochStore,
    ) {
        let epoch = epoch_store.epoch();
        let highest_executed = self
            .get_highest_executed_checkpoint_seq_number()
            .expect("get_highest_executed_checkpoint_seq_number should not fail")
            .unwrap_or(0);

        let Ok(Some(highest_built)) = self.get_latest_locally_computed_checkpoint() else {
            info!("no locally built checkpoints to verify");
            return;
        };

        info!(
            "re-executing locally computed checkpoints for crash recovery from {} to {}",
            highest_executed, highest_built
        );

        for seq in highest_executed + 1..=*highest_built.sequence_number() {
            info!(?seq, "Re-executing locally computed checkpoint");
            let Some(checkpoint) = self
                .get_locally_computed_checkpoint(seq)
                .expect("get_locally_computed_checkpoint should not fail")
            else {
                panic!("locally computed checkpoint {seq:?} not found");
            };

            let Some(contents) = self
                .get_checkpoint_contents(&checkpoint.content_digest)
                .expect("get_checkpoint_contents should not fail")
            else {
                panic!(
                    "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
                    seq, checkpoint.content_digest
                );
            };

            let cache = state.get_transaction_cache_reader();

            let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
            let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
            let txns = cache
                .try_multi_get_transaction_blocks(&tx_digests)
                .expect("try_multi_get_transaction_blocks should not fail");

            let txns: Vec<_> = itertools::izip!(txns, tx_digests, fx_digests)
                .filter_map(|(tx, digest, fx)| {
                    if let Some(tx) = tx {
                        Some((tx, fx))
                    } else {
                        info!(
                            "transaction {:?} not found during checkpoint re-execution",
                            digest
                        );
                        None
                    }
                })
                // end of epoch transaction can only be executed by CheckpointExecutor
                .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
                .map(|(tx, fx)| {
                    (
                        VerifiedExecutableTransaction::new_from_checkpoint(
                            (*tx).clone(),
                            epoch,
                            seq,
                        ),
                        fx,
                    )
                })
                .collect();

            let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();

            info!(
                ?seq,
                ?tx_digests,
                "Re-executing transactions for locally built checkpoint"
            );
            // this will panic if any re-execution diverges from the previously recorded
            // effects digest
            state.enqueue_with_expected_effects_digest(txns, epoch_store);

            // a task that logs every so often until it is cancelled
            // This should normally finish very quickly, so seeing this log more than once
            // or twice is likely a sign of a problem.
            let waiting_logger = tokio::task::spawn(async move {
                let mut interval = tokio::time::interval(Duration::from_secs(1));
                loop {
                    interval.tick().await;
                    warn!(?seq, "Still waiting for re-execution to complete");
                }
            });

            cache
                .try_notify_read_executed_effects_digests(&tx_digests)
                .await
                .expect("notify_read_executed_effects_digests should not fail");

            waiting_logger.abort();
            waiting_logger.await.ok();
            info!(?seq, "Re-execution completed for locally built checkpoint");
        }

        info!("Re-execution of locally built checkpoints completed");
    }
}

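/// Keys for the `watermarks` table; each maps to the sequence number and
/// digest of the corresponding checkpoint. In steady state one would expect
/// roughly `HighestExecuted <= HighestSynced <= HighestVerified` (an
/// illustrative reading, not an invariant enforced by this module).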
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    HighestVerified,
    HighestSynced,
    HighestExecuted,
    HighestPruned,
}

pub struct CheckpointBuilder {
    state: Arc<AuthorityState>,
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    notify_aggregator: Arc<Notify>,
    last_built: watch::Sender<CheckpointSequenceNumber>,
    effects_store: Arc<dyn TransactionCacheRead>,
    accumulator: Weak<StateAccumulator>,
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    max_transactions_per_checkpoint: usize,
    max_checkpoint_size_bytes: usize,
}

pub struct CheckpointAggregator {
    store: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    notify: Arc<Notify>,
    current: Option<CheckpointSignatureAggregator>,
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

// This holds information to aggregate signatures for one checkpoint
pub struct CheckpointSignatureAggregator {
    next_index: u64,
    summary: CheckpointSummary,
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    store: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}

impl CheckpointBuilder {
    fn new(
        state: Arc<AuthorityState>,
        store: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        effects_store: Arc<dyn TransactionCacheRead>,
        accumulator: Weak<StateAccumulator>,
        output: Box<dyn CheckpointOutput>,
        notify_aggregator: Arc<Notify>,
        last_built: watch::Sender<CheckpointSequenceNumber>,
        metrics: Arc<CheckpointMetrics>,
        max_transactions_per_checkpoint: usize,
        max_checkpoint_size_bytes: usize,
    ) -> Self {
        Self {
            state,
            store,
            epoch_store,
            notify,
            effects_store,
            accumulator,
            output,
            notify_aggregator,
            last_built,
            metrics,
            max_transactions_per_checkpoint,
            max_checkpoint_size_bytes,
        }
    }

    /// Runs the `CheckpointBuilder` in an asynchronous loop, managing the
    /// creation of checkpoints.
    async fn run(mut self) {
        info!("Starting CheckpointBuilder");
        loop {
            self.maybe_build_checkpoints().await;

            self.notify.notified().await;
        }
    }

    async fn maybe_build_checkpoints(&mut self) {
        let _scope = monitored_scope("BuildCheckpoints");

        // Collect info about the most recently built checkpoint.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            // Group PendingCheckpoints until:
            // - minimum interval has elapsed ...
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                    Some(last_timestamp) => {
                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                    }
                    None => true,
                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
                //   should be written separately) ...
                } || checkpoints_iter
                    .peek()
                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                // - or, we have reached end of epoch.
                    || pending.details().last_of_epoch;
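            // Illustrative example: with a min interval of 200ms and the last
            // checkpoint built at t=0, a pending checkpoint arriving at t=100
            // is held back and grouped; when the next one arrives at t=250,
            // both are coalesced and built together.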
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            // Min interval has elapsed, we can now coalesce and build a checkpoint.
            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height = height,
                "Making checkpoint at commit height"
            );

            match self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await
            {
                Ok(seq) => {
                    self.last_built.send_if_modified(|cur| {
                        // when rebuilding checkpoints at startup, seq can be for an old checkpoint
                        if seq > *cur {
                            *cur = seq;
                            true
                        } else {
                            false
                        }
                    });
                }
                Err(e) => {
                    error!("Error while making checkpoint, will retry in 1s: {:?}", e);
                    tokio::time::sleep(Duration::from_secs(1)).await;
                    self.metrics.checkpoint_errors.inc();
                    return;
                }
            }
            // Ensure that the task can be cancelled at end of epoch, even if no other await
            // yields execution.
            tokio::task::yield_now().await;
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );
    }

    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(
        &self,
        pendings: Vec<PendingCheckpoint>,
    ) -> anyhow::Result<CheckpointSequenceNumber> {
        let last_details = pendings.last().unwrap().details().clone();

        // Keeps track of the effects that are already included in the current
        // checkpoint. This is used when multiple pending checkpoints are
        // combined into a single checkpoint because, in such scenarios, the
        // dependencies of a transaction may be in earlier created checkpoints
        // or in earlier pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        // Stores the transactions that should be included in the checkpoint.
        // Transactions will be recorded in the checkpoint in this order.
        let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
        for pending_checkpoint in pendings.into_iter() {
            let pending = pending_checkpoint.into_v1();
            let txn_in_checkpoint = self
                .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
                .await?;
            sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
        }
        let new_checkpoints = self
            .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
            .await?;
        let highest_sequence = *new_checkpoints.last().0.sequence_number();
        self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
            .await?;
        Ok(highest_sequence)
    }

    // Given the root transactions of a pending checkpoint, resolves the
    // transactions that should be included in the checkpoint and returns them
    // in the order they should appear in the checkpoint.
    // `effects_in_current_checkpoint` tracks the transactions that already
    // exist in the current checkpoint.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        roots: Vec<TransactionKey>,
        effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        self.metrics
            .checkpoint_roots_count
            .inc_by(roots.len() as u64);

        let root_digests = self
            .epoch_store
            .notify_read_executed_digests(&roots)
            .in_monitored_scope("CheckpointNotifyDigests")
            .await?;
        let root_effects = self
            .effects_store
            .try_notify_read_executed_effects(&root_digests)
            .in_monitored_scope("CheckpointNotifyRead")
            .await?;

        let _scope = monitored_scope("CheckpointBuilder");

        let consensus_commit_prologue = {
            // If the roots contain a consensus commit prologue transaction, we
            // want to extract it and put it at the front of the checkpoint.

            let consensus_commit_prologue = self
                .extract_consensus_commit_prologue(&root_digests, &root_effects)
                .await?;

            // Get the un-included dependencies of the consensus commit prologue. We should
            // expect no other dependencies that haven't been included in any
            // previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been
                // included in any previous checkpoint.
                if unsorted_ccp.len() != 1 {
                    fatal!(
                        "Expected 1 consensus commit prologue, got {:?}",
                        unsorted_ccp
                            .iter()
                            .map(|e| e.transaction_digest())
                            .collect::<Vec<_>>()
                    );
                }
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }
            consensus_commit_prologue
        };

        let unsorted =
            self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;

        let _scope = monitored_scope("CheckpointBuilder::causal_sort");
        let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
        if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
            #[cfg(debug_assertions)]
            {
                // When consensus_commit_prologue is extracted, it should not be included in the
                // `unsorted`.
                for tx in unsorted.iter() {
                    assert!(tx.transaction_digest() != &_ccp_digest);
                }
            }
            sorted.push(ccp_effects);
        }
        sorted.extend(CausalOrder::causal_sort(unsorted));

        #[cfg(msim)]
        {
            // Check consensus commit prologue invariants in sim test.
            self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
        }

        Ok(sorted)
    }

    // This function is used to extract the consensus commit prologue digest and
    // effects from the root transactions.
    // The consensus commit prologue is expected to be the first transaction in the
    // roots.
    async fn extract_consensus_commit_prologue(
        &self,
        root_digests: &[TransactionDigest],
        root_effects: &[TransactionEffects],
    ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
        if root_digests.is_empty() {
            return Ok(None);
        }

        // Reads the first transaction in the roots, and checks whether it is a
        // consensus commit prologue transaction.
        // The consensus commit prologue transaction should be the first transaction
        // in the roots written by the consensus handler.
        let first_tx = self
            .state
            .get_transaction_cache_reader()
            .try_get_transaction_block(&root_digests[0])?
            .expect("Transaction block must exist");

        Ok(match first_tx.transaction_data().kind() {
            TransactionKind::ConsensusCommitPrologueV1(_) => {
                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
                Some((*first_tx.digest(), root_effects[0].clone()))
            }
            _ => None,
        })
    }

    /// Writes the new checkpoints to the DB storage and processes them.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &self,
        height: CheckpointHeight,
        new_checkpoints: NonEmpty<(CheckpointSummary, CheckpointContents)>,
    ) -> IotaResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.store.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );

            if let Some(previously_computed_summary) = self
                .store
                .tables
                .locally_computed_checkpoints
                .get(&summary.sequence_number)?
            {
                if previously_computed_summary != *summary {
                    // Panic so that we don't send out an equivocating checkpoint sig.
                    fatal!(
                        "Checkpoint {} was previously built with a different result: {previously_computed_summary:?} vs {summary:?}",
                        summary.sequence_number,
                    );
                }
            }

            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.store.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.store.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        batch.write()?;

        // Send all checkpoint sigs to consensus.
        for (summary, contents) in &new_checkpoints {
            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.store)
                .await?;
        }

        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .store
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.store
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_constructed_checkpoint(height, new_checkpoints);
        Ok(())
    }

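    /// Splits the ordered effects (paired with their user signatures) into
    /// checkpoint-sized chunks, rolling over to a new chunk when either
    /// `max_transactions_per_checkpoint` or `max_checkpoint_size_bytes` would
    /// be exceeded. Illustrative example: with a limit of 2 transactions per
    /// checkpoint, 5 small transactions are split into chunks of sizes
    /// [2, 2, 1].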
    #[expect(clippy::type_complexity)]
    fn split_checkpoint_chunks(
        &self,
        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
        signatures: Vec<Vec<GenericSignature>>,
    ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
        let mut chunks = Vec::new();
        let mut chunk = Vec::new();
        let mut chunk_size: usize = 0;
        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
            .into_iter()
            .zip(signatures.into_iter())
        {
            // Roll over to a new chunk after either max count or max size is reached.
            // The size calculation here is intended to estimate the size of the
            // FullCheckpointContents struct. If this code is modified, that struct
            // should also be updated accordingly.
            let size = transaction_size
                + bcs::serialized_size(&effects)?
                + bcs::serialized_size(&signatures)?;
            if chunk.len() == self.max_transactions_per_checkpoint
                || (chunk_size + size) > self.max_checkpoint_size_bytes
            {
                if chunk.is_empty() {
                    // Always allow at least one tx in a checkpoint.
                    warn!(
                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
                        self.max_checkpoint_size_bytes
                    );
                } else {
                    chunks.push(chunk);
                    chunk = Vec::new();
                    chunk_size = 0;
                }
            }

            chunk.push((effects, signatures));
            chunk_size += size;
        }

1449        if !chunk.is_empty() || chunks.is_empty() {
1450            // We intentionally create an empty checkpoint if there is no content provided,
1451            // to serve as a 'heartbeat' checkpoint.
1452            // Important: if more conditions are added here later, we need to make sure we
1453            // always have at least one chunk if last_pending_of_epoch is set.
1454            chunks.push(chunk);
1455            // Note: empty checkpoints are ok - they shouldn't happen at all on
1456            // a network with even modest load. Even if they do
1457            // happen, they are still useful: they allow fullnodes to
1458            // distinguish between "no transactions have happened" and "I am not
1459            // receiving new checkpoints".
1460        }
1461        Ok(chunks)
1462    }
1463
1464    /// Loads the summary of the last checkpoint built in this epoch, falling back
1465    /// to the final checkpoint of the previous epoch if none has been built yet.
1466    fn load_last_built_checkpoint_summary(
1467        epoch_store: &AuthorityPerEpochStore,
1468        store: &CheckpointStore,
1469    ) -> IotaResult<Option<(CheckpointSequenceNumber, CheckpointSummary)>> {
1470        let mut last_checkpoint = epoch_store.last_built_checkpoint_summary()?;
1471        if last_checkpoint.is_none() {
1472            let epoch = epoch_store.epoch();
1473            if epoch > 0 {
1474                let previous_epoch = epoch - 1;
1475                let last_verified = store.get_epoch_last_checkpoint(previous_epoch)?;
1476                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
1477                if let Some((ref seq, _)) = last_checkpoint {
1478                    debug!(
1479                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
1480                    );
1481                } else {
1482                    // This indicates a serious bug in how the CheckpointBuilder was started,
1483                    // so surface it via a panic.
1484                    panic!("Cannot find last checkpoint for previous epoch {previous_epoch}");
1485                }
1486            }
1487        }
1488        Ok(last_checkpoint)
1489    }
1490
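    /// Creates one or more checkpoints from the provided transaction effects and
    /// the pending checkpoint information, chaining each new summary onto the
    /// last built checkpoint.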
1491    #[instrument(level = "debug", skip_all)]
1492    async fn create_checkpoints(
1493        &self,
1494        all_effects: Vec<TransactionEffects>,
1495        details: &PendingCheckpointInfo,
1496    ) -> anyhow::Result<NonEmpty<(CheckpointSummary, CheckpointContents)>> {
1497        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
1498        let total = all_effects.len();
1499        let mut last_checkpoint =
1500            Self::load_last_built_checkpoint_summary(&self.epoch_store, &self.store)?;
1501        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
1502        debug!(
1503            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
1504            checkpoint_timestamp = details.timestamp_ms,
1505            "Creating checkpoint(s) for {} transactions",
1506            all_effects.len(),
1507        );
1508
1509        let all_digests: Vec<_> = all_effects
1510            .iter()
1511            .map(|effect| *effect.transaction_digest())
1512            .collect();
1513        let transactions_and_sizes = self
1514            .state
1515            .get_transaction_cache_reader()
1516            .try_get_transactions_and_serialized_sizes(&all_digests)?;
1517        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
1518        let mut transactions = Vec::with_capacity(all_effects.len());
1519        let mut transaction_keys = Vec::with_capacity(all_effects.len());
1520        let mut randomness_rounds = BTreeMap::new();
1521        {
1522            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
1523            debug!(
1524                ?last_checkpoint_seq,
1525                "Waiting for {:?} certificates to appear in consensus",
1526                all_effects.len()
1527            );
1528
1529            for (effects, transaction_and_size) in all_effects
1530                .into_iter()
1531                .zip(transactions_and_sizes.into_iter())
1532            {
1533                let (transaction, size) = transaction_and_size
1534                    .unwrap_or_else(|| panic!("Could not find executed transaction {effects:?}"));
1535                match transaction.inner().transaction_data().kind() {
1536                    TransactionKind::ConsensusCommitPrologueV1(_)
1537                    | TransactionKind::AuthenticatorStateUpdateV1(_) => {
1538                        // ConsensusCommitPrologue and
1539                        // AuthenticatorStateUpdateV1
1540                        // are guaranteed to be
1541                        // processed before we reach here.
1542                    }
1543                    TransactionKind::RandomnessStateUpdate(rsu) => {
1544                        randomness_rounds
1545                            .insert(*effects.transaction_digest(), rsu.randomness_round);
1546                    }
1547                    _ => {
1548                        // All other tx should be included in the call to
1549                        // `consensus_messages_processed_notify`.
1550                        transaction_keys.push(SequencedConsensusTransactionKey::External(
1551                            ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
1552                        ));
1553                    }
1554                }
1555                transactions.push(transaction);
1556                all_effects_and_transaction_sizes.push((effects, size));
1557            }
1558
1559            self.epoch_store
1560                .consensus_messages_processed_notify(transaction_keys)
1561                .await?;
1562        }
1563
1564        let signatures = self
1565            .epoch_store
1566            .user_signatures_for_checkpoint(&transactions, &all_digests)?;
1567        debug!(
1568            ?last_checkpoint_seq,
1569            "Received {} checkpoint user signatures from consensus",
1570            signatures.len()
1571        );
1572
1573        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
1574        let chunks_count = chunks.len();
1575
1576        let mut checkpoints = Vec::with_capacity(chunks_count);
1577        debug!(
1578            ?last_checkpoint_seq,
1579            "Creating {} checkpoints with {} transactions", chunks_count, total,
1580        );
1581
1582        let epoch = self.epoch_store.epoch();
1583        for (index, transactions) in chunks.into_iter().enumerate() {
1584            let first_checkpoint_of_epoch = index == 0
1585                && last_checkpoint
1586                    .as_ref()
1587                    .map(|(_, c)| c.epoch != epoch)
1588                    .unwrap_or(true);
1589            if first_checkpoint_of_epoch {
1590                self.epoch_store
1591                    .record_epoch_first_checkpoint_creation_time_metric();
1592            }
1593            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;
1594
1595            let sequence_number = last_checkpoint
1596                .as_ref()
1597                .map(|(_, c)| c.sequence_number + 1)
1598                .unwrap_or_default();
1599            let timestamp_ms = details.timestamp_ms;
1600            if let Some((_, last_checkpoint)) = &last_checkpoint {
1601                if last_checkpoint.timestamp_ms > timestamp_ms {
1602                    error!(
1603                        "Unexpected decrease of checkpoint timestamp, sequence: {}, previous: {}, current: {}",
1604                        sequence_number, last_checkpoint.timestamp_ms, timestamp_ms
1605                    );
1606                }
1607            }
1608
1609            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
1610            let epoch_rolling_gas_cost_summary =
1611                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);
1612
1613            let end_of_epoch_data = if last_checkpoint_of_epoch {
1614                let (system_state_obj, system_epoch_info_event) = self
1615                    .augment_epoch_last_checkpoint(
1616                        &epoch_rolling_gas_cost_summary,
1617                        timestamp_ms,
1618                        &mut effects,
1619                        &mut signatures,
1620                        sequence_number,
1621                    )
1622                    .await?;
1623
1624                // The system epoch info event can be `None` if the `advance_epoch`
1625                // Move function call failed and was executed in safe mode.
1626                // In this case, the tokens supply should be unchanged.
1627                //
1628                // SAFETY: The numbers of minted and burnt tokens easily fit into an i64, and
1629                // due to those small numbers, no overflows will occur during conversion or
1630                // subtraction.
1631                let epoch_supply_change =
1632                    system_epoch_info_event.map_or(0, |event| event.supply_change());
1633
1634                let committee = system_state_obj
1635                    .get_current_epoch_committee()
1636                    .committee()
1637                    .clone();
1638
1639                // This must happen after the call to augment_epoch_last_checkpoint,
1640                // otherwise we will not capture the change_epoch tx.
1641                let root_state_digest = {
1642                    let state_acc = self
1643                        .accumulator
1644                        .upgrade()
1645                        .expect("No checkpoints should be getting built after local reconfiguration");
1646                    let acc = state_acc.accumulate_checkpoint(
1647                        &effects,
1648                        sequence_number,
1649                        &self.epoch_store,
1650                    )?;
1651
1652                    state_acc
1653                        .wait_for_previous_running_root(&self.epoch_store, sequence_number)
1654                        .await?;
1655
1656                    state_acc.accumulate_running_root(
1657                        &self.epoch_store,
1658                        sequence_number,
1659                        Some(acc),
1660                    )?;
1661                    state_acc
1662                        .digest_epoch(self.epoch_store.clone(), sequence_number)
1663                        .await?
1664                };
1665                self.metrics.highest_accumulated_epoch.set(epoch as i64);
1666                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");
1667
1668                let epoch_commitments = vec![root_state_digest.into()];
1669
1670                Some(EndOfEpochData {
1671                    next_epoch_committee: committee.voting_rights,
1672                    next_epoch_protocol_version: ProtocolVersion::new(
1673                        system_state_obj.protocol_version(),
1674                    ),
1675                    epoch_commitments,
1676                    epoch_supply_change,
1677                })
1678            } else {
1679                None
1680            };
1681            let contents = CheckpointContents::new_with_digests_and_signatures(
1682                effects.iter().map(TransactionEffects::execution_digests),
1683                signatures,
1684            );
1685
1686            let num_txns = contents.size() as u64;
1687
1688            let network_total_transactions = last_checkpoint
1689                .as_ref()
1690                .map(|(_, c)| c.network_total_transactions + num_txns)
1691                .unwrap_or(num_txns);
1692
1693            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());
1694
1695            let matching_randomness_rounds: Vec<_> = effects
1696                .iter()
1697                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
1698                .copied()
1699                .collect();
1700
1701            let summary = CheckpointSummary::new(
1702                self.epoch_store.protocol_config(),
1703                epoch,
1704                sequence_number,
1705                network_total_transactions,
1706                &contents,
1707                previous_digest,
1708                epoch_rolling_gas_cost_summary,
1709                end_of_epoch_data,
1710                timestamp_ms,
1711                matching_randomness_rounds,
1712            );
1713            summary.report_checkpoint_age(&self.metrics.last_created_checkpoint_age);
1714            if last_checkpoint_of_epoch {
1715                info!(
1716                    checkpoint_seq = sequence_number,
1717                    "creating last checkpoint of epoch {}", epoch
1718                );
1719                if let Some(stats) = self.store.get_epoch_stats(epoch, &summary) {
1720                    self.epoch_store
1721                        .report_epoch_metrics_at_last_checkpoint(stats);
1722                }
1723            }
1724            last_checkpoint = Some((sequence_number, summary.clone()));
1725            checkpoints.push((summary, contents));
1726        }
1727
1728        Ok(NonEmpty::from_vec(checkpoints).expect("at least one checkpoint"))
1729    }
1730
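    /// Returns the epoch-rolling [`GasCostSummary`]: within the same epoch, the
    /// previous checkpoint's rolling totals plus the costs from the current
    /// checkpoint's effects; at an epoch boundary, the current costs alone.
    /// A sketch with purely illustrative numbers:
    ///
    /// ```text
    /// previous rolling summary (same epoch): computation = 10, storage = 5
    /// current checkpoint effects:            computation =  3, storage = 2
    /// new rolling summary:                   computation = 13, storage = 7
    ///
    /// If the previous checkpoint belongs to an earlier epoch, the rolling
    /// summary restarts at computation = 3, storage = 2.
    /// ```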
1731    fn get_epoch_total_gas_cost(
1732        &self,
1733        last_checkpoint: Option<&CheckpointSummary>,
1734        cur_checkpoint_effects: &[TransactionEffects],
1735    ) -> GasCostSummary {
1736        let (previous_epoch, previous_gas_costs) = last_checkpoint
1737            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1738            .unwrap_or_default();
1739        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1740        if previous_epoch == self.epoch_store.epoch() {
1741            // sum only when we are within the same epoch
1742            GasCostSummary::new(
1743                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1744                previous_gas_costs.computation_cost_burned
1745                    + current_gas_costs.computation_cost_burned,
1746                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1747                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1748                previous_gas_costs.non_refundable_storage_fee
1749                    + current_gas_costs.non_refundable_storage_fee,
1750            )
1751        } else {
1752            current_gas_costs
1753        }
1754    }
1755
1756    /// Augments the last checkpoint of the epoch by creating and executing an
1757    /// advance epoch transaction.
1758    #[instrument(level = "error", skip_all)]
1759    async fn augment_epoch_last_checkpoint(
1760        &self,
1761        epoch_total_gas_cost: &GasCostSummary,
1762        epoch_start_timestamp_ms: CheckpointTimestamp,
1763        checkpoint_effects: &mut Vec<TransactionEffects>,
1764        signatures: &mut Vec<Vec<GenericSignature>>,
1765        checkpoint: CheckpointSequenceNumber,
1766    ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1767        let (system_state, system_epoch_info_event, effects) = self
1768            .state
1769            .create_and_execute_advance_epoch_tx(
1770                &self.epoch_store,
1771                epoch_total_gas_cost,
1772                checkpoint,
1773                epoch_start_timestamp_ms,
1774            )
1775            .await?;
1776        checkpoint_effects.push(effects);
1777        signatures.push(vec![]);
1778        Ok((system_state, system_epoch_info_event))
1779    }
1780
1781    /// For the given roots, returns the complete list of effects to include in
1782    /// the checkpoint. This list includes the roots and all of their dependencies
1783    /// that are not already part of a checkpoint. Note that this function
1784    /// may be called multiple times while constructing the checkpoint.
1785    /// `existing_tx_digests_in_checkpoint` tracks the transactions that are
1786    /// already included in the checkpoint. Transactions in `roots` that need
1787    /// to be included in the checkpoint are added to
1788    /// `existing_tx_digests_in_checkpoint` once this function returns.
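    ///
    /// A minimal sketch of the traversal, with hypothetical transactions A, B
    /// and C, where A depends on B and B depends on C:
    ///
    /// ```text
    /// roots = [A]
    ///   pass 1: A not yet in a checkpoint -> results = [A];
    ///           dependency B executed this epoch and unseen -> pending = {B}
    ///   pass 2: B already included in an earlier checkpoint -> skipped;
    ///           pending = {} -> done
    /// returns [A]
    /// ```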
1789    #[instrument(level = "debug", skip_all)]
1790    fn complete_checkpoint_effects(
1791        &self,
1792        mut roots: Vec<TransactionEffects>,
1793        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
1794    ) -> IotaResult<Vec<TransactionEffects>> {
1795        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
1796        let mut results = vec![];
1797        let mut seen = HashSet::new();
1798        loop {
1799            let mut pending = HashSet::new();
1800
1801            let transactions_included = self
1802                .epoch_store
1803                .builder_included_transactions_in_checkpoint(
1804                    roots.iter().map(|e| e.transaction_digest()),
1805                )?;
1806
1807            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
1808                let digest = effect.transaction_digest();
1809                // There is no need to read the effects of a dependency if the effect has
1810                // already been processed.
1811                seen.insert(*digest);
1812
1813                // Skip roots that are already included in the checkpoint.
1814                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
1815                    continue;
1816                }
1817
1818                // Skip roots already included in checkpoints or roots from previous epochs
1819                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
1820                    continue;
1821                }
1822
1823                let existing_effects = self
1824                    .epoch_store
1825                    .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;
1826
1827                for (dependency, effects_signature_exists) in
1828                    effect.dependencies().iter().zip(existing_effects.iter())
1829                {
1830                    // Skip if the dependency was not executed in the current epoch.
1831                    // Note that the existence of an effects signature in the
1832                    // epoch store for the given digest indicates that the transaction
1833                    // was locally executed in the current epoch.
1834                    if !effects_signature_exists {
1835                        continue;
1836                    }
1837                    if seen.insert(*dependency) {
1838                        pending.insert(*dependency);
1839                    }
1840                }
1841                results.push(effect);
1842            }
1843            if pending.is_empty() {
1844                break;
1845            }
1846            let pending = pending.into_iter().collect::<Vec<_>>();
1847            let effects = self
1848                .effects_store
1849                .try_multi_get_executed_effects(&pending)?;
1850            let effects = effects
1851                .into_iter()
1852                .zip(pending)
1853                .map(|(opt, digest)| match opt {
1854                    Some(x) => x,
1855                    None => panic!(
1856                        "Can not find effect for transaction {digest:?}, however transaction that depend on it was already executed"
1857                    ),
1858                })
1859                .collect::<Vec<_>>();
1860            roots = effects;
1861        }
1862
1863        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
1864        Ok(results)
1865    }
1866
1867    // This function is used to check the invariants of the consensus commit
1868    // prologue transactions in the checkpoint in simtest.
1869    #[cfg(msim)]
1870    fn expensive_consensus_commit_prologue_invariants_check(
1871        &self,
1872        root_digests: &[TransactionDigest],
1873        sorted: &[TransactionEffects],
1874    ) {
1875        // Gets all the consensus commit prologue transactions from the roots.
1876        let root_txs = self
1877            .state
1878            .get_transaction_cache_reader()
1879            .multi_get_transaction_blocks(root_digests);
1880        let ccps = root_txs
1881            .iter()
1882            .filter_map(|tx| {
1883                tx.as_ref().filter(|tx| {
1884                    matches!(
1885                        tx.transaction_data().kind(),
1886                        TransactionKind::ConsensusCommitPrologueV1(_)
1887                    )
1888                })
1889            })
1890            .collect::<Vec<_>>();
1891
1892        // There should be at most one consensus commit prologue transaction in the
1893        // roots.
1894        assert!(ccps.len() <= 1);
1895
1896        // Get all the transactions in the checkpoint.
1897        let txs = self
1898            .state
1899            .get_transaction_cache_reader()
1900            .multi_get_transaction_blocks(
1901                &sorted
1902                    .iter()
1903                    .map(|tx| *tx.transaction_digest())
1904                    .collect::<Vec<_>>(),
1905            );
1906
1907        if ccps.is_empty() {
1908            // If there is no consensus commit prologue transaction in the roots, then there
1909            // should be no consensus commit prologue transaction in the
1910            // checkpoint.
1911            for tx in txs.iter().flatten() {
1912                assert!(!matches!(
1913                    tx.transaction_data().kind(),
1914                    TransactionKind::ConsensusCommitPrologueV1(_)
1915                ));
1916            }
1917        } else {
1918            // If there is one consensus commit prologue, it must be the first one in the
1919            // checkpoint.
1920            assert!(matches!(
1921                txs[0].as_ref().unwrap().transaction_data().kind(),
1922                TransactionKind::ConsensusCommitPrologueV1(_)
1923            ));
1924
1925            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1926
1927            for tx in txs.iter().skip(1).flatten() {
1928                assert!(!matches!(
1929                    tx.transaction_data().kind(),
1930                    TransactionKind::ConsensusCommitPrologueV1(_)
1931                ));
1932            }
1933        }
1934    }
1935}
1936
1937impl CheckpointAggregator {
1938    fn new(
1939        tables: Arc<CheckpointStore>,
1940        epoch_store: Arc<AuthorityPerEpochStore>,
1941        notify: Arc<Notify>,
1942        output: Box<dyn CertifiedCheckpointOutput>,
1943        state: Arc<AuthorityState>,
1944        metrics: Arc<CheckpointMetrics>,
1945    ) -> Self {
1946        let current = None;
1947        Self {
1948            store: tables,
1949            epoch_store,
1950            notify,
1951            current,
1952            output,
1953            state,
1954            metrics,
1955        }
1956    }
1957
1958    /// Runs the `CheckpointAggregator` in an asynchronous loop, managing the
1959    /// aggregation of checkpoints.
1960    /// The function ensures continuous aggregation of checkpoints, handling
1961    /// errors and retries gracefully, and allowing for proper shutdown on
1962    /// receiving an exit signal.
1963    async fn run(mut self) {
1964        info!("Starting CheckpointAggregator");
1965        loop {
1966            if let Err(e) = self.run_and_notify().await {
1967                error!(
1968                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
1969                    e
1970                );
1971                self.metrics.checkpoint_errors.inc();
1972                tokio::time::sleep(Duration::from_secs(1)).await;
1973                continue;
1974            }
1975
1976            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
1977        }
1978    }
1979
1980    async fn run_and_notify(&mut self) -> IotaResult {
1981        let summaries = self.run_inner()?;
1982        for summary in summaries {
1983            self.output.certified_checkpoint_created(&summary).await?;
1984        }
1985        Ok(())
1986    }
1987
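    /// Aggregates pending checkpoint signatures, certifying as many consecutive
    /// checkpoints as possible starting from the next uncertified sequence
    /// number, and returns the newly certified summaries.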
1988    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
1989        let _scope = monitored_scope("CheckpointAggregator");
1990        let mut result = vec![];
1991        'outer: loop {
1992            let next_to_certify = self.next_checkpoint_to_certify()?;
1993            let current = if let Some(current) = &mut self.current {
1994                // It's possible that the checkpoint was already certified by
1995                // the rest of the network and we've already received the
1996                // certified checkpoint via StateSync. In this case, we reset
1997                // the current signature aggregator to the next checkpoint to
1998                // be certified.
1999                if current.summary.sequence_number < next_to_certify {
2000                    self.current = None;
2001                    continue;
2002                }
2003                current
2004            } else {
2005                let Some(summary) = self
2006                    .epoch_store
2007                    .get_built_checkpoint_summary(next_to_certify)?
2008                else {
2009                    return Ok(result);
2010                };
2011                self.current = Some(CheckpointSignatureAggregator {
2012                    next_index: 0,
2013                    digest: summary.digest(),
2014                    summary,
2015                    signatures_by_digest: MultiStakeAggregator::new(
2016                        self.epoch_store.committee().clone(),
2017                    ),
2018                    store: self.store.clone(),
2019                    state: self.state.clone(),
2020                    metrics: self.metrics.clone(),
2021                });
2022                self.current.as_mut().unwrap()
2023            };
2024
2025            let epoch_tables = self
2026                .epoch_store
2027                .tables()
2028                .expect("should not run past end of epoch");
2029            let iter = epoch_tables
2030                .pending_checkpoint_signatures
2031                .safe_iter_with_bounds(
2032                    Some((current.summary.sequence_number, current.next_index)),
2033                    None,
2034                );
2035            for item in iter {
2036                let ((seq, index), data) = item?;
2037                if seq != current.summary.sequence_number {
2038                    trace!(
2039                        checkpoint_seq = ?current.summary.sequence_number,
2040                        "Not enough checkpoint signatures",
2041                    );
2042                    // No more signatures (yet) for this checkpoint
2043                    return Ok(result);
2044                }
2045                trace!(
2046                    checkpoint_seq = current.summary.sequence_number,
2047                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
2048                    current.summary.digest(),
2049                    data.summary.auth_sig().authority.concise()
2050                );
2051                self.metrics
2052                    .checkpoint_participation
2053                    .with_label_values(&[&format!(
2054                        "{:?}",
2055                        data.summary.auth_sig().authority.concise()
2056                    )])
2057                    .inc();
2058                if let Ok(auth_signature) = current.try_aggregate(data) {
2059                    debug!(
2060                        checkpoint_seq = current.summary.sequence_number,
2061                        "Successfully aggregated signatures for checkpoint (digest: {:?})",
2062                        current.summary.digest(),
2063                    );
2064                    let summary = VerifiedCheckpoint::new_unchecked(
2065                        CertifiedCheckpointSummary::new_from_data_and_sig(
2066                            current.summary.clone(),
2067                            auth_signature,
2068                        ),
2069                    );
2070
2071                    self.store.insert_certified_checkpoint(&summary)?;
2072                    self.metrics
2073                        .last_certified_checkpoint
2074                        .set(current.summary.sequence_number as i64);
2075                    current
2076                        .summary
2077                        .report_checkpoint_age(&self.metrics.last_certified_checkpoint_age);
2078                    result.push(summary.into_inner());
2079                    self.current = None;
2080                    continue 'outer;
2081                } else {
2082                    current.next_index = index + 1;
2083                }
2084            }
2085            break;
2086        }
2087        Ok(result)
2088    }
2089
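    /// Returns the sequence number following the highest certified checkpoint,
    /// or 0 if no checkpoint has been certified yet.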
2090    fn next_checkpoint_to_certify(&self) -> IotaResult<CheckpointSequenceNumber> {
2091        Ok(self
2092            .store
2093            .tables
2094            .certified_checkpoints
2095            .reversed_safe_iter_with_bounds(None, None)?
2096            .next()
2097            .transpose()?
2098            .map(|(seq, _)| seq + 1)
2099            .unwrap_or_default())
2100    }
2101}
2102
2103impl CheckpointSignatureAggregator {
2104    #[expect(clippy::result_unit_err)]
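    /// Attempts to add one validator's signature to the aggregate for this
    /// checkpoint. Returns `Ok` with the quorum signature once enough stake has
    /// signed, and `Err(())` otherwise. A sketch of the possible outcomes:
    ///
    /// ```text
    /// repeated signature from the same validator -> Failed         -> Err(())
    /// new signature, quorum not yet reached      -> NotEnoughVotes -> Err(())
    ///                                               (checks for split brain)
    /// quorum reached on our digest               -> QuorumReached  -> Ok(cert)
    /// quorum reached on a different digest       -> remote fork    -> Err(())
    /// ```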
2105    pub fn try_aggregate(
2106        &mut self,
2107        data: CheckpointSignatureMessage,
2108    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
2109        let their_digest = *data.summary.digest();
2110        let (_, signature) = data.summary.into_data_and_sig();
2111        let author = signature.authority;
2112        let envelope =
2113            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
2114        match self.signatures_by_digest.insert(their_digest, envelope) {
2115            // ignore repeated signatures
2116            InsertResult::Failed {
2117                error:
2118                    IotaError::StakeAggregatorRepeatedSigner {
2119                        conflicting_sig: false,
2120                        ..
2121                    },
2122            } => Err(()),
2123            InsertResult::Failed { error } => {
2124                warn!(
2125                    checkpoint_seq = self.summary.sequence_number,
2126                    "Failed to aggregate new signature from validator {:?}: {:?}",
2127                    author.concise(),
2128                    error
2129                );
2130                self.check_for_split_brain();
2131                Err(())
2132            }
2133            InsertResult::QuorumReached(cert) => {
2134                // It is not guaranteed that signature.authority == consensus_cert.author, but
2135                // we do verify the signature so we know that the author signed
2136                // the message at some point.
2137                if their_digest != self.digest {
2138                    self.metrics.remote_checkpoint_forks.inc();
2139                    warn!(
2140                        checkpoint_seq = self.summary.sequence_number,
2141                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
2142                        author.concise(),
2143                        their_digest,
2144                        self.digest
2145                    );
2146                    return Err(());
2147                }
2148                Ok(cert)
2149            }
2150            InsertResult::NotEnoughVotes {
2151                bad_votes: _,
2152                bad_authorities: _,
2153            } => {
2154                self.check_for_split_brain();
2155                Err(())
2156            }
2157        }
2158    }
2159
2160    /// Checks whether there is a split brain condition in checkpoint signature
2161    /// aggregation, defined as any state wherein it is no longer possible
2162    /// to achieve quorum on a checkpoint proposal, irrespective of the
2163    /// outcome of any outstanding votes.
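    ///
    /// For example (hypothetical, equal stakes): with four validators and a
    /// quorum threshold of three, two signatures on digest `A` and two on
    /// digest `B` leave no outstanding votes, so neither digest can still
    /// reach quorum and a split brain is reported.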
2164    fn check_for_split_brain(&self) {
2165        debug!(
2166            checkpoint_seq = self.summary.sequence_number,
2167            "Checking for split brain condition"
2168        );
2169        if self.signatures_by_digest.quorum_unreachable() {
2170            // TODO: at this point we should immediately halt processing
2171            // of new transaction certificates to avoid building on top of
2172            // forked output
2173            // self.halt_all_execution();
2174
2175            let digests_by_stake_messages = self
2176                .signatures_by_digest
2177                .get_all_unique_values()
2178                .into_iter()
2179                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
2180                .map(|(digest, (_authorities, total_stake))| {
2181                    format!("{digest:?} (total stake: {total_stake})")
2182                })
2183                .collect::<Vec<String>>();
2184            error!(
2185                checkpoint_seq = self.summary.sequence_number,
2186                "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
2187                self.signatures_by_digest.uncommitted_stake(),
2188                digests_by_stake_messages,
2189            );
2190            self.metrics.split_brain_checkpoint_forks.inc();
2191
2192            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
2193            let local_summary = self.summary.clone();
2194            let state = self.state.clone();
2195            let tables = self.store.clone();
2196
2197            tokio::spawn(async move {
2198                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
2199            });
2200        }
2201    }
2202}
2203
2204/// Creates a data dump containing the data relevant for diagnosing the cause
2205/// of a split brain, by querying disagreeing validators for full checkpoint
2206/// contents. To minimize peer chatter, we query only one validator at random
2207/// from each disagreeing faction, as all honest validators that participated in
2208/// this round will inevitably run the same process.
2209async fn diagnose_split_brain(
2210    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
2211    local_summary: CheckpointSummary,
2212    state: Arc<AuthorityState>,
2213    tables: Arc<CheckpointStore>,
2214) {
2215    debug!(
2216        checkpoint_seq = local_summary.sequence_number,
2217        "Running split brain diagnostics..."
2218    );
2219    let time = SystemTime::now();
2220    // collect one random disagreeing validator per differing digest
2221    let digest_to_validator = all_unique_values
2222        .iter()
2223        .filter_map(|(digest, (validators, _))| {
2224            if *digest != local_summary.digest() {
2225                let random_validator = validators.choose(&mut OsRng).unwrap();
2226                Some((*digest, *random_validator))
2227            } else {
2228                None
2229            }
2230        })
2231        .collect::<HashMap<_, _>>();
2232    if digest_to_validator.is_empty() {
2233        panic!(
2234            "Given split brain condition, there should be at \
2235                least one validator that disagrees with local signature"
2236        );
2237    }
2238
2239    let epoch_store = state.load_epoch_store_one_call_per_task();
2240    let committee = epoch_store
2241        .epoch_start_state()
2242        .get_iota_committee_with_network_metadata();
2243    let network_config = default_iota_network_config();
2244    let network_clients =
2245        make_network_authority_clients_with_network_config(&committee, &network_config);
2246
2247    // Query all disagreeing validators
2248    let response_futures = digest_to_validator
2249        .values()
2250        .cloned()
2251        .map(|validator| {
2252            let client = network_clients
2253                .get(&validator)
2254                .expect("Failed to get network client");
2255            let request = CheckpointRequest {
2256                sequence_number: Some(local_summary.sequence_number),
2257                request_content: true,
2258                certified: false,
2259            };
2260            client.handle_checkpoint(request)
2261        })
2262        .collect::<Vec<_>>();
2263
2264    let digest_name_pair = digest_to_validator.iter();
2265    let response_data = futures::future::join_all(response_futures)
2266        .await
2267        .into_iter()
2268        .zip(digest_name_pair)
2269        .filter_map(|(response, (digest, name))| match response {
2270            Ok(response) => match response {
2271                CheckpointResponse {
2272                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
2273                    contents: Some(contents),
2274                } => Some((*name, *digest, summary, contents)),
2275                CheckpointResponse {
2276                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
2277                    contents: _,
2278                } => {
2279                    panic!("Expected pending checkpoint, but got certified checkpoint");
2280                }
2281                CheckpointResponse {
2282                    checkpoint: None,
2283                    contents: _,
2284                } => {
2285                    error!(
2286                        "Summary for checkpoint {:?} not found on validator {:?}",
2287                        local_summary.sequence_number, name
2288                    );
2289                    None
2290                }
2291                CheckpointResponse {
2292                    checkpoint: _,
2293                    contents: None,
2294                } => {
2295                    error!(
2296                        "Contents for checkpoint {:?} not found on validator {:?}",
2297                        local_summary.sequence_number, name
2298                    );
2299                    None
2300                }
2301            },
2302            Err(e) => {
2303                error!(
2304                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
2305                    e
2306                );
2307                None
2308            }
2309        })
2310        .collect::<Vec<_>>();
2311
2312    let local_checkpoint_contents = tables
2313        .get_checkpoint_contents(&local_summary.content_digest)
2314        .unwrap_or_else(|_| {
2315            panic!(
2316                "Could not find checkpoint contents for digest {:?}",
2317                local_summary.digest()
2318            )
2319        })
2320        .unwrap_or_else(|| {
2321            panic!(
2322                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
2323                local_summary.sequence_number,
2324                local_summary.digest()
2325            )
2326        });
2327    let local_contents_text = format!("{local_checkpoint_contents:?}");
2328
2329    let local_summary_text = format!("{local_summary:?}");
2330    let local_validator = state.name.concise();
2331    let diff_patches = response_data
2332        .iter()
2333        .map(|(name, other_digest, other_summary, contents)| {
2334            let other_contents_text = format!("{contents:?}");
2335            let other_summary_text = format!("{other_summary:?}");
2336            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
2337                .enumerate_transactions(&local_summary)
2338                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2339                .unzip();
2340            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
2341                .enumerate_transactions(other_summary)
2342                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
2343                .unzip();
2344            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
2345            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
2346            let local_transactions_text = format!("{local_transactions:#?}");
2347            let other_transactions_text = format!("{other_transactions:#?}");
2348            let transactions_patch =
2349                create_patch(&local_transactions_text, &other_transactions_text);
2350            let local_effects_text = format!("{local_effects:#?}");
2351            let other_effects_text = format!("{other_effects:#?}");
2352            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
2353            let seq_number = local_summary.sequence_number;
2354            let local_digest = local_summary.digest();
2355            let other_validator = name.concise();
2356            format!(
2357                "Checkpoint: {seq_number:?}\n\
2358                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
2359                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
2360                Summary Diff: \n{summary_patch}\n\n\
2361                Contents Diff: \n{contents_patch}\n\n\
2362                Transactions Diff: \n{transactions_patch}\n\n\
2363                Effects Diff: \n{effects_patch}",
2364            )
2365        })
2366        .collect::<Vec<_>>()
2367        .join("\n\n\n");
2368
2369    let header = format!(
2370        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
2371        Datetime: {time:?}"
2372    );
2373    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
2374    let path = tempfile::tempdir()
2375        .expect("Failed to create tempdir")
2376        .keep()
2377        .join(Path::new("checkpoint_fork_dump.txt"));
2378    let mut file = File::create(path).unwrap();
2379    write!(file, "{fork_logs_text}").unwrap();
2380    debug!("{}", fork_logs_text);
2381
2382    fail_point!("split_brain_reached");
2383}
2384
2385pub trait CheckpointServiceNotify {
2386    fn notify_checkpoint_signature(
2387        &self,
2388        epoch_store: &AuthorityPerEpochStore,
2389        info: &CheckpointSignatureMessage,
2390    ) -> IotaResult;
2391
2392    fn notify_checkpoint(&self) -> IotaResult;
2393}
2394
2395enum CheckpointServiceState {
2396    Unstarted(Box<(CheckpointBuilder, CheckpointAggregator)>),
2397    Started,
2398}
2399
2400impl CheckpointServiceState {
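    /// Takes the unstarted `(CheckpointBuilder, CheckpointAggregator)` pair out
    /// of the state, leaving `Started` in its place. Panics if the service has
    /// already been started.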
2401    fn take_unstarted(&mut self) -> (CheckpointBuilder, CheckpointAggregator) {
2402        let mut state = CheckpointServiceState::Started;
2403        std::mem::swap(self, &mut state);
2404
2405        match state {
2406            CheckpointServiceState::Unstarted(tup) => (tup.0, tup.1),
2407            CheckpointServiceState::Started => panic!("CheckpointServiceState is already started"),
2408        }
2409    }
2410}
2411
2412pub struct CheckpointService {
2413    tables: Arc<CheckpointStore>,
2414    notify_builder: Arc<Notify>,
2415    notify_aggregator: Arc<Notify>,
2416    last_signature_index: Mutex<u64>,
2417    // A notification for the current highest built sequence number.
2418    highest_currently_built_seq_tx: watch::Sender<CheckpointSequenceNumber>,
2419    // The highest sequence number that had already been built at the time CheckpointService
2420    // was constructed
2421    highest_previously_built_seq: CheckpointSequenceNumber,
2422    metrics: Arc<CheckpointMetrics>,
2423    state: Mutex<CheckpointServiceState>,
2424}
2425
2426impl CheckpointService {
2427    /// Constructs a new CheckpointService in an un-started state; the checkpoint
2428    /// builder and aggregator tasks are initialized here but only spawned later
2429    /// via [`Self::spawn`].
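    ///
    /// A minimal usage sketch (variable names are the caller's choice, not part
    /// of this API):
    ///
    /// ```ignore
    /// let service = CheckpointService::build(
    ///     state, checkpoint_store, epoch_store, effects_store, accumulator,
    ///     checkpoint_output, certified_checkpoint_output, metrics,
    ///     max_transactions_per_checkpoint, max_checkpoint_size_bytes,
    /// );
    /// // `spawn` starts the builder and aggregator tasks and waits for any
    /// // previously built checkpoints to be rebuilt.
    /// let tasks = service.spawn().await;
    /// ```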
2430    pub fn build(
2431        state: Arc<AuthorityState>,
2432        checkpoint_store: Arc<CheckpointStore>,
2433        epoch_store: Arc<AuthorityPerEpochStore>,
2434        effects_store: Arc<dyn TransactionCacheRead>,
2435        accumulator: Weak<StateAccumulator>,
2436        checkpoint_output: Box<dyn CheckpointOutput>,
2437        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2438        metrics: Arc<CheckpointMetrics>,
2439        max_transactions_per_checkpoint: usize,
2440        max_checkpoint_size_bytes: usize,
2441    ) -> Arc<Self> {
2442        info!(
2443            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2444        );
2445        let notify_builder = Arc::new(Notify::new());
2446        let notify_aggregator = Arc::new(Notify::new());
2447
2448        // We may have built higher checkpoint numbers before restarting.
2449        let highest_previously_built_seq = checkpoint_store
2450            .get_latest_locally_computed_checkpoint()
2451            .expect("failed to get latest locally computed checkpoint")
2452            .map(|s| s.sequence_number)
2453            .unwrap_or(0);
2454
2455        let highest_currently_built_seq =
2456            CheckpointBuilder::load_last_built_checkpoint_summary(&epoch_store, &checkpoint_store)
2457                .expect("epoch should not have ended")
2458                .map(|(seq, _)| seq)
2459                .unwrap_or(0);
2460
2461        let (highest_currently_built_seq_tx, _) = watch::channel(highest_currently_built_seq);
2462
2463        let aggregator = CheckpointAggregator::new(
2464            checkpoint_store.clone(),
2465            epoch_store.clone(),
2466            notify_aggregator.clone(),
2467            certified_checkpoint_output,
2468            state.clone(),
2469            metrics.clone(),
2470        );
2471
2472        let builder = CheckpointBuilder::new(
2473            state.clone(),
2474            checkpoint_store.clone(),
2475            epoch_store.clone(),
2476            notify_builder.clone(),
2477            effects_store,
2478            accumulator,
2479            checkpoint_output,
2480            notify_aggregator.clone(),
2481            highest_currently_built_seq_tx.clone(),
2482            metrics.clone(),
2483            max_transactions_per_checkpoint,
2484            max_checkpoint_size_bytes,
2485        );
2486
2487        let last_signature_index = epoch_store
2488            .get_last_checkpoint_signature_index()
2489            .expect("should not cross end of epoch");
2490        let last_signature_index = Mutex::new(last_signature_index);
2491
2492        Arc::new(Self {
2493            tables: checkpoint_store,
2494            notify_builder,
2495            notify_aggregator,
2496            last_signature_index,
2497            highest_currently_built_seq_tx,
2498            highest_previously_built_seq,
2499            metrics,
2500            state: Mutex::new(CheckpointServiceState::Unstarted(Box::new((
2501                builder, aggregator,
2502            )))),
2503        })
2504    }
2505
2506    /// Starts the CheckpointService.
2507    ///
2508    /// This function blocks until the CheckpointBuilder re-builds all
2509    /// checkpoints that had been built before the most recent restart. You
2510    /// can think of this as a WAL replay operation. Upon startup, we may
2511    /// have a number of consensus commits and resulting checkpoints that
2512    /// were built but not committed to disk. We want to reprocess the
2513    /// commits and rebuild the checkpoints before starting normal operation.
2514    pub async fn spawn(&self) -> JoinSet<()> {
2515        let mut tasks = JoinSet::new();
2516
2517        let (builder, aggregator) = self.state.lock().take_unstarted();
2518        tasks.spawn(monitored_future!(builder.run()));
2519        tasks.spawn(monitored_future!(aggregator.run()));
2520
2521        // If this times out, the validator may still start up. The worst that can
2522        // happen is that we will crash later on instead of immediately. The eventual
2523        // crash would occur because we may be missing transactions that are below the
2524        // highest_synced_checkpoint watermark, which can cause a crash in
2525        // `CheckpointExecutor::extract_randomness_rounds`.
2526        if tokio::time::timeout(
2527            Duration::from_secs(120),
2528            self.wait_for_rebuilt_checkpoints(),
2529        )
2530        .await
2531        .is_err()
2532        {
2533            debug_fatal!("Timed out waiting for checkpoints to be rebuilt");
2534        }
2535
2536        tasks
2537    }
2538}
2539
2540impl CheckpointService {
2541    /// Waits until all checkpoints that had been built before the node restarted
2542    /// are rebuilt. This is required to preserve the invariant that all
2543    /// checkpoints (and their transactions) below the
2544    /// highest_synced_checkpoint watermark are available. Once the
2545    /// checkpoints are constructed, we can be sure that the transactions
2546    /// have also been executed.
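    ///
    /// A usage sketch mirroring [`Self::spawn`], which bounds the wait with a
    /// timeout:
    ///
    /// ```ignore
    /// if tokio::time::timeout(
    ///     Duration::from_secs(120),
    ///     service.wait_for_rebuilt_checkpoints(),
    /// )
    /// .await
    /// .is_err()
    /// {
    ///     // The node can still start; transactions missing below the
    ///     // highest_synced_checkpoint watermark may cause a crash later on.
    /// }
    /// ```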
2547    pub async fn wait_for_rebuilt_checkpoints(&self) {
2548        let highest_previously_built_seq = self.highest_previously_built_seq;
2549        let mut rx = self.highest_currently_built_seq_tx.subscribe();
2550        let mut highest_currently_built_seq = *rx.borrow_and_update();
2551        info!(
2552            "Waiting for checkpoints to be rebuilt, previously built seq: \
2553            {highest_previously_built_seq}, currently built seq: {highest_currently_built_seq}"
2554        );
2555        loop {
2556            if highest_currently_built_seq >= highest_previously_built_seq {
2557                info!("Checkpoint rebuild complete");
2558                break;
2559            }
2560            rx.changed().await.unwrap();
2561            highest_currently_built_seq = *rx.borrow_and_update();
2562        }
2563    }
2564
2565    #[cfg(test)]
2566    fn write_and_notify_checkpoint_for_testing(
2567        &self,
2568        epoch_store: &AuthorityPerEpochStore,
2569        checkpoint: PendingCheckpoint,
2570    ) -> IotaResult {
2571        use crate::authority::authority_per_epoch_store::consensus_quarantine::ConsensusCommitOutput;
2572
2573        let mut output = ConsensusCommitOutput::new();
2574        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2575        output.set_default_commit_stats_for_testing();
2576        epoch_store.push_consensus_output_for_tests(output);
2577        self.notify_checkpoint()?;
2578        Ok(())
2579    }
2580}
2581
2582impl CheckpointServiceNotify for CheckpointService {
2583    fn notify_checkpoint_signature(
2584        &self,
2585        epoch_store: &AuthorityPerEpochStore,
2586        info: &CheckpointSignatureMessage,
2587    ) -> IotaResult {
2588        let sequence = info.summary.sequence_number;
2589        let signer = info.summary.auth_sig().authority.concise();
2590
2591        if let Some(highest_verified_checkpoint) = self
2592            .tables
2593            .get_highest_verified_checkpoint()?
2594            .map(|x| *x.sequence_number())
2595        {
2596            if sequence <= highest_verified_checkpoint {
2597                trace!(
2598                    checkpoint_seq = sequence,
2599                    "Ignore checkpoint signature from {} - already certified", signer,
2600                );
2601                self.metrics
2602                    .last_ignored_checkpoint_signature_received
2603                    .set(sequence as i64);
2604                return Ok(());
2605            }
2606        }
2607        trace!(
2608            checkpoint_seq = sequence,
2609            "Received checkpoint signature, digest {} from {}",
2610            info.summary.digest(),
2611            signer,
2612        );
2613        self.metrics
2614            .last_received_checkpoint_signatures
2615            .with_label_values(&[&signer.to_string()])
2616            .set(sequence as i64);
2617        // While it can be tempting to make last_signature_index into an AtomicU64, this
2618        // won't work. We need to make sure we write to `pending_signatures` and
2619        // trigger `notify_aggregator` without race conditions.
2620        let mut index = self.last_signature_index.lock();
2621        *index += 1;
2622        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2623        self.notify_aggregator.notify_one();
2624        Ok(())
2625    }
2626
2627    fn notify_checkpoint(&self) -> IotaResult {
2628        self.notify_builder.notify_one();
2629        Ok(())
2630    }
2631}
2632
2633// test helper
2634pub struct CheckpointServiceNoop {}
2635impl CheckpointServiceNotify for CheckpointServiceNoop {
2636    fn notify_checkpoint_signature(
2637        &self,
2638        _: &AuthorityPerEpochStore,
2639        _: &CheckpointSignatureMessage,
2640    ) -> IotaResult {
2641        Ok(())
2642    }
2643
2644    fn notify_checkpoint(&self) -> IotaResult {
2645        Ok(())
2646    }
2647}
2648
2649#[cfg(test)]
2650mod tests {
2651    use std::{
2652        collections::{BTreeMap, HashMap},
2653        ops::Deref,
2654    };
2655
2656    use futures::{FutureExt as _, future::BoxFuture};
2657    use iota_macros::sim_test;
2658    use iota_protocol_config::{Chain, ProtocolConfig};
2659    use iota_types::{
2660        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
2661        crypto::Signature,
2662        digests::TransactionEventsDigest,
2663        effects::{TransactionEffects, TransactionEvents},
2664        messages_checkpoint::SignedCheckpointSummary,
2665        move_package::MovePackage,
2666        object,
2667        transaction::{GenesisObject, VerifiedTransaction},
2668    };
2669    use tokio::sync::mpsc;
2670
2671    use super::*;
2672    use crate::authority::test_authority_builder::TestAuthorityBuilder;
2673
2674    #[sim_test]
2675    pub async fn checkpoint_builder_test() {
2676        telemetry_subscribers::init_for_testing();
2677
2678        let mut protocol_config =
2679            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
2680        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
2681        let state = TestAuthorityBuilder::new()
2682            .with_protocol_config(protocol_config)
2683            .build()
2684            .await;
2685
2686        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
2687        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
2688            vec![GenesisObject::RawObject {
2689                data: object::Data::Package(
2690                    MovePackage::new(
2691                        ObjectID::random(),
2692                        SequenceNumber::new(),
2693                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
2694                        100_000,
                        // No modules, so the type_origin_table is empty: no types
                        // are defined in this package.
                        Vec::new(),
                        // No modules, so the linkage_table is empty: this package
                        // has no dependencies.
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

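        // Record effects forming a small dependency graph: d(1) depends on
        // {d(2), d(3)}, d(2) depends on {d(3), d(4)}, and the rest are roots.
        // Causal ordering must emit dependencies before dependents, which the
        // checkpoint content assertions below rely on.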
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        let checkpoint_service = CheckpointService::build(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );
        let _tasks = checkpoint_service.spawn().await;

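        // Write six pending checkpoints. Heights 0 and 1 become checkpoints 0
        // and 1, height 2 splits by transaction count, height 3 splits by
        // serialized size, and heights 4 and 5 coalesce because height 4
        // arrives within the 100ms minimum checkpoint interval of height 3.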
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

        // Pending checkpoint at index 2 had 4 transactions, and we configured a
        // maximum of 3 transactions per checkpoint. Verify that it was split
        // into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending checkpoint at index 3 had 3 transactions of roughly 40K
        // serialized size each, and we configured a 100K maximum. Verify that
        // it was split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending checkpoint at index 4 came too soon after the prior one and
        // should be coalesced into the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

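        // Sign the first two summaries and deliver the signatures out of order;
        // the aggregator must still certify checkpoints in sequence order. The
        // test authority is presumably the only member of its committee, so a
        // single signature suffices to certify.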
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }

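    // Minimal in-memory stand-in for the transaction cache: effects lookups are
    // served straight from the HashMap populated by `commit_cert_for_test`.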
    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
        fn try_notify_read_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| self.get(d).expect("effects not found").clone())
                .collect()))
            .boxed()
        }

        fn try_notify_read_executed_effects_digests(
            &self,
            digests: &[TransactionDigest],
        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
            std::future::ready(Ok(digests
                .iter()
                .map(|d| {
                    self.get(d)
                        .map(|fx| fx.digest())
                        .expect("effects not found")
                })
                .collect()))
            .boxed()
        }

        fn try_multi_get_executed_effects(
            &self,
            digests: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
        }

        // Unimplemented methods - it's unfortunate to have this big blob of useless
        // code, but it wasn't worth it to keep EffectsNotifyRead around just
        // for these tests, as it caused a ton of complication in non-test code
        // (e.g. we had to implement EffectsNotifyRead for all ExecutionCacheRead
        // implementors).

        fn try_multi_get_transaction_blocks(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
            unimplemented!()
        }

        fn try_multi_get_executed_effects_digests(
            &self,
            _: &[TransactionDigest],
        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
            unimplemented!()
        }

        fn try_multi_get_effects(
            &self,
            _: &[TransactionEffectsDigest],
        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
            unimplemented!()
        }

        fn try_multi_get_events(
            &self,
            _: &[TransactionEventsDigest],
        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
            unimplemented!()
        }
    }

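    // Route each built (contents, summary) pair into an mpsc channel so the
    // test can assert on checkpoints as they are produced.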
    #[async_trait::async_trait]
    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
        async fn checkpoint_created(
            &self,
            summary: &CheckpointSummary,
            contents: &CheckpointContents,
            _epoch_store: &Arc<AuthorityPerEpochStore>,
            _checkpoint_store: &Arc<CheckpointStore>,
        ) -> IotaResult {
            self.try_send((contents.clone(), summary.clone())).unwrap();
            Ok(())
        }
    }

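    // Likewise for certified checkpoints emitted by the signature aggregator.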
    #[async_trait::async_trait]
    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
        async fn certified_checkpoint_created(
            &self,
            summary: &CertifiedCheckpointSummary,
        ) -> IotaResult {
            self.try_send(summary.clone()).unwrap();
            Ok(())
        }
    }

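    /// Builds a `PendingCheckpoint` at height `i` whose roots are the digests
    /// `d(t)` for each `t`, with the given timestamp; e.g. `p(0, vec![4], 0)`
    /// pends a checkpoint at height 0 rooted at `d(4)`.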
    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
            roots: t
                .into_iter()
                .map(|t| TransactionKey::Digest(d(t)))
                .collect(),
            details: PendingCheckpointInfo {
                timestamp_ms,
                last_of_epoch: false,
                checkpoint_height: i,
            },
        })
    }

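    /// Deterministic test digest: 32 bytes, the first set to `i` and the rest
    /// zero, so digests are stable across runs and easy to compare.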
    fn d(i: u8) -> TransactionDigest {
        let mut bytes: [u8; 32] = Default::default();
        bytes[0] = i;
        TransactionDigest::new(bytes)
    }

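    /// Builds `TransactionEffects` for `transaction_digest` with the given
    /// dependencies and gas summary; every other field keeps its default.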
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }

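    /// Registers a transaction for the test: records its effects in the
    /// in-memory store and maps its `TransactionKey` to its digest in the
    /// epoch store, standing in for what certificate execution would do.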
    fn commit_cert_for_test(
        store: &mut HashMap<TransactionDigest, TransactionEffects>,
        state: Arc<AuthorityState>,
        digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) {
        let epoch_store = state.epoch_store_for_testing();
        let effects = e(digest, dependencies, gas_used);
        store.insert(digest, effects.clone());
        epoch_store
            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
            .expect("Inserting cert fx and sigs should not fail");
    }
}