iota_core/checkpoints/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5mod causal_order;
6pub mod checkpoint_executor;
7mod checkpoint_output;
8mod metrics;
9
10use std::{
11    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
12    fs::File,
13    io::Write,
14    path::Path,
15    sync::{Arc, Weak},
16    time::Duration,
17};
18
19use chrono::Utc;
20use diffy::create_patch;
21use iota_macros::fail_point;
22use iota_metrics::{MonitoredFutureExt, monitored_future, monitored_scope};
23use iota_network::default_iota_network_config;
24use iota_protocol_config::ProtocolVersion;
25use iota_types::{
26    base_types::{AuthorityName, ConciseableName, EpochId, TransactionDigest},
27    committee::StakeUnit,
28    crypto::AuthorityStrongQuorumSignInfo,
29    digests::{CheckpointContentsDigest, CheckpointDigest},
30    effects::{TransactionEffects, TransactionEffectsAPI},
31    error::{IotaError, IotaResult},
32    event::SystemEpochInfoEvent,
33    executable_transaction::VerifiedExecutableTransaction,
34    gas::GasCostSummary,
35    iota_system_state::{
36        IotaSystemState, IotaSystemStateTrait,
37        epoch_start_iota_system_state::EpochStartSystemStateTrait,
38    },
39    message_envelope::Message,
40    messages_checkpoint::{
41        CertifiedCheckpointSummary, CheckpointCommitment, CheckpointContents, CheckpointRequest,
42        CheckpointResponse, CheckpointSequenceNumber, CheckpointSignatureMessage,
43        CheckpointSummary, CheckpointSummaryResponse, CheckpointTimestamp, EndOfEpochData,
44        FullCheckpointContents, SignedCheckpointSummary, TrustedCheckpoint, VerifiedCheckpoint,
45        VerifiedCheckpointContents,
46    },
47    messages_consensus::ConsensusTransactionKey,
48    signature::GenericSignature,
49    transaction::{TransactionDataAPI, TransactionKey, TransactionKind},
50};
51use itertools::Itertools;
52use parking_lot::Mutex;
53use rand::{rngs::OsRng, seq::SliceRandom};
54use serde::{Deserialize, Serialize};
55use tokio::{sync::Notify, task::JoinSet, time::timeout};
56use tracing::{debug, error, info, instrument, warn};
57use typed_store::{
58    DBMapUtils, Map, TypedStoreError,
59    rocks::{DBMap, MetricConf},
60    traits::{TableSummary, TypedStoreDebug},
61};
62
63pub use crate::checkpoints::{
64    checkpoint_output::{
65        LogCheckpointOutput, SendCheckpointToStateSync, SubmitCheckpointToConsensus,
66    },
67    metrics::CheckpointMetrics,
68};
69use crate::{
70    authority::{AuthorityState, authority_per_epoch_store::AuthorityPerEpochStore},
71    authority_client::{AuthorityAPI, make_network_authority_clients_with_network_config},
72    checkpoints::{
73        causal_order::CausalOrder,
74        checkpoint_output::{CertifiedCheckpointOutput, CheckpointOutput},
75    },
76    consensus_handler::SequencedConsensusTransactionKey,
77    execution_cache::TransactionCacheRead,
78    stake_aggregator::{InsertResult, MultiStakeAggregator},
79    state_accumulator::StateAccumulator,
80};
81
82pub type CheckpointHeight = u64;
83
/// Aggregate statistics for a single epoch, derived from its last checkpoint
/// (see `CheckpointStore::get_epoch_stats`).
pub struct EpochStats {
    /// Number of checkpoints produced during the epoch.
    pub checkpoint_count: u64,
    /// Number of transactions executed during the epoch.
    pub transaction_count: u64,
    // Taken from the last checkpoint's epoch_rolling_gas_cost_summary
    // computation_cost in get_epoch_stats.
    pub total_gas_reward: u64,
}
89
/// Metadata attached to a checkpoint that has been scheduled for construction
/// but not yet built.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointInfo {
    /// Timestamp (ms) to be recorded in the built checkpoint summary.
    pub timestamp_ms: CheckpointTimestamp,
    /// True if this will be the last checkpoint of the current epoch.
    pub last_of_epoch: bool,
    /// Builder height at which this pending checkpoint was created.
    pub checkpoint_height: CheckpointHeight,
}
96
/// A checkpoint scheduled for construction by the checkpoint builder.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum PendingCheckpoint {
    // This is an enum for future upgradability, though at the moment there is only one variant.
    V1(PendingCheckpointContentsV1),
}
102
/// Version 1 payload of [`PendingCheckpoint`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PendingCheckpointContentsV1 {
    /// Root transactions to include in the checkpoint.
    pub roots: Vec<TransactionKey>,
    /// Construction metadata (timestamp, height, end-of-epoch flag).
    pub details: PendingCheckpointInfo,
}
108
109impl PendingCheckpoint {
110    pub fn as_v1(&self) -> &PendingCheckpointContentsV1 {
111        match self {
112            PendingCheckpoint::V1(contents) => contents,
113        }
114    }
115
116    pub fn into_v1(self) -> PendingCheckpointContentsV1 {
117        match self {
118            PendingCheckpoint::V1(contents) => contents,
119        }
120    }
121
122    pub fn roots(&self) -> &Vec<TransactionKey> {
123        &self.as_v1().roots
124    }
125
126    pub fn details(&self) -> &PendingCheckpointInfo {
127        &self.as_v1().details
128    }
129
130    pub fn height(&self) -> CheckpointHeight {
131        self.details().checkpoint_height
132    }
133}
134
/// A checkpoint summary as produced by the local checkpoint builder, together
/// with bookkeeping about where in the commit it was built.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BuilderCheckpointSummary {
    /// The locally built checkpoint summary.
    pub summary: CheckpointSummary,
    // Height at which this checkpoint summary was built. None for genesis checkpoint
    pub checkpoint_height: Option<CheckpointHeight>,
    /// Position of this summary within the commit it was built from.
    pub position_in_commit: usize,
}
142
/// RocksDB-backed (via `typed_store`) persistent store for checkpoint
/// summaries, contents, and progress watermarks.
#[derive(DBMapUtils)]
pub struct CheckpointStore {
    /// Maps checkpoint contents digest to checkpoint contents
    pub(crate) checkpoint_content: DBMap<CheckpointContentsDigest, CheckpointContents>,

    /// Maps checkpoint contents digest to checkpoint sequence number
    pub(crate) checkpoint_sequence_by_contents_digest:
        DBMap<CheckpointContentsDigest, CheckpointSequenceNumber>,

    /// Stores entire checkpoint contents from state sync, indexed by sequence
    /// number, for efficient reads of full checkpoints. Entries from this
    /// table are deleted after state accumulation has completed.
    full_checkpoint_content: DBMap<CheckpointSequenceNumber, FullCheckpointContents>,

    /// Stores certified checkpoints
    pub(crate) certified_checkpoints: DBMap<CheckpointSequenceNumber, TrustedCheckpoint>,
    /// Map from checkpoint digest to certified checkpoint
    pub(crate) checkpoint_by_digest: DBMap<CheckpointDigest, TrustedCheckpoint>,

    /// Store locally computed checkpoint summaries so that we can detect forks
    /// and log useful information. Can be pruned as soon as we verify that
    /// we are in agreement with the latest certified checkpoint.
    pub(crate) locally_computed_checkpoints: DBMap<CheckpointSequenceNumber, CheckpointSummary>,

    /// A map from epoch ID to the sequence number of the last checkpoint in
    /// that epoch.
    epoch_last_checkpoint_map: DBMap<EpochId, CheckpointSequenceNumber>,

    /// Watermarks used to determine the highest verified, fully synced, and
    /// fully executed checkpoints
    pub(crate) watermarks: DBMap<CheckpointWatermark, (CheckpointSequenceNumber, CheckpointDigest)>,
}
175
176impl CheckpointStore {
177    pub fn new(path: &Path) -> Arc<Self> {
178        Arc::new(Self::open_tables_read_write(
179            path.to_path_buf(),
180            MetricConf::new("checkpoint"),
181            None,
182            None,
183        ))
184    }
185
186    pub fn open_readonly(path: &Path) -> CheckpointStoreReadOnly {
187        Self::get_read_only_handle(
188            path.to_path_buf(),
189            None,
190            None,
191            MetricConf::new("checkpoint_readonly"),
192        )
193    }
194
195    #[instrument(level = "info", skip_all)]
196    pub fn insert_genesis_checkpoint(
197        &self,
198        checkpoint: VerifiedCheckpoint,
199        contents: CheckpointContents,
200        epoch_store: &AuthorityPerEpochStore,
201    ) {
202        assert_eq!(
203            checkpoint.epoch(),
204            0,
205            "can't call insert_genesis_checkpoint with a checkpoint not in epoch 0"
206        );
207        assert_eq!(
208            *checkpoint.sequence_number(),
209            0,
210            "can't call insert_genesis_checkpoint with a checkpoint that doesn't have a sequence number of 0"
211        );
212
213        // Only insert the genesis checkpoint if the DB is empty and doesn't have it
214        // already
215        if self
216            .get_checkpoint_by_digest(checkpoint.digest())
217            .unwrap()
218            .is_none()
219        {
220            if epoch_store.epoch() == checkpoint.epoch {
221                epoch_store
222                    .put_genesis_checkpoint_in_builder(checkpoint.data(), &contents)
223                    .unwrap();
224            } else {
225                debug!(
226                    validator_epoch =% epoch_store.epoch(),
227                    genesis_epoch =% checkpoint.epoch(),
228                    "Not inserting checkpoint builder data for genesis checkpoint",
229                );
230            }
231            self.insert_checkpoint_contents(contents).unwrap();
232            self.insert_verified_checkpoint(&checkpoint).unwrap();
233            self.update_highest_synced_checkpoint(&checkpoint).unwrap();
234        }
235    }
236
237    pub fn get_checkpoint_by_digest(
238        &self,
239        digest: &CheckpointDigest,
240    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
241        self.checkpoint_by_digest
242            .get(digest)
243            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
244    }
245
246    pub fn get_checkpoint_by_sequence_number(
247        &self,
248        sequence_number: CheckpointSequenceNumber,
249    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
250        self.certified_checkpoints
251            .get(&sequence_number)
252            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
253    }
254
255    pub fn get_locally_computed_checkpoint(
256        &self,
257        sequence_number: CheckpointSequenceNumber,
258    ) -> Result<Option<CheckpointSummary>, TypedStoreError> {
259        self.locally_computed_checkpoints.get(&sequence_number)
260    }
261
262    pub fn get_sequence_number_by_contents_digest(
263        &self,
264        digest: &CheckpointContentsDigest,
265    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
266        self.checkpoint_sequence_by_contents_digest.get(digest)
267    }
268
269    pub fn delete_contents_digest_sequence_number_mapping(
270        &self,
271        digest: &CheckpointContentsDigest,
272    ) -> Result<(), TypedStoreError> {
273        self.checkpoint_sequence_by_contents_digest.remove(digest)
274    }
275
276    pub fn get_latest_certified_checkpoint(&self) -> Option<VerifiedCheckpoint> {
277        self.certified_checkpoints
278            .unbounded_iter()
279            .skip_to_last()
280            .next()
281            .map(|(_, v)| v.into())
282    }
283
284    pub fn get_latest_locally_computed_checkpoint(&self) -> Option<CheckpointSummary> {
285        self.locally_computed_checkpoints
286            .unbounded_iter()
287            .skip_to_last()
288            .next()
289            .map(|(_, v)| v)
290    }
291
292    pub fn multi_get_checkpoint_by_sequence_number(
293        &self,
294        sequence_numbers: &[CheckpointSequenceNumber],
295    ) -> Result<Vec<Option<VerifiedCheckpoint>>, TypedStoreError> {
296        let checkpoints = self
297            .certified_checkpoints
298            .multi_get(sequence_numbers)?
299            .into_iter()
300            .map(|maybe_checkpoint| maybe_checkpoint.map(|c| c.into()))
301            .collect();
302
303        Ok(checkpoints)
304    }
305
306    pub fn multi_get_checkpoint_content(
307        &self,
308        contents_digest: &[CheckpointContentsDigest],
309    ) -> Result<Vec<Option<CheckpointContents>>, TypedStoreError> {
310        self.checkpoint_content.multi_get(contents_digest)
311    }
312
313    pub fn get_highest_verified_checkpoint(
314        &self,
315    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
316        let highest_verified = if let Some(highest_verified) =
317            self.watermarks.get(&CheckpointWatermark::HighestVerified)?
318        {
319            highest_verified
320        } else {
321            return Ok(None);
322        };
323        self.get_checkpoint_by_digest(&highest_verified.1)
324    }
325
326    pub fn get_highest_synced_checkpoint(
327        &self,
328    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
329        let highest_synced = if let Some(highest_synced) =
330            self.watermarks.get(&CheckpointWatermark::HighestSynced)?
331        {
332            highest_synced
333        } else {
334            return Ok(None);
335        };
336        self.get_checkpoint_by_digest(&highest_synced.1)
337    }
338
339    pub fn get_highest_executed_checkpoint_seq_number(
340        &self,
341    ) -> Result<Option<CheckpointSequenceNumber>, TypedStoreError> {
342        if let Some(highest_executed) =
343            self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
344        {
345            Ok(Some(highest_executed.0))
346        } else {
347            Ok(None)
348        }
349    }
350
351    pub fn get_highest_executed_checkpoint(
352        &self,
353    ) -> Result<Option<VerifiedCheckpoint>, TypedStoreError> {
354        let highest_executed = if let Some(highest_executed) =
355            self.watermarks.get(&CheckpointWatermark::HighestExecuted)?
356        {
357            highest_executed
358        } else {
359            return Ok(None);
360        };
361        self.get_checkpoint_by_digest(&highest_executed.1)
362    }
363
364    pub fn get_highest_pruned_checkpoint_seq_number(
365        &self,
366    ) -> Result<CheckpointSequenceNumber, TypedStoreError> {
367        Ok(self
368            .watermarks
369            .get(&CheckpointWatermark::HighestPruned)?
370            .unwrap_or_default()
371            .0)
372    }
373
374    pub fn get_checkpoint_contents(
375        &self,
376        digest: &CheckpointContentsDigest,
377    ) -> Result<Option<CheckpointContents>, TypedStoreError> {
378        self.checkpoint_content.get(digest)
379    }
380
381    pub fn get_full_checkpoint_contents_by_sequence_number(
382        &self,
383        seq: CheckpointSequenceNumber,
384    ) -> Result<Option<FullCheckpointContents>, TypedStoreError> {
385        self.full_checkpoint_content.get(&seq)
386    }
387
388    fn prune_local_summaries(&self) -> IotaResult {
389        if let Some((last_local_summary, _)) = self
390            .locally_computed_checkpoints
391            .unbounded_iter()
392            .skip_to_last()
393            .next()
394        {
395            let mut batch = self.locally_computed_checkpoints.batch();
396            batch.schedule_delete_range(
397                &self.locally_computed_checkpoints,
398                &0,
399                &last_local_summary,
400            )?;
401            batch.write()?;
402            info!("Pruned local summaries up to {:?}", last_local_summary);
403        }
404        Ok(())
405    }
406
407    fn check_for_checkpoint_fork(
408        &self,
409        local_checkpoint: &CheckpointSummary,
410        verified_checkpoint: &VerifiedCheckpoint,
411    ) {
412        if local_checkpoint != verified_checkpoint.data() {
413            let verified_contents = self
414                .get_checkpoint_contents(&verified_checkpoint.content_digest)
415                .map(|opt_contents| {
416                    opt_contents
417                        .map(|contents| format!("{:?}", contents))
418                        .unwrap_or_else(|| {
419                            format!(
420                                "Verified checkpoint contents not found, digest: {:?}",
421                                verified_checkpoint.content_digest,
422                            )
423                        })
424                })
425                .map_err(|e| {
426                    format!(
427                        "Failed to get verified checkpoint contents, digest: {:?} error: {:?}",
428                        verified_checkpoint.content_digest, e
429                    )
430                })
431                .unwrap_or_else(|err_msg| err_msg);
432
433            let local_contents = self
434                .get_checkpoint_contents(&local_checkpoint.content_digest)
435                .map(|opt_contents| {
436                    opt_contents
437                        .map(|contents| format!("{:?}", contents))
438                        .unwrap_or_else(|| {
439                            format!(
440                                "Local checkpoint contents not found, digest: {:?}",
441                                local_checkpoint.content_digest
442                            )
443                        })
444                })
445                .map_err(|e| {
446                    format!(
447                        "Failed to get local checkpoint contents, digest: {:?} error: {:?}",
448                        local_checkpoint.content_digest, e
449                    )
450                })
451                .unwrap_or_else(|err_msg| err_msg);
452
453            // checkpoint contents may be too large for panic message.
454            error!(
455                verified_checkpoint = ?verified_checkpoint.data(),
456                ?verified_contents,
457                ?local_checkpoint,
458                ?local_contents,
459                "Local checkpoint fork detected!",
460            );
461            panic!(
462                "Local checkpoint fork detected for sequence number: {}",
463                local_checkpoint.sequence_number()
464            );
465        }
466    }
467
468    // Called by consensus (ConsensusAggregator).
469    // Different from `insert_verified_checkpoint`, it does not touch
470    // the highest_verified_checkpoint watermark such that state sync
471    // will have a chance to process this checkpoint and perform some
472    // state-sync only things.
473    pub fn insert_certified_checkpoint(
474        &self,
475        checkpoint: &VerifiedCheckpoint,
476    ) -> Result<(), TypedStoreError> {
477        debug!(
478            checkpoint_seq = checkpoint.sequence_number(),
479            "Inserting certified checkpoint",
480        );
481        let mut batch = self.certified_checkpoints.batch();
482        batch
483            .insert_batch(
484                &self.certified_checkpoints,
485                [(checkpoint.sequence_number(), checkpoint.serializable_ref())],
486            )?
487            .insert_batch(
488                &self.checkpoint_by_digest,
489                [(checkpoint.digest(), checkpoint.serializable_ref())],
490            )?;
491        if checkpoint.next_epoch_committee().is_some() {
492            batch.insert_batch(
493                &self.epoch_last_checkpoint_map,
494                [(&checkpoint.epoch(), checkpoint.sequence_number())],
495            )?;
496        }
497        batch.write()?;
498
499        if let Some(local_checkpoint) = self
500            .locally_computed_checkpoints
501            .get(checkpoint.sequence_number())?
502        {
503            self.check_for_checkpoint_fork(&local_checkpoint, checkpoint);
504        }
505
506        Ok(())
507    }
508
509    // Called by state sync, apart from inserting the checkpoint and updating
510    // related tables, it also bumps the highest_verified_checkpoint watermark.
511    #[instrument(level = "debug", skip_all)]
512    pub fn insert_verified_checkpoint(
513        &self,
514        checkpoint: &VerifiedCheckpoint,
515    ) -> Result<(), TypedStoreError> {
516        self.insert_certified_checkpoint(checkpoint)?;
517        self.update_highest_verified_checkpoint(checkpoint)
518    }
519
520    pub fn update_highest_verified_checkpoint(
521        &self,
522        checkpoint: &VerifiedCheckpoint,
523    ) -> Result<(), TypedStoreError> {
524        if Some(*checkpoint.sequence_number())
525            > self
526                .get_highest_verified_checkpoint()?
527                .map(|x| *x.sequence_number())
528        {
529            debug!(
530                checkpoint_seq = checkpoint.sequence_number(),
531                "Updating highest verified checkpoint",
532            );
533            self.watermarks.insert(
534                &CheckpointWatermark::HighestVerified,
535                &(*checkpoint.sequence_number(), *checkpoint.digest()),
536            )?;
537        }
538
539        Ok(())
540    }
541
542    pub fn update_highest_synced_checkpoint(
543        &self,
544        checkpoint: &VerifiedCheckpoint,
545    ) -> Result<(), TypedStoreError> {
546        debug!(
547            checkpoint_seq = checkpoint.sequence_number(),
548            "Updating highest synced checkpoint",
549        );
550        self.watermarks.insert(
551            &CheckpointWatermark::HighestSynced,
552            &(*checkpoint.sequence_number(), *checkpoint.digest()),
553        )
554    }
555
556    pub fn update_highest_executed_checkpoint(
557        &self,
558        checkpoint: &VerifiedCheckpoint,
559    ) -> Result<(), TypedStoreError> {
560        if let Some(seq_number) = self.get_highest_executed_checkpoint_seq_number()? {
561            if seq_number >= *checkpoint.sequence_number() {
562                return Ok(());
563            }
564            assert_eq!(
565                seq_number + 1,
566                *checkpoint.sequence_number(),
567                "Cannot update highest executed checkpoint to {} when current highest executed checkpoint is {}",
568                checkpoint.sequence_number(),
569                seq_number
570            );
571        }
572        debug!(
573            checkpoint_seq = checkpoint.sequence_number(),
574            "Updating highest executed checkpoint",
575        );
576        self.watermarks.insert(
577            &CheckpointWatermark::HighestExecuted,
578            &(*checkpoint.sequence_number(), *checkpoint.digest()),
579        )
580    }
581
582    pub fn update_highest_pruned_checkpoint(
583        &self,
584        checkpoint: &VerifiedCheckpoint,
585    ) -> Result<(), TypedStoreError> {
586        self.watermarks.insert(
587            &CheckpointWatermark::HighestPruned,
588            &(*checkpoint.sequence_number(), *checkpoint.digest()),
589        )
590    }
591
592    /// Sets highest executed checkpoint to any value.
593    ///
594    /// WARNING: This method is very subtle and can corrupt the database if used
595    /// incorrectly. It should only be used in one-off cases or tests after
596    /// fully understanding the risk.
597    pub fn set_highest_executed_checkpoint_subtle(
598        &self,
599        checkpoint: &VerifiedCheckpoint,
600    ) -> Result<(), TypedStoreError> {
601        self.watermarks.insert(
602            &CheckpointWatermark::HighestExecuted,
603            &(*checkpoint.sequence_number(), *checkpoint.digest()),
604        )
605    }
606
607    pub fn insert_checkpoint_contents(
608        &self,
609        contents: CheckpointContents,
610    ) -> Result<(), TypedStoreError> {
611        debug!(
612            checkpoint_seq = ?contents.digest(),
613            "Inserting checkpoint contents",
614        );
615        self.checkpoint_content.insert(contents.digest(), &contents)
616    }
617
618    pub fn insert_verified_checkpoint_contents(
619        &self,
620        checkpoint: &VerifiedCheckpoint,
621        full_contents: VerifiedCheckpointContents,
622    ) -> Result<(), TypedStoreError> {
623        let mut batch = self.full_checkpoint_content.batch();
624        batch.insert_batch(
625            &self.checkpoint_sequence_by_contents_digest,
626            [(&checkpoint.content_digest, checkpoint.sequence_number())],
627        )?;
628        let full_contents = full_contents.into_inner();
629        batch.insert_batch(
630            &self.full_checkpoint_content,
631            [(checkpoint.sequence_number(), &full_contents)],
632        )?;
633
634        let contents = full_contents.into_checkpoint_contents();
635        assert_eq!(&checkpoint.content_digest, contents.digest());
636
637        batch.insert_batch(&self.checkpoint_content, [(contents.digest(), &contents)])?;
638
639        batch.write()
640    }
641
642    pub fn delete_full_checkpoint_contents(
643        &self,
644        seq: CheckpointSequenceNumber,
645    ) -> Result<(), TypedStoreError> {
646        self.full_checkpoint_content.remove(&seq)
647    }
648
649    pub fn get_epoch_last_checkpoint(
650        &self,
651        epoch_id: EpochId,
652    ) -> IotaResult<Option<VerifiedCheckpoint>> {
653        let seq = self.epoch_last_checkpoint_map.get(&epoch_id)?;
654        let checkpoint = match seq {
655            Some(seq) => self.get_checkpoint_by_sequence_number(seq)?,
656            None => None,
657        };
658        Ok(checkpoint)
659    }
660
661    pub fn insert_epoch_last_checkpoint(
662        &self,
663        epoch_id: EpochId,
664        checkpoint: &VerifiedCheckpoint,
665    ) -> IotaResult {
666        self.epoch_last_checkpoint_map
667            .insert(&epoch_id, checkpoint.sequence_number())?;
668        Ok(())
669    }
670
671    pub fn get_epoch_state_commitments(
672        &self,
673        epoch: EpochId,
674    ) -> IotaResult<Option<Vec<CheckpointCommitment>>> {
675        let commitments = self.get_epoch_last_checkpoint(epoch)?.map(|checkpoint| {
676            checkpoint
677                .end_of_epoch_data
678                .as_ref()
679                .expect("Last checkpoint of epoch expected to have EndOfEpochData")
680                .epoch_commitments
681                .clone()
682        });
683        Ok(commitments)
684    }
685
686    /// Given the epoch ID, and the last checkpoint of the epoch, derive a few
687    /// statistics of the epoch.
688    pub fn get_epoch_stats(
689        &self,
690        epoch: EpochId,
691        last_checkpoint: &CheckpointSummary,
692    ) -> Option<EpochStats> {
693        let (first_checkpoint, prev_epoch_network_transactions) = if epoch == 0 {
694            (0, 0)
695        } else if let Ok(Some(checkpoint)) = self.get_epoch_last_checkpoint(epoch - 1) {
696            (
697                checkpoint.sequence_number + 1,
698                checkpoint.network_total_transactions,
699            )
700        } else {
701            return None;
702        };
703        Some(EpochStats {
704            checkpoint_count: last_checkpoint.sequence_number - first_checkpoint + 1,
705            transaction_count: last_checkpoint.network_total_transactions
706                - prev_epoch_network_transactions,
707            total_gas_reward: last_checkpoint
708                .epoch_rolling_gas_cost_summary
709                .computation_cost,
710        })
711    }
712
713    pub fn checkpoint_db(&self, path: &Path) -> IotaResult {
714        // This checkpoints the entire db and not one column family
715        self.checkpoint_content
716            .checkpoint_db(path)
717            .map_err(Into::into)
718    }
719
720    pub fn delete_highest_executed_checkpoint_test_only(&self) -> Result<(), TypedStoreError> {
721        let mut wb = self.watermarks.batch();
722        wb.delete_batch(
723            &self.watermarks,
724            std::iter::once(CheckpointWatermark::HighestExecuted),
725        )?;
726        wb.write()?;
727        Ok(())
728    }
729
730    pub fn reset_db_for_execution_since_genesis(&self) -> IotaResult {
731        self.delete_highest_executed_checkpoint_test_only()?;
732        self.watermarks.rocksdb.flush()?;
733        Ok(())
734    }
735
736    /// Re-executes all transactions from all local, uncertified checkpoints for
737    /// crash recovery. All transactions thus re-executed are guaranteed to
738    /// not have any missing dependencies, because we start from the highest
739    /// executed checkpoint, and proceed through checkpoints in order.
740    #[instrument(level = "debug", skip_all)]
741    pub async fn reexecute_local_checkpoints(
742        &self,
743        state: &AuthorityState,
744        epoch_store: &AuthorityPerEpochStore,
745    ) {
746        info!("rexecuting locally computed checkpoints for crash recovery");
747        let epoch = epoch_store.epoch();
748        let highest_executed = self
749            .get_highest_executed_checkpoint_seq_number()
750            .expect("get_highest_executed_checkpoint_seq_number should not fail")
751            .unwrap_or(0);
752
753        let Some(highest_built) = self.get_latest_locally_computed_checkpoint() else {
754            info!("no locally built checkpoints to verify");
755            return;
756        };
757
758        for seq in highest_executed + 1..=*highest_built.sequence_number() {
759            info!(?seq, "Re-executing locally computed checkpoint");
760            let Some(checkpoint) = self
761                .get_locally_computed_checkpoint(seq)
762                .expect("get_locally_computed_checkpoint should not fail")
763            else {
764                panic!("locally computed checkpoint {:?} not found", seq);
765            };
766
767            let Some(contents) = self
768                .get_checkpoint_contents(&checkpoint.content_digest)
769                .expect("get_checkpoint_contents should not fail")
770            else {
771                panic!(
772                    "checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})",
773                    seq, checkpoint.content_digest
774                );
775            };
776
777            let cache = state.get_transaction_cache_reader();
778
779            let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect();
780            let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect();
781            let txns = cache
782                .multi_get_transaction_blocks(&tx_digests)
783                .expect("multi_get_transaction_blocks should not fail");
784            for (tx, digest) in txns.iter().zip(tx_digests.iter()) {
785                if tx.is_none() {
786                    panic!("transaction {:?} not found", digest);
787                }
788            }
789
790            let txns: Vec<_> = txns
791                .into_iter()
792                .map(|tx| tx.unwrap())
793                .zip(fx_digests.into_iter())
794                // end of epoch transaction can only be executed by CheckpointExecutor
795                .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx())
796                .map(|(tx, fx)| {
797                    (
798                        VerifiedExecutableTransaction::new_from_checkpoint(
799                            (*tx).clone(),
800                            epoch,
801                            seq,
802                        ),
803                        fx,
804                    )
805                })
806                .collect();
807
808            let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect();
809
810            info!(
811                ?seq,
812                ?tx_digests,
813                "Re-executing transactions for locally built checkpoint"
814            );
815            // this will panic if any re-execution diverges from the previously recorded
816            // effects digest
817            state.enqueue_with_expected_effects_digest(txns, epoch_store);
818
819            // a task that logs every so often until it is cancelled
820            // This should normally finish very quickly, so seeing this log more than once
821            // or twice is likely a sign of a problem.
822            let waiting_logger = tokio::task::spawn(async move {
823                let mut interval = tokio::time::interval(Duration::from_secs(1));
824                loop {
825                    interval.tick().await;
826                    warn!(?seq, "Still waiting for re-execution to complete");
827                }
828            });
829
830            cache
831                .notify_read_executed_effects_digests(&tx_digests)
832                .await
833                .expect("notify_read_executed_effects_digests should not fail");
834
835            waiting_logger.abort();
836            waiting_logger.await.ok();
837            info!(?seq, "Re-execution completed for locally built checkpoint");
838        }
839
840        info!("Re-execution of locally built checkpoints completed");
841    }
842}
843
/// Named watermarks tracking checkpoint-processing progress.
/// NOTE(review): these serializable keys appear to index persisted watermark
/// values in the `CheckpointStore` — confirm against its table definitions.
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub enum CheckpointWatermark {
    /// Highest checkpoint that has been verified.
    HighestVerified,
    /// Highest checkpoint that has been synced.
    HighestSynced,
    /// Highest checkpoint that has been executed.
    HighestExecuted,
    /// Highest checkpoint that has been pruned.
    HighestPruned,
}
851
/// Builds checkpoints from pending checkpoints delivered by consensus.
/// The driving loop lives in [`CheckpointBuilder::run`].
pub struct CheckpointBuilder {
    /// Authority state; used for transaction-cache access, persisting
    /// transactions, and executing the advance-epoch transaction at epoch end.
    state: Arc<AuthorityState>,
    /// Checkpoint store the built contents and summaries are written to.
    tables: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Wakes the builder loop when new pending checkpoints may be available.
    notify: Arc<Notify>,
    /// Signalled (`notify_one`) after checkpoints are written, to wake the
    /// signature aggregator.
    notify_aggregator: Arc<Notify>,
    /// Read access to executed transaction effects, awaited while resolving
    /// checkpoint roots.
    effects_store: Arc<dyn TransactionCacheRead>,
    /// State accumulator, held weakly so the builder does not keep it alive;
    /// upgraded when digesting epoch state for the last checkpoint of an epoch.
    accumulator: Weak<StateAccumulator>,
    /// Sink notified with each newly created (summary, contents) pair.
    output: Box<dyn CheckpointOutput>,
    metrics: Arc<CheckpointMetrics>,
    /// Maximum number of transactions per checkpoint chunk.
    max_transactions_per_checkpoint: usize,
    /// Soft cap on the estimated serialized checkpoint size; a single
    /// oversized transaction may still exceed it (logged with a warning).
    max_checkpoint_size_bytes: usize,
}
865
/// Aggregates checkpoint signatures from validators into certified
/// checkpoints. NOTE(review): the aggregation loop itself is defined outside
/// this chunk; field notes below are inferred from types — confirm there.
pub struct CheckpointAggregator {
    tables: Arc<CheckpointStore>,
    epoch_store: Arc<AuthorityPerEpochStore>,
    /// Wakes the aggregator — presumably the same `Notify` the builder
    /// signals after writing new checkpoints (see `write_checkpoints`).
    notify: Arc<Notify>,
    /// Signature-aggregation state for the checkpoint currently being
    /// certified, if any. NOTE(review): lifecycle not visible in this chunk —
    /// confirm against the aggregator's run loop.
    current: Option<CheckpointSignatureAggregator>,
    /// Sink for newly certified checkpoints.
    output: Box<dyn CertifiedCheckpointOutput>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
875
/// Holds the information needed to aggregate signatures for one checkpoint.
pub struct CheckpointSignatureAggregator {
    /// Next expected position in the signature-message stream.
    /// NOTE(review): exact semantics not visible in this chunk — confirm.
    next_index: u64,
    /// The checkpoint summary the collected signatures are expected to match.
    summary: CheckpointSummary,
    /// Digest of `summary`.
    digest: CheckpointDigest,
    /// Aggregates voting stake for each signed checkpoint proposal by authority
    signatures_by_digest: MultiStakeAggregator<CheckpointDigest, CheckpointSummary, true>,
    tables: Arc<CheckpointStore>,
    state: Arc<AuthorityState>,
    metrics: Arc<CheckpointMetrics>,
}
887
888impl CheckpointBuilder {
889    fn new(
890        state: Arc<AuthorityState>,
891        tables: Arc<CheckpointStore>,
892        epoch_store: Arc<AuthorityPerEpochStore>,
893        notify: Arc<Notify>,
894        effects_store: Arc<dyn TransactionCacheRead>,
895        accumulator: Weak<StateAccumulator>,
896        output: Box<dyn CheckpointOutput>,
897        notify_aggregator: Arc<Notify>,
898        metrics: Arc<CheckpointMetrics>,
899        max_transactions_per_checkpoint: usize,
900        max_checkpoint_size_bytes: usize,
901    ) -> Self {
902        Self {
903            state,
904            tables,
905            epoch_store,
906            notify,
907            effects_store,
908            accumulator,
909            output,
910            notify_aggregator,
911            metrics,
912            max_transactions_per_checkpoint,
913            max_checkpoint_size_bytes,
914        }
915    }
916
    /// Runs the `CheckpointBuilder` in an asynchronous loop, managing the
    /// creation of checkpoints.
    ///
    /// Each iteration first attempts to build any pending checkpoints and
    /// only then parks on `notify`; building before awaiting ensures work
    /// queued prior to startup is not missed. The loop never exits on its
    /// own — the task is expected to be dropped/aborted externally.
    async fn run(mut self) {
        info!("Starting CheckpointBuilder");
        loop {
            self.maybe_build_checkpoints().await;

            self.notify.notified().await;
        }
    }
927
    /// Drains pending checkpoints from the epoch store and builds checkpoints
    /// from them.
    ///
    /// Consecutive `PendingCheckpoint`s are grouped until the protocol's
    /// minimum checkpoint interval has elapsed, or an end-of-epoch boundary
    /// forces a build (the last-of-epoch checkpoint must be written
    /// separately). On a build error this logs, bumps the error metric,
    /// sleeps 1s, and returns so the caller's loop retries from scratch.
    async fn maybe_build_checkpoints(&mut self) {
        let _scope = monitored_scope("BuildCheckpoints");

        // Collect info about the most recently built checkpoint.
        let summary = self
            .epoch_store
            .last_built_checkpoint_builder_summary()
            .expect("epoch should not have ended");
        let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
        let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

        let min_checkpoint_interval_ms = self
            .epoch_store
            .protocol_config()
            .min_checkpoint_interval_ms_as_option()
            .unwrap_or_default();
        let mut grouped_pending_checkpoints = Vec::new();
        // Peekable so we can look ahead for an imminent last-of-epoch
        // checkpoint, which must not be merged with regular ones.
        let mut checkpoints_iter = self
            .epoch_store
            .get_pending_checkpoints(last_height)
            .expect("unexpected epoch store error")
            .into_iter()
            .peekable();
        while let Some((height, pending)) = checkpoints_iter.next() {
            // Group PendingCheckpoints until:
            // - minimum interval has elapsed (or there is no previous
            //   checkpoint to measure from) ...
            let current_timestamp = pending.details().timestamp_ms;
            let can_build = match last_timestamp {
                    Some(last_timestamp) => {
                        current_timestamp >= last_timestamp + min_checkpoint_interval_ms
                    }
                    None => true,
                // - or, next PendingCheckpoint is last-of-epoch (since the last-of-epoch checkpoint
                //   should be written separately) ...
                } || checkpoints_iter
                    .peek()
                    .is_some_and(|(_, next_pending)| next_pending.details().last_of_epoch)
                // - or, we have reached end of epoch.
                    || pending.details().last_of_epoch;
            grouped_pending_checkpoints.push(pending);
            if !can_build {
                debug!(
                    checkpoint_commit_height = height,
                    ?last_timestamp,
                    ?current_timestamp,
                    "waiting for more PendingCheckpoints: minimum interval not yet elapsed"
                );
                continue;
            }

            // Min interval has elapsed, we can now coalesce and build a checkpoint.
            last_height = Some(height);
            last_timestamp = Some(current_timestamp);
            debug!(
                checkpoint_commit_height = height,
                "Making checkpoint at commit height"
            );
            // On failure, return (rather than continue) so the outer loop
            // re-reads builder state and retries the whole batch.
            if let Err(e) = self
                .make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
                .await
            {
                error!("Error while making checkpoint, will retry in 1s: {:?}", e);
                tokio::time::sleep(Duration::from_secs(1)).await;
                self.metrics.checkpoint_errors.inc();
                return;
            }
        }
        debug!(
            "Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
            grouped_pending_checkpoints.len(),
        );
    }
1000
    /// Builds checkpoint(s) from a batch of grouped `PendingCheckpoint`s:
    /// resolves the full transaction set for each pending checkpoint in
    /// order, creates the checkpoint summaries/contents, and writes them out.
    /// The details of the *last* pending checkpoint in the batch (height,
    /// timestamp, last-of-epoch flag) describe the produced checkpoint(s).
    #[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
    async fn make_checkpoint(&self, pendings: Vec<PendingCheckpoint>) -> anyhow::Result<()> {
        let last_details = pendings.last().unwrap().details().clone();

        // Keeps track of the effects that are already included in the current
        // checkpoint. This is used when there are multiple pending checkpoints
        // to create a single checkpoint because in such scenarios, dependencies
        // of a transaction may be in earlier created checkpoints, or in earlier
        // pending checkpoints.
        let mut effects_in_current_checkpoint = BTreeSet::new();

        // Stores the transactions that should be included in the checkpoint.
        // Transactions will be recorded in the checkpoint in this order.
        let mut sorted_tx_effects_included_in_checkpoint = Vec::new();
        for pending_checkpoint in pendings.into_iter() {
            let pending = pending_checkpoint.into_v1();
            let txn_in_checkpoint = self
                .resolve_checkpoint_transactions(pending.roots, &mut effects_in_current_checkpoint)
                .await?;
            sorted_tx_effects_included_in_checkpoint.extend(txn_in_checkpoint);
        }
        let new_checkpoint = self
            .create_checkpoints(sorted_tx_effects_included_in_checkpoint, &last_details)
            .await?;
        self.write_checkpoints(last_details.checkpoint_height, new_checkpoint)
            .await?;
        Ok(())
    }
1029
    /// Given the root transactions of a pending checkpoint, resolves the
    /// transactions that should be included in the checkpoint, and returns
    /// them in the order they should appear. `effects_in_current_checkpoint`
    /// tracks the transactions that already exist in the current checkpoint.
    ///
    /// Waits for every root to be executed locally, pulls in as-yet
    /// unincluded dependencies, and causally sorts the result. If the first
    /// root is a consensus commit prologue it is extracted and placed at the
    /// front of the returned list.
    #[instrument(level = "debug", skip_all)]
    async fn resolve_checkpoint_transactions(
        &self,
        roots: Vec<TransactionKey>,
        effects_in_current_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        self.metrics
            .checkpoint_roots_count
            .inc_by(roots.len() as u64);

        // Block until every root has been executed and its effects are
        // readable locally.
        let root_digests = self
            .epoch_store
            .notify_read_executed_digests(&roots)
            .in_monitored_scope("CheckpointNotifyDigests")
            .await?;
        let root_effects = self
            .effects_store
            .notify_read_executed_effects(&root_digests)
            .in_monitored_scope("CheckpointNotifyRead")
            .await?;

        let _scope = monitored_scope("CheckpointBuilder");

        let consensus_commit_prologue = {
            // If the roots contain a consensus commit prologue transaction, we want to
            // extract it, and put it at the front of the checkpoint.

            let consensus_commit_prologue = self
                .extract_consensus_commit_prologue(&root_digests, &root_effects)
                .await?;

            // Get the un-included dependencies of the consensus commit prologue. We should
            // expect no other dependencies that haven't been included in any
            // previous checkpoints.
            if let Some((ccp_digest, ccp_effects)) = &consensus_commit_prologue {
                let unsorted_ccp = self.complete_checkpoint_effects(
                    vec![ccp_effects.clone()],
                    effects_in_current_checkpoint,
                )?;

                // No other dependencies of this consensus commit prologue that haven't been
                // included in any previous checkpoint.
                assert_eq!(unsorted_ccp.len(), 1);
                assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest);
            }
            consensus_commit_prologue
        };

        let unsorted =
            self.complete_checkpoint_effects(root_effects, effects_in_current_checkpoint)?;

        let _scope = monitored_scope("CheckpointBuilder::causal_sort");
        let mut sorted: Vec<TransactionEffects> = Vec::with_capacity(unsorted.len() + 1);
        if let Some((_ccp_digest, ccp_effects)) = consensus_commit_prologue {
            #[cfg(debug_assertions)]
            {
                // When consensus_commit_prologue is extracted, it should not be included in the
                // `unsorted`.
                for tx in unsorted.iter() {
                    assert!(tx.transaction_digest() != &_ccp_digest);
                }
            }
            // The prologue always comes first in the checkpoint.
            sorted.push(ccp_effects);
        }
        sorted.extend(CausalOrder::causal_sort(unsorted));

        #[cfg(msim)]
        {
            // Check consensus commit prologue invariants in sim test.
            self.expensive_consensus_commit_prologue_invariants_check(&root_digests, &sorted);
        }

        Ok(sorted)
    }
1108
    /// Extracts the consensus commit prologue digest and effects from the
    /// root transactions, if one is present.
    ///
    /// Only the first root is inspected: the consensus commit prologue, when
    /// it exists, is expected to be the first transaction in the roots as
    /// written by the consensus handler. Returns `None` for an empty root set
    /// or when the first root is any other transaction kind.
    async fn extract_consensus_commit_prologue(
        &self,
        root_digests: &[TransactionDigest],
        root_effects: &[TransactionEffects],
    ) -> IotaResult<Option<(TransactionDigest, TransactionEffects)>> {
        let _scope = monitored_scope("CheckpointBuilder::extract_consensus_commit_prologue");
        if root_digests.is_empty() {
            return Ok(None);
        }

        // Reads the first transaction in the roots, and checks whether it is a
        // consensus commit prologue transaction.
        // The consensus commit prologue transaction should be the first transaction
        // in the roots written by the consensus handler.
        let first_tx = self
            .state
            .get_transaction_cache_reader()
            .get_transaction_block(&root_digests[0])?
            .expect("Transaction block must exist");

        Ok(match first_tx.transaction_data().kind() {
            TransactionKind::ConsensusCommitPrologueV1(_) => {
                // Digests and effects are parallel arrays, so the first
                // effects entry must belong to the first transaction.
                assert_eq!(first_tx.digest(), root_effects[0].transaction_digest());
                Some((*first_tx.digest(), root_effects[0].clone()))
            }
            _ => None,
        })
    }
1141
    /// Writes the new checkpoints to the DB storage and processes them.
    ///
    /// For each `(summary, contents)` pair this notifies the configured
    /// `CheckpointOutput`, updates metrics, and stages both the contents and
    /// the locally computed summary into a single write batch. Transactions
    /// are durably persisted *before* the batch is committed so checkpoint
    /// inputs cannot be lost; afterwards, any already-certified checkpoint at
    /// the same sequence number is compared against the local one to detect
    /// forks. Finally the aggregator is woken and the pending checkpoints up
    /// to `height` are marked processed in the epoch store.
    #[instrument(level = "debug", skip_all)]
    async fn write_checkpoints(
        &self,
        height: CheckpointHeight,
        new_checkpoints: Vec<(CheckpointSummary, CheckpointContents)>,
    ) -> IotaResult {
        let _scope = monitored_scope("CheckpointBuilder::write_checkpoints");
        let mut batch = self.tables.checkpoint_content.batch();
        let mut all_tx_digests =
            Vec::with_capacity(new_checkpoints.iter().map(|(_, c)| c.size()).sum());

        for (summary, contents) in &new_checkpoints {
            debug!(
                checkpoint_commit_height = height,
                checkpoint_seq = summary.sequence_number,
                contents_digest = ?contents.digest(),
                "writing checkpoint",
            );
            all_tx_digests.extend(contents.iter().map(|digests| digests.transaction));

            self.output
                .checkpoint_created(summary, contents, &self.epoch_store, &self.tables)
                .await?;

            self.metrics
                .transactions_included_in_checkpoint
                .inc_by(contents.size() as u64);
            let sequence_number = summary.sequence_number;
            self.metrics
                .last_constructed_checkpoint
                .set(sequence_number as i64);

            batch.insert_batch(
                &self.tables.checkpoint_content,
                [(contents.digest(), contents)],
            )?;

            batch.insert_batch(
                &self.tables.locally_computed_checkpoints,
                [(sequence_number, summary)],
            )?;
        }

        // Durably commit transactions (but not their outputs) to the database.
        // Called before writing a locally built checkpoint to the CheckpointStore, so
        // that the inputs of the checkpoint cannot be lost.
        // These transactions are guaranteed to be final unless this validator
        // forks (i.e. constructs a checkpoint which will never be certified). In this
        // case some non-final transactions could be left in the database.
        //
        // This is an intermediate solution until we delay commits to the epoch db.
        // After we have done that, crash recovery will be done by re-processing
        // consensus commits and pending_consensus_transactions, and this method
        // can be removed.
        self.state
            .get_cache_commit()
            .persist_transactions(&all_tx_digests)
            .await?;

        batch.write()?;

        // Compare each locally built checkpoint against any certified
        // checkpoint already stored at the same sequence number, to surface
        // forks (divergence between local and certified summaries).
        for (local_checkpoint, _) in &new_checkpoints {
            if let Some(certified_checkpoint) = self
                .tables
                .certified_checkpoints
                .get(local_checkpoint.sequence_number())?
            {
                self.tables
                    .check_for_checkpoint_fork(local_checkpoint, &certified_checkpoint.into());
            }
        }

        self.notify_aggregator.notify_one();
        self.epoch_store
            .process_pending_checkpoint(height, new_checkpoints)?;
        Ok(())
    }
1220
1221    #[expect(clippy::type_complexity)]
1222    fn split_checkpoint_chunks(
1223        &self,
1224        effects_and_transaction_sizes: Vec<(TransactionEffects, usize)>,
1225        signatures: Vec<Vec<GenericSignature>>,
1226    ) -> anyhow::Result<Vec<Vec<(TransactionEffects, Vec<GenericSignature>)>>> {
1227        let _guard = monitored_scope("CheckpointBuilder::split_checkpoint_chunks");
1228        let mut chunks = Vec::new();
1229        let mut chunk = Vec::new();
1230        let mut chunk_size: usize = 0;
1231        for ((effects, transaction_size), signatures) in effects_and_transaction_sizes
1232            .into_iter()
1233            .zip(signatures.into_iter())
1234        {
1235            // Roll over to a new chunk after either max count or max size is reached.
1236            // The size calculation here is intended to estimate the size of the
1237            // FullCheckpointContents struct. If this code is modified, that struct
1238            // should also be updated accordingly.
1239            let size = transaction_size
1240                + bcs::serialized_size(&effects)?
1241                + bcs::serialized_size(&signatures)?;
1242            if chunk.len() == self.max_transactions_per_checkpoint
1243                || (chunk_size + size) > self.max_checkpoint_size_bytes
1244            {
1245                if chunk.is_empty() {
1246                    // Always allow at least one tx in a checkpoint.
1247                    warn!(
1248                        "Size of single transaction ({size}) exceeds max checkpoint size ({}); allowing excessively large checkpoint to go through.",
1249                        self.max_checkpoint_size_bytes
1250                    );
1251                } else {
1252                    chunks.push(chunk);
1253                    chunk = Vec::new();
1254                    chunk_size = 0;
1255                }
1256            }
1257
1258            chunk.push((effects, signatures));
1259            chunk_size += size;
1260        }
1261
1262        if !chunk.is_empty() || chunks.is_empty() {
1263            // We intentionally create an empty checkpoint if there is no content provided
1264            // to make a 'heartbeat' checkpoint.
1265            // Important: if some conditions are added here later, we need to make sure we
1266            // always have at least one chunk if last_pending_of_epoch is set
1267            chunks.push(chunk);
1268            // Note: empty checkpoints are ok - they shouldn't happen at all on
1269            // a network with even modest load. Even if they do
1270            // happen, it is still useful as it allows fullnodes to
1271            // distinguish between "no transactions have happened" and "i am not
1272            // receiving new checkpoints".
1273        }
1274        Ok(chunks)
1275    }
1276
    /// Creates checkpoints using the provided transaction effects and pending
    /// checkpoint information.
    ///
    /// Determines the previous checkpoint (falling back to the last
    /// checkpoint of the previous epoch for the first checkpoint of a new
    /// epoch), waits for all certificates to be processed by consensus,
    /// splits the effects into size-limited chunks, and produces one
    /// `(CheckpointSummary, CheckpointContents)` pair per chunk, chained via
    /// `previous_digest` and the rolling epoch gas summary. For the final
    /// checkpoint of the epoch, the advance-epoch transaction is executed and
    /// `EndOfEpochData` is attached.
    #[instrument(level = "debug", skip_all)]
    async fn create_checkpoints(
        &self,
        all_effects: Vec<TransactionEffects>,
        details: &PendingCheckpointInfo,
    ) -> anyhow::Result<Vec<(CheckpointSummary, CheckpointContents)>> {
        let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
        let total = all_effects.len();
        let mut last_checkpoint = self.epoch_store.last_built_checkpoint_summary()?;
        if last_checkpoint.is_none() {
            let epoch = self.epoch_store.epoch();
            if epoch > 0 {
                // First checkpoint of a non-genesis epoch: chain from the
                // last checkpoint of the previous epoch.
                let previous_epoch = epoch - 1;
                let last_verified = self.tables.get_epoch_last_checkpoint(previous_epoch)?;
                last_checkpoint = last_verified.map(VerifiedCheckpoint::into_summary_and_sequence);
                if let Some((ref seq, _)) = last_checkpoint {
                    debug!(
                        "No checkpoints in builder DB, taking checkpoint from previous epoch with sequence {seq}"
                    );
                } else {
                    // Serious bug: a non-genesis epoch must have a previous
                    // epoch's last checkpoint available — surface via panic.
                    panic!("Can not find last checkpoint for previous epoch {previous_epoch}");
                }
            }
        }
        let last_checkpoint_seq = last_checkpoint.as_ref().map(|(seq, _)| *seq);
        debug!(
            next_checkpoint_seq = last_checkpoint_seq.unwrap_or_default() + 1,
            checkpoint_timestamp = details.timestamp_ms,
            "Creating checkpoint(s) for {} transactions",
            all_effects.len(),
        );

        let all_digests: Vec<_> = all_effects
            .iter()
            .map(|effect| *effect.transaction_digest())
            .collect();
        let transactions_and_sizes = self
            .state
            .get_transaction_cache_reader()
            .get_transactions_and_serialized_sizes(&all_digests)?;
        let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
        let mut transactions = Vec::with_capacity(all_effects.len());
        let mut transaction_keys = Vec::with_capacity(all_effects.len());
        // Maps transaction digest -> randomness round for any
        // RandomnessStateUpdate transactions in this batch.
        let mut randomness_rounds = BTreeMap::new();
        {
            let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
            debug!(
                ?last_checkpoint_seq,
                "Waiting for {:?} certificates to appear in consensus",
                all_effects.len()
            );

            for (effects, transaction_and_size) in all_effects
                .into_iter()
                .zip(transactions_and_sizes.into_iter())
            {
                let (transaction, size) = transaction_and_size
                    .unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
                match transaction.inner().transaction_data().kind() {
                    TransactionKind::ConsensusCommitPrologueV1(_)
                    | TransactionKind::AuthenticatorStateUpdateV1(_) => {
                        // ConsensusCommitPrologue and
                        // AuthenticatorStateUpdateV1
                        // are guaranteed to be
                        // processed before we reach here.
                    }
                    TransactionKind::RandomnessStateUpdate(rsu) => {
                        randomness_rounds
                            .insert(*effects.transaction_digest(), rsu.randomness_round);
                    }
                    _ => {
                        // All other tx should be included in the call to
                        // `consensus_messages_processed_notify`.
                        transaction_keys.push(SequencedConsensusTransactionKey::External(
                            ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
                        ));
                    }
                }
                transactions.push(transaction);
                all_effects_and_transaction_sizes.push((effects, size));
            }

            self.epoch_store
                .consensus_messages_processed_notify(transaction_keys)
                .await?;
        }

        let signatures = self
            .epoch_store
            .user_signatures_for_checkpoint(&transactions, &all_digests)?;
        debug!(
            ?last_checkpoint_seq,
            "Received {} checkpoint user signatures from consensus",
            signatures.len()
        );

        let chunks = self.split_checkpoint_chunks(all_effects_and_transaction_sizes, signatures)?;
        let chunks_count = chunks.len();

        let mut checkpoints = Vec::with_capacity(chunks_count);
        debug!(
            ?last_checkpoint_seq,
            "Creating {} checkpoints with {} transactions", chunks_count, total,
        );

        let epoch = self.epoch_store.epoch();
        for (index, transactions) in chunks.into_iter().enumerate() {
            let first_checkpoint_of_epoch = index == 0
                && last_checkpoint
                    .as_ref()
                    .map(|(_, c)| c.epoch != epoch)
                    .unwrap_or(true);
            if first_checkpoint_of_epoch {
                self.epoch_store
                    .record_epoch_first_checkpoint_creation_time_metric();
            }
            let last_checkpoint_of_epoch = details.last_of_epoch && index == chunks_count - 1;

            let sequence_number = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.sequence_number + 1)
                .unwrap_or_default();
            let timestamp_ms = details.timestamp_ms;
            // Timestamps should be monotonically non-decreasing across
            // checkpoints; a decrease is logged but not fatal.
            if let Some((_, last_checkpoint)) = &last_checkpoint {
                if last_checkpoint.timestamp_ms > timestamp_ms {
                    error!(
                        "Unexpected decrease of checkpoint timestamp, sequence: {}, previous: {}, current: {}",
                        sequence_number, last_checkpoint.timestamp_ms, timestamp_ms
                    );
                }
            }

            let (mut effects, mut signatures): (Vec<_>, Vec<_>) = transactions.into_iter().unzip();
            let epoch_rolling_gas_cost_summary =
                self.get_epoch_total_gas_cost(last_checkpoint.as_ref().map(|(_, c)| c), &effects);

            let end_of_epoch_data = if last_checkpoint_of_epoch {
                // Appends the advance-epoch transaction's effects/signature to
                // this (final) chunk.
                let (system_state_obj, system_epoch_info_event) = self
                    .augment_epoch_last_checkpoint(
                        &epoch_rolling_gas_cost_summary,
                        timestamp_ms,
                        &mut effects,
                        &mut signatures,
                        sequence_number,
                    )
                    .await?;

                // The system epoch info event can be `None` in case if the `advance_epoch`
                // Move function call failed and was executed in the safe mode.
                // In this case, the tokens supply should be unchanged.
                //
                // SAFETY: The number of minted and burnt tokens easily fit into an i64 and due
                // to those small numbers, no overflows will occur during conversion or
                // subtraction.
                let epoch_supply_change =
                    system_epoch_info_event.map_or(0, |event| event.supply_change());

                let committee = system_state_obj
                    .get_current_epoch_committee()
                    .committee()
                    .clone();

                // This must happen after the call to augment_epoch_last_checkpoint,
                // otherwise we will not capture the change_epoch tx.
                let root_state_digest = {
                    let state_acc = self
                        .accumulator
                        .upgrade()
                        .expect("No checkpoints should be getting built after local configuration");
                    let acc = state_acc.accumulate_checkpoint(
                        effects.clone(),
                        sequence_number,
                        &self.epoch_store,
                    )?;
                    state_acc
                        .accumulate_running_root(&self.epoch_store, sequence_number, Some(acc))
                        .await?;
                    state_acc
                        .digest_epoch(self.epoch_store.clone(), sequence_number)
                        .await?
                };
                self.metrics.highest_accumulated_epoch.set(epoch as i64);
                info!("Epoch {epoch} root state hash digest: {root_state_digest:?}");

                let epoch_commitments = vec![root_state_digest.into()];

                Some(EndOfEpochData {
                    next_epoch_committee: committee.voting_rights,
                    next_epoch_protocol_version: ProtocolVersion::new(
                        system_state_obj.protocol_version(),
                    ),
                    epoch_commitments,
                    epoch_supply_change,
                })
            } else {
                None
            };
            let contents = CheckpointContents::new_with_digests_and_signatures(
                effects.iter().map(TransactionEffects::execution_digests),
                signatures,
            );

            let num_txns = contents.size() as u64;

            let network_total_transactions = last_checkpoint
                .as_ref()
                .map(|(_, c)| c.network_total_transactions + num_txns)
                .unwrap_or(num_txns);

            let previous_digest = last_checkpoint.as_ref().map(|(_, c)| c.digest());

            // Only the randomness rounds for transactions that actually landed
            // in this chunk are recorded in its summary.
            let matching_randomness_rounds: Vec<_> = effects
                .iter()
                .filter_map(|e| randomness_rounds.get(e.transaction_digest()))
                .copied()
                .collect();

            let summary = CheckpointSummary::new(
                self.epoch_store.protocol_config(),
                epoch,
                sequence_number,
                network_total_transactions,
                &contents,
                previous_digest,
                epoch_rolling_gas_cost_summary,
                end_of_epoch_data,
                timestamp_ms,
                matching_randomness_rounds,
            );
            summary.report_checkpoint_age_ms(&self.metrics.last_created_checkpoint_age_ms);
            if last_checkpoint_of_epoch {
                info!(
                    checkpoint_seq = sequence_number,
                    "creating last checkpoint of epoch {}", epoch
                );
                if let Some(stats) = self.tables.get_epoch_stats(epoch, &summary) {
                    self.epoch_store
                        .report_epoch_metrics_at_last_checkpoint(stats);
                }
            }
            // Each built checkpoint becomes the chaining anchor for the next
            // chunk in this batch.
            last_checkpoint = Some((sequence_number, summary.clone()));
            checkpoints.push((summary, contents));
        }

        Ok(checkpoints)
    }
1527
1528    fn get_epoch_total_gas_cost(
1529        &self,
1530        last_checkpoint: Option<&CheckpointSummary>,
1531        cur_checkpoint_effects: &[TransactionEffects],
1532    ) -> GasCostSummary {
1533        let (previous_epoch, previous_gas_costs) = last_checkpoint
1534            .map(|c| (c.epoch, c.epoch_rolling_gas_cost_summary.clone()))
1535            .unwrap_or_default();
1536        let current_gas_costs = GasCostSummary::new_from_txn_effects(cur_checkpoint_effects.iter());
1537        if previous_epoch == self.epoch_store.epoch() {
1538            // sum only when we are within the same epoch
1539            GasCostSummary::new(
1540                previous_gas_costs.computation_cost + current_gas_costs.computation_cost,
1541                previous_gas_costs.computation_cost_burned
1542                    + current_gas_costs.computation_cost_burned,
1543                previous_gas_costs.storage_cost + current_gas_costs.storage_cost,
1544                previous_gas_costs.storage_rebate + current_gas_costs.storage_rebate,
1545                previous_gas_costs.non_refundable_storage_fee
1546                    + current_gas_costs.non_refundable_storage_fee,
1547            )
1548        } else {
1549            current_gas_costs
1550        }
1551    }
1552
1553    /// Augments the last checkpoint of the epoch by creating and executing an
1554    /// advance epoch transaction.
1555    #[instrument(level = "error", skip_all)]
1556    async fn augment_epoch_last_checkpoint(
1557        &self,
1558        epoch_total_gas_cost: &GasCostSummary,
1559        epoch_start_timestamp_ms: CheckpointTimestamp,
1560        checkpoint_effects: &mut Vec<TransactionEffects>,
1561        signatures: &mut Vec<Vec<GenericSignature>>,
1562        checkpoint: CheckpointSequenceNumber,
1563    ) -> anyhow::Result<(IotaSystemState, Option<SystemEpochInfoEvent>)> {
1564        let (system_state, system_epoch_info_event, effects) = self
1565            .state
1566            .create_and_execute_advance_epoch_tx(
1567                &self.epoch_store,
1568                epoch_total_gas_cost,
1569                checkpoint,
1570                epoch_start_timestamp_ms,
1571            )
1572            .await?;
1573        checkpoint_effects.push(effects);
1574        signatures.push(vec![]);
1575        Ok((system_state, system_epoch_info_event))
1576    }
1577
    /// For the given roots return complete list of effects to include in
    /// checkpoint This list includes the roots and all their dependencies,
    /// which are not part of checkpoint already. Note that this function
    /// may be called multiple times to construct the checkpoint.
    /// `existing_tx_digests_in_checkpoint` is used to track the transactions
    /// that are already included in the checkpoint. Txs in `roots` that
    /// need to be included in the checkpoint will be added to
    /// `existing_tx_digests_in_checkpoint` after the call of this function.
    #[instrument(level = "debug", skip_all)]
    fn complete_checkpoint_effects(
        &self,
        mut roots: Vec<TransactionEffects>,
        existing_tx_digests_in_checkpoint: &mut BTreeSet<TransactionDigest>,
    ) -> IotaResult<Vec<TransactionEffects>> {
        let _scope = monitored_scope("CheckpointBuilder::complete_checkpoint_effects");
        let mut results = vec![];
        let mut seen = HashSet::new();
        // Frontier expansion: each iteration processes the current `roots`
        // and gathers their not-yet-seen dependencies into `pending`, which
        // becomes the next frontier. The loop terminates when no new
        // dependencies are discovered.
        loop {
            let mut pending = HashSet::new();

            // Batched lookup: for each root, whether it was already included
            // in a previously built checkpoint (same order as `roots`).
            let transactions_included = self
                .epoch_store
                .builder_included_transactions_in_checkpoint(
                    roots.iter().map(|e| e.transaction_digest()),
                )?;

            for (effect, tx_included) in roots.into_iter().zip(transactions_included.into_iter()) {
                let digest = effect.transaction_digest();
                // Unnecessary to read effects of a dependency if the effect is already
                // processed.
                // NOTE: the digest is marked as seen *before* the skip checks
                // below, so even skipped roots are never re-expanded via the
                // dependency walk.
                seen.insert(*digest);

                // Skip roots that are already included in the checkpoint.
                if existing_tx_digests_in_checkpoint.contains(effect.transaction_digest()) {
                    continue;
                }

                // Skip roots already included in checkpoints or roots from previous epochs
                if tx_included || effect.executed_epoch() < self.epoch_store.epoch() {
                    continue;
                }

                // Batched lookup: which direct dependencies have an effects
                // signature in the epoch store (same order as dependencies).
                let existing_effects = self
                    .epoch_store
                    .transactions_executed_in_cur_epoch(effect.dependencies().iter())?;

                for (dependency, effects_signature_exists) in
                    effect.dependencies().iter().zip(existing_effects.iter())
                {
                    // Skip here if dependency not executed in the current epoch.
                    // Note that the existence of an effects signature in the
                    // epoch store for the given digest indicates that the transaction
                    // was locally executed in the current epoch
                    if !effects_signature_exists {
                        continue;
                    }
                    // Only enqueue dependencies not encountered before.
                    if seen.insert(*dependency) {
                        pending.insert(*dependency);
                    }
                }
                results.push(effect);
            }
            if pending.is_empty() {
                break;
            }
            // Load effects for the next frontier. A missing effect here is an
            // invariant violation: a dependent transaction was executed, so
            // its dependencies must have been executed too.
            let pending = pending.into_iter().collect::<Vec<_>>();
            let effects = self.effects_store.multi_get_executed_effects(&pending)?;
            let effects = effects
                .into_iter()
                .zip(pending)
                .map(|(opt, digest)| match opt {
                    Some(x) => x,
                    None => panic!(
                        "Can not find effect for transaction {:?}, however transaction that depend on it was already executed",
                        digest
                    ),
                })
                .collect::<Vec<_>>();
            roots = effects;
        }

        // Record everything we are returning so subsequent calls (for the
        // same checkpoint) do not include these transactions again.
        existing_tx_digests_in_checkpoint.extend(results.iter().map(|e| e.transaction_digest()));
        Ok(results)
    }
1662
1663    // This function is used to check the invariants of the consensus commit
1664    // prologue transactions in the checkpoint in simtest.
1665    #[cfg(msim)]
1666    fn expensive_consensus_commit_prologue_invariants_check(
1667        &self,
1668        root_digests: &[TransactionDigest],
1669        sorted: &[TransactionEffects],
1670    ) {
1671        // Gets all the consensus commit prologue transactions from the roots.
1672        let root_txs = self
1673            .state
1674            .get_transaction_cache_reader()
1675            .multi_get_transaction_blocks(root_digests)
1676            .unwrap();
1677        let ccps = root_txs
1678            .iter()
1679            .filter_map(|tx| {
1680                tx.as_ref().filter(|tx| {
1681                    matches!(
1682                        tx.transaction_data().kind(),
1683                        TransactionKind::ConsensusCommitPrologueV1(_)
1684                    )
1685                })
1686            })
1687            .collect::<Vec<_>>();
1688
1689        // There should be at most one consensus commit prologue transaction in the
1690        // roots.
1691        assert!(ccps.len() <= 1);
1692
1693        // Get all the transactions in the checkpoint.
1694        let txs = self
1695            .state
1696            .get_transaction_cache_reader()
1697            .multi_get_transaction_blocks(
1698                &sorted
1699                    .iter()
1700                    .map(|tx| *tx.transaction_digest())
1701                    .collect::<Vec<_>>(),
1702            )
1703            .unwrap();
1704
1705        if ccps.is_empty() {
1706            // If there is no consensus commit prologue transaction in the roots, then there
1707            // should be no consensus commit prologue transaction in the
1708            // checkpoint.
1709            for tx in txs.iter().flatten() {
1710                assert!(!matches!(
1711                    tx.transaction_data().kind(),
1712                    TransactionKind::ConsensusCommitPrologueV1(_)
1713                ));
1714            }
1715        } else {
1716            // If there is one consensus commit prologue, it must be the first one in the
1717            // checkpoint.
1718            assert!(matches!(
1719                txs[0].as_ref().unwrap().transaction_data().kind(),
1720                TransactionKind::ConsensusCommitPrologueV1(_)
1721            ));
1722
1723            assert_eq!(ccps[0].digest(), txs[0].as_ref().unwrap().digest());
1724
1725            for tx in txs.iter().skip(1).flatten() {
1726                assert!(!matches!(
1727                    tx.transaction_data().kind(),
1728                    TransactionKind::ConsensusCommitPrologueV1(_)
1729                ));
1730            }
1731        }
1732    }
1733}
1734
impl CheckpointAggregator {
    /// Creates a new aggregator with no in-progress aggregation; `current`
    /// is lazily initialized by `run_inner` from the next built checkpoint
    /// summary that still needs certification.
    fn new(
        tables: Arc<CheckpointStore>,
        epoch_store: Arc<AuthorityPerEpochStore>,
        notify: Arc<Notify>,
        output: Box<dyn CertifiedCheckpointOutput>,
        state: Arc<AuthorityState>,
        metrics: Arc<CheckpointMetrics>,
    ) -> Self {
        let current = None;
        Self {
            tables,
            epoch_store,
            notify,
            current,
            output,
            state,
            metrics,
        }
    }

    /// Runs the `CheckpointAggregator` in an asynchronous loop, managing the
    /// aggregation of checkpoints.
    /// The function ensures continuous aggregation of checkpoints, handling
    /// errors and retries gracefully, and allowing for proper shutdown on
    /// receiving an exit signal.
    async fn run(mut self) {
        info!("Starting CheckpointAggregator");
        loop {
            if let Err(e) = self.run_and_notify().await {
                error!(
                    "Error while aggregating checkpoint, will retry in 1s: {:?}",
                    e
                );
                self.metrics.checkpoint_errors.inc();
                tokio::time::sleep(Duration::from_secs(1)).await;
                continue;
            }

            // Wait for a wake-up, but poll at least once per second so newly
            // arrived signatures are never left unprocessed for long.
            let _ = timeout(Duration::from_secs(1), self.notify.notified()).await;
        }
    }

    /// Performs one aggregation pass and forwards every checkpoint certified
    /// during that pass to the configured certified-checkpoint output.
    async fn run_and_notify(&mut self) -> IotaResult {
        let summaries = self.run_inner()?;
        for summary in summaries {
            self.output.certified_checkpoint_created(&summary).await?;
        }
        Ok(())
    }

    /// Drains pending checkpoint signatures, aggregating stake per checkpoint
    /// until either a quorum certifies the checkpoint (which is persisted and
    /// collected into the returned vector, then aggregation moves to the next
    /// checkpoint) or the available signatures/summaries run out.
    fn run_inner(&mut self) -> IotaResult<Vec<CertifiedCheckpointSummary>> {
        let _scope = monitored_scope("CheckpointAggregator");
        let mut result = vec![];
        'outer: loop {
            let next_to_certify = self.next_checkpoint_to_certify();
            let current = if let Some(current) = &mut self.current {
                // It's possible that the checkpoint was already certified by
                // the rest of the network and we've already received the
                // certified checkpoint via StateSync. In this case, we reset
                // the current signature aggregator to the next checkpoint to
                // be certified
                if current.summary.sequence_number < next_to_certify {
                    self.current = None;
                    continue;
                }
                current
            } else {
                // No aggregation in progress: start one for the next locally
                // built summary, or bail out if it has not been built yet.
                let Some(summary) = self
                    .epoch_store
                    .get_built_checkpoint_summary(next_to_certify)?
                else {
                    return Ok(result);
                };
                self.current = Some(CheckpointSignatureAggregator {
                    next_index: 0,
                    digest: summary.digest(),
                    summary,
                    signatures_by_digest: MultiStakeAggregator::new(
                        self.epoch_store.committee().clone(),
                    ),
                    tables: self.tables.clone(),
                    state: self.state.clone(),
                    metrics: self.metrics.clone(),
                });
                self.current.as_mut().unwrap()
            };

            // Resume from `next_index` so signatures already folded into the
            // aggregator are not processed twice.
            let epoch_tables = self
                .epoch_store
                .tables()
                .expect("should not run past end of epoch");
            let iter = epoch_tables.get_pending_checkpoint_signatures_iter(
                current.summary.sequence_number,
                current.next_index,
            )?;
            for ((seq, index), data) in iter {
                if seq != current.summary.sequence_number {
                    debug!(
                        checkpoint_seq =? current.summary.sequence_number,
                        "Not enough checkpoint signatures",
                    );
                    // No more signatures (yet) for this checkpoint
                    return Ok(result);
                }
                debug!(
                    checkpoint_seq = current.summary.sequence_number,
                    "Processing signature for checkpoint (digest: {:?}) from {:?}",
                    current.summary.digest(),
                    data.summary.auth_sig().authority.concise()
                );
                self.metrics
                    .checkpoint_participation
                    .with_label_values(&[&format!(
                        "{:?}",
                        data.summary.auth_sig().authority.concise()
                    )])
                    .inc();
                if let Ok(auth_signature) = current.try_aggregate(data) {
                    // Quorum reached: persist the certified checkpoint and
                    // move on to the next sequence number.
                    let summary = VerifiedCheckpoint::new_unchecked(
                        CertifiedCheckpointSummary::new_from_data_and_sig(
                            current.summary.clone(),
                            auth_signature,
                        ),
                    );

                    self.tables.insert_certified_checkpoint(&summary)?;
                    self.metrics
                        .last_certified_checkpoint
                        .set(current.summary.sequence_number as i64);
                    current
                        .summary
                        .report_checkpoint_age_ms(&self.metrics.last_certified_checkpoint_age_ms);
                    result.push(summary.into_inner());
                    self.current = None;
                    continue 'outer;
                } else {
                    current.next_index = index + 1;
                }
            }
            break;
        }
        Ok(result)
    }

    /// Returns the sequence number following the highest locally certified
    /// checkpoint, or 0 if no checkpoint has been certified yet.
    fn next_checkpoint_to_certify(&self) -> CheckpointSequenceNumber {
        self.tables
            .certified_checkpoints
            .unbounded_iter()
            .skip_to_last()
            .next()
            .map(|(seq, _)| seq + 1)
            .unwrap_or_default()
    }
}
1890
impl CheckpointSignatureAggregator {
    /// Folds a single validator signature into the running stake aggregation
    /// for this checkpoint.
    ///
    /// Returns `Ok` with the quorum signature once enough stake has signed
    /// the locally built digest; returns `Err(())` in every other case:
    /// repeated signer, failed insertion, not enough votes yet, or quorum
    /// reached on a digest that differs from our own (a remote fork).
    #[expect(clippy::result_unit_err)]
    pub fn try_aggregate(
        &mut self,
        data: CheckpointSignatureMessage,
    ) -> Result<AuthorityStrongQuorumSignInfo, ()> {
        let their_digest = *data.summary.digest();
        let (_, signature) = data.summary.into_data_and_sig();
        let author = signature.authority;
        // Re-envelope the signature over OUR summary; verification below is
        // what detects digest mismatches.
        let envelope =
            SignedCheckpointSummary::new_from_data_and_sig(self.summary.clone(), signature);
        match self.signatures_by_digest.insert(their_digest, envelope) {
            // ignore repeated signatures
            InsertResult::Failed {
                error:
                    IotaError::StakeAggregatorRepeatedSigner {
                        conflicting_sig: false,
                        ..
                    },
            } => Err(()),
            InsertResult::Failed { error } => {
                warn!(
                    checkpoint_seq = self.summary.sequence_number,
                    "Failed to aggregate new signature from validator {:?}: {:?}",
                    author.concise(),
                    error
                );
                self.check_for_split_brain();
                Err(())
            }
            InsertResult::QuorumReached(cert) => {
                // It is not guaranteed that signature.authority == consensus_cert.author, but
                // we do verify the signature so we know that the author signed
                // the message at some point.
                if their_digest != self.digest {
                    self.metrics.remote_checkpoint_forks.inc();
                    warn!(
                        checkpoint_seq = self.summary.sequence_number,
                        "Validator {:?} has mismatching checkpoint digest {}, we have digest {}",
                        author.concise(),
                        their_digest,
                        self.digest
                    );
                    return Err(());
                }
                Ok(cert)
            }
            InsertResult::NotEnoughVotes {
                bad_votes: _,
                bad_authorities: _,
            } => {
                self.check_for_split_brain();
                Err(())
            }
        }
    }

    /// Check if there is a split brain condition in checkpoint signature
    /// aggregation, defined as any state wherein it is no longer possible
    /// to achieve quorum on a checkpoint proposal, irrespective of the
    /// outcome of any outstanding votes.
    ///
    /// On detection, logs the competing digests by stake, bumps the fork
    /// metric, and spawns a background diagnostic task (see
    /// `diagnose_split_brain`).
    fn check_for_split_brain(&self) {
        debug!(
            checkpoint_seq = self.summary.sequence_number,
            "Checking for split brain condition"
        );
        if self.signatures_by_digest.quorum_unreachable() {
            // TODO: at this point we should immediately halt processing
            // of new transaction certificates to avoid building on top of
            // forked output
            // self.halt_all_execution();

            // Sort competing digests by descending stake for the log line.
            let digests_by_stake_messages = self
                .signatures_by_digest
                .get_all_unique_values()
                .into_iter()
                .sorted_by_key(|(_, (_, stake))| -(*stake as i64))
                .map(|(digest, (_authorities, total_stake))| {
                    format!("{:?} (total stake: {})", digest, total_stake)
                })
                .collect::<Vec<String>>();
            error!(
                checkpoint_seq = self.summary.sequence_number,
                "Split brain detected in checkpoint signature aggregation! Remaining stake: {:?}, Digests by stake: {:?}",
                self.signatures_by_digest.uncommitted_stake(),
                digests_by_stake_messages,
            );
            self.metrics.split_brain_checkpoint_forks.inc();

            // Clone everything the diagnostic task needs, then run it off the
            // hot path.
            let all_unique_values = self.signatures_by_digest.get_all_unique_values();
            let local_summary = self.summary.clone();
            let state = self.state.clone();
            let tables = self.tables.clone();

            tokio::spawn(async move {
                diagnose_split_brain(all_unique_values, local_summary, state, tables).await;
            });
        }
    }
}
1991
/// Create data dump containing relevant data for diagnosing cause of the
/// split brain by querying one disagreeing validator for full checkpoint
/// contents. To minimize peer chatter, we only query one validator at random
/// from each disagreeing faction, as all honest validators that participated in
/// this round may inevitably run the same process.
///
/// The resulting dump (summary/contents/transactions/effects diffs against
/// each disagreeing faction) is written to a file in a temp directory and
/// also emitted at debug level.
async fn diagnose_split_brain(
    all_unique_values: BTreeMap<CheckpointDigest, (Vec<AuthorityName>, StakeUnit)>,
    local_summary: CheckpointSummary,
    state: Arc<AuthorityState>,
    tables: Arc<CheckpointStore>,
) {
    debug!(
        checkpoint_seq = local_summary.sequence_number,
        "Running split brain diagnostics..."
    );
    let time = Utc::now();
    // collect one random disagreeing validator per differing digest
    let digest_to_validator = all_unique_values
        .iter()
        .filter_map(|(digest, (validators, _))| {
            if *digest != local_summary.digest() {
                let random_validator = validators.choose(&mut OsRng).unwrap();
                Some((*digest, *random_validator))
            } else {
                None
            }
        })
        .collect::<HashMap<_, _>>();
    if digest_to_validator.is_empty() {
        panic!(
            "Given split brain condition, there should be at \
                least one validator that disagrees with local signature"
        );
    }

    // Build network clients for the current committee so we can fetch the
    // disagreeing validators' checkpoint data.
    let epoch_store = state.load_epoch_store_one_call_per_task();
    let committee = epoch_store
        .epoch_start_state()
        .get_iota_committee_with_network_metadata();
    let network_config = default_iota_network_config();
    let network_clients =
        make_network_authority_clients_with_network_config(&committee, &network_config);

    // Query all disagreeing validators
    let response_futures = digest_to_validator
        .values()
        .cloned()
        .map(|validator| {
            let client = network_clients
                .get(&validator)
                .expect("Failed to get network client");
            let request = CheckpointRequest {
                sequence_number: Some(local_summary.sequence_number),
                request_content: true,
                certified: false,
            };
            client.handle_checkpoint(request)
        })
        .collect::<Vec<_>>();

    // NOTE(review): `values()` above and `iter()` here traverse the same
    // unmodified HashMap, so this zip pairs each response with the
    // (digest, validator) entry that produced it.
    let digest_name_pair = digest_to_validator.iter();
    let response_data = futures::future::join_all(response_futures)
        .await
        .into_iter()
        .zip(digest_name_pair)
        .filter_map(|(response, (digest, name))| match response {
            Ok(response) => match response {
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Pending(summary)),
                    contents: Some(contents),
                } => Some((*name, *digest, summary, contents)),
                CheckpointResponse {
                    checkpoint: Some(CheckpointSummaryResponse::Certified(_)),
                    contents: _,
                } => {
                    panic!("Expected pending checkpoint, but got certified checkpoint");
                }
                CheckpointResponse {
                    checkpoint: None,
                    contents: _,
                } => {
                    error!(
                        "Summary for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
                CheckpointResponse {
                    checkpoint: _,
                    contents: None,
                } => {
                    error!(
                        "Contents for checkpoint {:?} not found on validator {:?}",
                        local_summary.sequence_number, name
                    );
                    None
                }
            },
            Err(e) => {
                error!(
                    "Failed to get checkpoint contents from validator for fork diagnostics: {:?}",
                    e
                );
                None
            }
        })
        .collect::<Vec<_>>();

    // Our own contents must exist locally; missing contents here is an
    // invariant violation, hence the panics.
    let local_checkpoint_contents = tables
        .get_checkpoint_contents(&local_summary.content_digest)
        .unwrap_or_else(|_| {
            panic!(
                "Could not find checkpoint contents for digest {:?}",
                local_summary.digest()
            )
        })
        .unwrap_or_else(|| {
            panic!(
                "Could not find local full checkpoint contents for checkpoint {:?}, digest {:?}",
                local_summary.sequence_number,
                local_summary.digest()
            )
        });
    let local_contents_text = format!("{local_checkpoint_contents:?}");

    let local_summary_text = format!("{local_summary:?}");
    let local_validator = state.name.concise();
    // Produce a unified diff (summary, contents, transactions, effects)
    // between our checkpoint and each disagreeing validator's version.
    let diff_patches = response_data
        .iter()
        .map(|(name, other_digest, other_summary, contents)| {
            let other_contents_text = format!("{contents:?}");
            let other_summary_text = format!("{other_summary:?}");
            let (local_transactions, local_effects): (Vec<_>, Vec<_>) = local_checkpoint_contents
                .enumerate_transactions(&local_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let (other_transactions, other_effects): (Vec<_>, Vec<_>) = contents
                .enumerate_transactions(other_summary)
                .map(|(_, exec_digest)| (exec_digest.transaction, exec_digest.effects))
                .unzip();
            let summary_patch = create_patch(&local_summary_text, &other_summary_text);
            let contents_patch = create_patch(&local_contents_text, &other_contents_text);
            let local_transactions_text = format!("{local_transactions:#?}");
            let other_transactions_text = format!("{other_transactions:#?}");
            let transactions_patch =
                create_patch(&local_transactions_text, &other_transactions_text);
            let local_effects_text = format!("{local_effects:#?}");
            let other_effects_text = format!("{other_effects:#?}");
            let effects_patch = create_patch(&local_effects_text, &other_effects_text);
            let seq_number = local_summary.sequence_number;
            let local_digest = local_summary.digest();
            let other_validator = name.concise();
            format!(
                "Checkpoint: {seq_number:?}\n\
                Local validator (original): {local_validator:?}, digest: {local_digest:?}\n\
                Other validator (modified): {other_validator:?}, digest: {other_digest:?}\n\n\
                Summary Diff: \n{summary_patch}\n\n\
                Contents Diff: \n{contents_patch}\n\n\
                Transactions Diff: \n{transactions_patch}\n\n\
                Effects Diff: \n{effects_patch}",
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n\n");

    let header = format!(
        "Checkpoint Fork Dump - Authority {local_validator:?}: \n\
        Datetime: {time}",
    );
    let fork_logs_text = format!("{header}\n\n{diff_patches}\n\n");
    // `into_path` persists the temp directory (it is not deleted on drop),
    // so the dump file survives for post-mortem inspection.
    let path = tempfile::tempdir()
        .expect("Failed to create tempdir")
        .into_path()
        .join(Path::new("checkpoint_fork_dump.txt"));
    let mut file = File::create(path).unwrap();
    write!(file, "{}", fork_logs_text).unwrap();
    debug!("{}", fork_logs_text);

    fail_point!("split_brain_reached");
}
2172
/// Notification interface through which other components feed the checkpoint
/// service: incoming signature messages from validators and wake-up signals
/// for checkpoint processing.
pub trait CheckpointServiceNotify {
    /// Handles a checkpoint signature message received from another
    /// validator, making it available to the signature aggregation pipeline.
    fn notify_checkpoint_signature(
        &self,
        epoch_store: &AuthorityPerEpochStore,
        info: &CheckpointSignatureMessage,
    ) -> IotaResult;

    /// Signals that new pending checkpoint data may be available for the
    /// builder to process.
    fn notify_checkpoint(&self) -> IotaResult;
}
2182
/// This is a service used to communicate with other pieces of iota(for ex.
/// authority)
pub struct CheckpointService {
    // Persistent checkpoint store shared with the builder/aggregator tasks.
    tables: Arc<CheckpointStore>,
    // Wakes the checkpoint builder task.
    notify_builder: Arc<Notify>,
    // Wakes the checkpoint aggregator task.
    notify_aggregator: Arc<Notify>,
    // Index of the last processed checkpoint signature, guarded by a mutex
    // (see the comment where it is used for why it is not an AtomicU64).
    last_signature_index: Mutex<u64>,
    metrics: Arc<CheckpointMetrics>,
}
2192
2193impl CheckpointService {
2194    /// Spawns the checkpoint service, initializing and starting the checkpoint
2195    /// builder and aggregator tasks.
2196    pub fn spawn(
2197        state: Arc<AuthorityState>,
2198        checkpoint_store: Arc<CheckpointStore>,
2199        epoch_store: Arc<AuthorityPerEpochStore>,
2200        effects_store: Arc<dyn TransactionCacheRead>,
2201        accumulator: Weak<StateAccumulator>,
2202        checkpoint_output: Box<dyn CheckpointOutput>,
2203        certified_checkpoint_output: Box<dyn CertifiedCheckpointOutput>,
2204        metrics: Arc<CheckpointMetrics>,
2205        max_transactions_per_checkpoint: usize,
2206        max_checkpoint_size_bytes: usize,
2207    ) -> (Arc<Self>, JoinSet<()> /* Handle to tasks */) {
2208        info!(
2209            "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes"
2210        );
2211        let notify_builder = Arc::new(Notify::new());
2212        let notify_aggregator = Arc::new(Notify::new());
2213
2214        let mut tasks = JoinSet::new();
2215
2216        let builder = CheckpointBuilder::new(
2217            state.clone(),
2218            checkpoint_store.clone(),
2219            epoch_store.clone(),
2220            notify_builder.clone(),
2221            effects_store,
2222            accumulator,
2223            checkpoint_output,
2224            notify_aggregator.clone(),
2225            metrics.clone(),
2226            max_transactions_per_checkpoint,
2227            max_checkpoint_size_bytes,
2228        );
2229        tasks.spawn(monitored_future!(builder.run()));
2230
2231        let aggregator = CheckpointAggregator::new(
2232            checkpoint_store.clone(),
2233            epoch_store.clone(),
2234            notify_aggregator.clone(),
2235            certified_checkpoint_output,
2236            state.clone(),
2237            metrics.clone(),
2238        );
2239        tasks.spawn(monitored_future!(aggregator.run()));
2240
2241        let last_signature_index = epoch_store
2242            .get_last_checkpoint_signature_index()
2243            .expect("should not cross end of epoch");
2244        let last_signature_index = Mutex::new(last_signature_index);
2245
2246        let service = Arc::new(Self {
2247            tables: checkpoint_store,
2248            notify_builder,
2249            notify_aggregator,
2250            last_signature_index,
2251            metrics,
2252        });
2253
2254        (service, tasks)
2255    }
2256
2257    #[cfg(test)]
2258    fn write_and_notify_checkpoint_for_testing(
2259        &self,
2260        epoch_store: &AuthorityPerEpochStore,
2261        checkpoint: PendingCheckpoint,
2262    ) -> IotaResult {
2263        use crate::authority::authority_per_epoch_store::ConsensusCommitOutput;
2264
2265        let mut output = ConsensusCommitOutput::new();
2266        epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?;
2267        let mut batch = epoch_store.db_batch_for_test();
2268        output.write_to_batch(epoch_store, &mut batch)?;
2269        batch.write()?;
2270        self.notify_checkpoint()?;
2271        Ok(())
2272    }
2273}
2274
2275impl CheckpointServiceNotify for CheckpointService {
2276    fn notify_checkpoint_signature(
2277        &self,
2278        epoch_store: &AuthorityPerEpochStore,
2279        info: &CheckpointSignatureMessage,
2280    ) -> IotaResult {
2281        let sequence = info.summary.sequence_number;
2282        let signer = info.summary.auth_sig().authority.concise();
2283
2284        if let Some(highest_verified_checkpoint) = self
2285            .tables
2286            .get_highest_verified_checkpoint()?
2287            .map(|x| *x.sequence_number())
2288        {
2289            if sequence <= highest_verified_checkpoint {
2290                debug!(
2291                    checkpoint_seq = sequence,
2292                    "Ignore checkpoint signature from {} - already certified", signer,
2293                );
2294                self.metrics
2295                    .last_ignored_checkpoint_signature_received
2296                    .set(sequence as i64);
2297                return Ok(());
2298            }
2299        }
2300        debug!(
2301            checkpoint_seq = sequence,
2302            "Received checkpoint signature, digest {} from {}",
2303            info.summary.digest(),
2304            signer,
2305        );
2306        self.metrics
2307            .last_received_checkpoint_signatures
2308            .with_label_values(&[&signer.to_string()])
2309            .set(sequence as i64);
2310        // While it can be tempting to make last_signature_index into AtomicU64, this
2311        // won't work We need to make sure we write to `pending_signatures` and
2312        // trigger `notify_aggregator` without race conditions
2313        let mut index = self.last_signature_index.lock();
2314        *index += 1;
2315        epoch_store.insert_checkpoint_signature(sequence, *index, info)?;
2316        self.notify_aggregator.notify_one();
2317        Ok(())
2318    }
2319
2320    fn notify_checkpoint(&self) -> IotaResult {
2321        self.notify_builder.notify_one();
2322        Ok(())
2323    }
2324}
2325
2326// test helper
2327pub struct CheckpointServiceNoop {}
2328impl CheckpointServiceNotify for CheckpointServiceNoop {
2329    fn notify_checkpoint_signature(
2330        &self,
2331        _: &AuthorityPerEpochStore,
2332        _: &CheckpointSignatureMessage,
2333    ) -> IotaResult {
2334        Ok(())
2335    }
2336
2337    fn notify_checkpoint(&self) -> IotaResult {
2338        Ok(())
2339    }
2340}
2341
2342#[cfg(test)]
2343mod tests {
2344    use std::{
2345        collections::{BTreeMap, HashMap},
2346        ops::Deref,
2347    };
2348
2349    use futures::{FutureExt as _, future::BoxFuture};
2350    use iota_macros::sim_test;
2351    use iota_protocol_config::{Chain, ProtocolConfig};
2352    use iota_types::{
2353        base_types::{ObjectID, SequenceNumber, TransactionEffectsDigest},
2354        crypto::Signature,
2355        digests::TransactionEventsDigest,
2356        effects::{TransactionEffects, TransactionEvents},
2357        messages_checkpoint::SignedCheckpointSummary,
2358        move_package::MovePackage,
2359        object,
2360        transaction::{GenesisObject, VerifiedTransaction},
2361    };
2362    use tokio::sync::mpsc;
2363
2364    use super::*;
2365    use crate::authority::test_authority_builder::TestAuthorityBuilder;
2366
    /// End-to-end test of the checkpoint builder/aggregator pipeline: feeds
    /// pending checkpoints into a spawned `CheckpointService` and verifies how
    /// they are built (dependency ordering, splitting by transaction count and
    /// byte size, coalescing by timestamp) and then certified from signatures.
    #[sim_test]
    pub async fn checkpoint_builder_test() {
        telemetry_subscribers::init_for_testing();

        // Lower the minimum checkpoint interval so the timestamp-coalescing
        // behavior (pending at height 4 below) is exercised quickly.
        let mut protocol_config =
            ProtocolConfig::get_for_version(ProtocolVersion::max(), Chain::Unknown);
        protocol_config.set_min_checkpoint_interval_ms_for_testing(100);
        let state = TestAuthorityBuilder::new()
            .with_protocol_config(protocol_config)
            .build()
            .await;

        // Two dummy transactions: a tiny one, and one carrying a package whose
        // 40000-character module name inflates its serialized size — used later
        // to trigger splitting on max_checkpoint_size_bytes.
        let dummy_tx = VerifiedTransaction::new_genesis_transaction(vec![], vec![]);
        let dummy_tx_with_data = VerifiedTransaction::new_genesis_transaction(
            vec![GenesisObject::RawObject {
                data: object::Data::Package(
                    MovePackage::new(
                        ObjectID::random(),
                        SequenceNumber::new(),
                        BTreeMap::from([(format!("{:0>40000}", "1"), Vec::new())]),
                        100_000,
                        // no modules so empty type_origin_table as no types are defined in this
                        // package
                        Vec::new(),
                        // no modules so empty linkage_table as no dependencies of this package
                        // exist
                        BTreeMap::new(),
                    )
                    .unwrap(),
                ),
                owner: object::Owner::Immutable,
            }],
            vec![],
        );
        // Digests 0..15 map to the small tx, 15..20 to the large one.
        for i in 0..15 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx.serializable_ref())
                .unwrap();
        }
        for i in 15..20 {
            state
                .database_for_testing()
                .perpetual_tables
                .transactions
                .insert(&d(i), dummy_tx_with_data.serializable_ref())
                .unwrap();
        }

        // Register effects (with dependency edges and gas summaries) for every
        // digest used below; d(1) depends on d(2), d(3), etc.
        let mut store = HashMap::<TransactionDigest, TransactionEffects>::new();
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(1),
            vec![d(2), d(3)],
            GasCostSummary::new(11, 11, 12, 11, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(2),
            vec![d(3), d(4)],
            GasCostSummary::new(21, 21, 22, 21, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(3),
            vec![],
            GasCostSummary::new(31, 31, 32, 31, 1),
        );
        commit_cert_for_test(
            &mut store,
            state.clone(),
            d(4),
            vec![],
            GasCostSummary::new(41, 41, 42, 41, 1),
        );
        for i in [5, 6, 7, 10, 11, 12, 13] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(41, 41, 42, 41, 1),
            );
        }
        for i in [15, 16, 17] {
            commit_cert_for_test(
                &mut store,
                state.clone(),
                d(i),
                vec![],
                GasCostSummary::new(51, 51, 52, 51, 1),
            );
        }
        // Every transaction needs a user signature for checkpoint contents.
        let all_digests: Vec<_> = store.keys().copied().collect();
        for digest in all_digests {
            let signature = Signature::Ed25519IotaSignature(Default::default()).into();
            state
                .epoch_store_for_testing()
                .test_insert_user_signature(digest, vec![signature]);
        }

        // Channels act as the checkpoint / certified-checkpoint output sinks
        // (see the CheckpointOutput impls for mpsc::Sender below).
        let (output, mut result) = mpsc::channel::<(CheckpointContents, CheckpointSummary)>(10);
        let (certified_output, mut certified_result) =
            mpsc::channel::<CertifiedCheckpointSummary>(10);
        let store = Arc::new(store);

        let ckpt_dir = tempfile::tempdir().unwrap();
        let checkpoint_store = CheckpointStore::new(ckpt_dir.path());
        let epoch_store = state.epoch_store_for_testing();

        let accumulator = Arc::new(StateAccumulator::new_for_tests(
            state.get_accumulator_store().clone(),
        ));

        // Limits chosen to force splitting: 3 transactions max per checkpoint,
        // 100_000 bytes max per checkpoint.
        let (checkpoint_service, _tasks) = CheckpointService::spawn(
            state.clone(),
            checkpoint_store,
            epoch_store.clone(),
            store,
            Arc::downgrade(&accumulator),
            Box::new(output),
            Box::new(certified_output),
            CheckpointMetrics::new_for_tests(),
            3,
            100_000,
        );

        // Six pending checkpoints at heights 0..=5 with increasing timestamps.
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(1, vec![1, 3], 2000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(2, vec![10, 11, 12, 13], 3000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(3, vec![15, 16, 17], 4000))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(4, vec![5], 4001))
            .unwrap();
        checkpoint_service
            .write_and_notify_checkpoint_for_testing(&epoch_store, p(5, vec![6], 5000))
            .unwrap();

        let (c1c, c1s) = result.recv().await.unwrap();
        let (c2c, c2s) = result.recv().await.unwrap();

        // Checkpoint 0 contains only d(4); checkpoint 1 expands root d(1)'s
        // dependencies in causal order (d(3), d(2)) before d(1) itself, and the
        // rolling gas summary accumulates across checkpoints.
        let c1t = c1c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let c2t = c2c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c1t, vec![d(4)]);
        assert_eq!(c1s.previous_digest, None);
        assert_eq!(c1s.sequence_number, 0);
        assert_eq!(
            c1s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(41, 41, 42, 41, 1)
        );

        assert_eq!(c2t, vec![d(3), d(2), d(1)]);
        assert_eq!(c2s.previous_digest, Some(c1s.digest()));
        assert_eq!(c2s.sequence_number, 1);
        assert_eq!(
            c2s.epoch_rolling_gas_cost_summary,
            GasCostSummary::new(104, 104, 108, 104, 4)
        );

        // Pending at index 2 had 4 transactions, and we configured 3 transactions max.
        // Verify that we split into 2 checkpoints.
        let (c3c, c3s) = result.recv().await.unwrap();
        let c3t = c3c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c4c, c4s) = result.recv().await.unwrap();
        let c4t = c4c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c3s.sequence_number, 2);
        assert_eq!(c3s.previous_digest, Some(c2s.digest()));
        assert_eq!(c4s.sequence_number, 3);
        assert_eq!(c4s.previous_digest, Some(c3s.digest()));
        assert_eq!(c3t, vec![d(10), d(11), d(12)]);
        assert_eq!(c4t, vec![d(13)]);

        // Pending at index 3 had 3 transactions of 40K size, and we configured 100K
        // max. Verify that we split into 2 checkpoints.
        let (c5c, c5s) = result.recv().await.unwrap();
        let c5t = c5c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        let (c6c, c6s) = result.recv().await.unwrap();
        let c6t = c6c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c5s.sequence_number, 4);
        assert_eq!(c5s.previous_digest, Some(c4s.digest()));
        assert_eq!(c6s.sequence_number, 5);
        assert_eq!(c6s.previous_digest, Some(c5s.digest()));
        assert_eq!(c5t, vec![d(15), d(16)]);
        assert_eq!(c6t, vec![d(17)]);

        // Pending at index 4 was too soon after the prior one and should be coalesced
        // into the next one.
        let (c7c, c7s) = result.recv().await.unwrap();
        let c7t = c7c.iter().map(|d| d.transaction).collect::<Vec<_>>();
        assert_eq!(c7t, vec![d(5), d(6)]);
        assert_eq!(c7s.previous_digest, Some(c6s.digest()));
        assert_eq!(c7s.sequence_number, 6);

        // Sign the first two summaries and feed the signatures back in
        // (deliberately out of order) — the aggregator should still certify
        // checkpoints 0 and 1 in sequence order.
        let c1ss = SignedCheckpointSummary::new(c1s.epoch, c1s, state.secret.deref(), state.name);
        let c2ss = SignedCheckpointSummary::new(c2s.epoch, c2s, state.secret.deref(), state.name);

        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c2ss },
            )
            .unwrap();
        checkpoint_service
            .notify_checkpoint_signature(
                &epoch_store,
                &CheckpointSignatureMessage { summary: c1ss },
            )
            .unwrap();

        let c1sc = certified_result.recv().await.unwrap();
        let c2sc = certified_result.recv().await.unwrap();
        assert_eq!(c1sc.sequence_number, 0);
        assert_eq!(c2sc.sequence_number, 1);
    }
2594
2595    impl TransactionCacheRead for HashMap<TransactionDigest, TransactionEffects> {
2596        fn notify_read_executed_effects(
2597            &self,
2598            digests: &[TransactionDigest],
2599        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffects>>> {
2600            std::future::ready(Ok(digests
2601                .iter()
2602                .map(|d| self.get(d).expect("effects not found").clone())
2603                .collect()))
2604            .boxed()
2605        }
2606
2607        fn notify_read_executed_effects_digests(
2608            &self,
2609            digests: &[TransactionDigest],
2610        ) -> BoxFuture<'_, IotaResult<Vec<TransactionEffectsDigest>>> {
2611            std::future::ready(Ok(digests
2612                .iter()
2613                .map(|d| {
2614                    self.get(d)
2615                        .map(|fx| fx.digest())
2616                        .expect("effects not found")
2617                })
2618                .collect()))
2619            .boxed()
2620        }
2621
2622        fn multi_get_executed_effects(
2623            &self,
2624            digests: &[TransactionDigest],
2625        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2626            Ok(digests.iter().map(|d| self.get(d).cloned()).collect())
2627        }
2628
2629        // Unimplemented methods - its unfortunate to have this big blob of useless
2630        // code, but it wasn't worth it to keep EffectsNotifyRead around just
2631        // for these tests, as it caused a ton of complication in non-test code.
2632        // (e.g. had to implement EFfectsNotifyRead for all ExecutionCacheRead
2633        // implementors).
2634
2635        fn multi_get_transaction_blocks(
2636            &self,
2637            _: &[TransactionDigest],
2638        ) -> IotaResult<Vec<Option<Arc<VerifiedTransaction>>>> {
2639            unimplemented!()
2640        }
2641
2642        fn multi_get_executed_effects_digests(
2643            &self,
2644            _: &[TransactionDigest],
2645        ) -> IotaResult<Vec<Option<TransactionEffectsDigest>>> {
2646            unimplemented!()
2647        }
2648
2649        fn multi_get_effects(
2650            &self,
2651            _: &[TransactionEffectsDigest],
2652        ) -> IotaResult<Vec<Option<TransactionEffects>>> {
2653            unimplemented!()
2654        }
2655
2656        fn multi_get_events(
2657            &self,
2658            _: &[TransactionEventsDigest],
2659        ) -> IotaResult<Vec<Option<TransactionEvents>>> {
2660            unimplemented!()
2661        }
2662    }
2663
2664    #[async_trait::async_trait]
2665    impl CheckpointOutput for mpsc::Sender<(CheckpointContents, CheckpointSummary)> {
2666        async fn checkpoint_created(
2667            &self,
2668            summary: &CheckpointSummary,
2669            contents: &CheckpointContents,
2670            _epoch_store: &Arc<AuthorityPerEpochStore>,
2671            _checkpoint_store: &Arc<CheckpointStore>,
2672        ) -> IotaResult {
2673            self.try_send((contents.clone(), summary.clone())).unwrap();
2674            Ok(())
2675        }
2676    }
2677
2678    #[async_trait::async_trait]
2679    impl CertifiedCheckpointOutput for mpsc::Sender<CertifiedCheckpointSummary> {
2680        async fn certified_checkpoint_created(
2681            &self,
2682            summary: &CertifiedCheckpointSummary,
2683        ) -> IotaResult {
2684            self.try_send(summary.clone()).unwrap();
2685            Ok(())
2686        }
2687    }
2688
2689    fn p(i: u64, t: Vec<u8>, timestamp_ms: u64) -> PendingCheckpoint {
2690        PendingCheckpoint::V1(PendingCheckpointContentsV1 {
2691            roots: t
2692                .into_iter()
2693                .map(|t| TransactionKey::Digest(d(t)))
2694                .collect(),
2695            details: PendingCheckpointInfo {
2696                timestamp_ms,
2697                last_of_epoch: false,
2698                checkpoint_height: i,
2699            },
2700        })
2701    }
2702
2703    fn d(i: u8) -> TransactionDigest {
2704        let mut bytes: [u8; 32] = Default::default();
2705        bytes[0] = i;
2706        TransactionDigest::new(bytes)
2707    }
2708
    /// Shorthand for test `TransactionEffects` with the given digest,
    /// dependency list, and gas summary; all other fields keep their defaults.
    fn e(
        transaction_digest: TransactionDigest,
        dependencies: Vec<TransactionDigest>,
        gas_used: GasCostSummary,
    ) -> TransactionEffects {
        let mut effects = TransactionEffects::default();
        *effects.transaction_digest_mut_for_testing() = transaction_digest;
        *effects.dependencies_mut_for_testing() = dependencies;
        *effects.gas_cost_summary_mut_for_testing() = gas_used;
        effects
    }
2720
2721    fn commit_cert_for_test(
2722        store: &mut HashMap<TransactionDigest, TransactionEffects>,
2723        state: Arc<AuthorityState>,
2724        digest: TransactionDigest,
2725        dependencies: Vec<TransactionDigest>,
2726        gas_used: GasCostSummary,
2727    ) {
2728        let epoch_store = state.epoch_store_for_testing();
2729        let effects = e(digest, dependencies, gas_used);
2730        store.insert(digest, effects.clone());
2731        epoch_store
2732            .insert_tx_key_and_digest(&TransactionKey::Digest(digest), &digest)
2733            .expect("Inserting cert fx and sigs should not fail");
2734    }
2735}