iota_core/
state_accumulator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use fastcrypto::hash::MultisetHash;
8use iota_common::fatal;
9use iota_metrics::monitored_scope;
10use iota_types::{
11    accumulator::Accumulator,
12    base_types::{ObjectID, SequenceNumber},
13    committee::EpochId,
14    digests::ObjectDigest,
15    effects::{TransactionEffects, TransactionEffectsAPI},
16    error::IotaResult,
17    in_memory_storage::InMemoryStorage,
18    messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest},
19    storage::ObjectStore,
20};
21use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
22use serde::Serialize;
23use tracing::debug;
24
25use crate::authority::{
26    authority_per_epoch_store::AuthorityPerEpochStore, authority_store_tables::LiveObject,
27};
28
29pub struct StateAccumulatorMetrics {
30    inconsistent_state: IntGauge,
31}
32
33impl StateAccumulatorMetrics {
34    pub fn new(registry: &Registry) -> Arc<Self> {
35        let this = Self {
36            inconsistent_state: register_int_gauge_with_registry!(
37                "accumulator_inconsistent_state",
38                "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
39                registry
40            )
41            .unwrap(),
42        };
43        Arc::new(this)
44    }
45}
46
47pub struct StateAccumulator {
48    store: Arc<dyn AccumulatorStore>,
49    metrics: Arc<StateAccumulatorMetrics>,
50}
51
52pub trait AccumulatorStore: ObjectStore + Send + Sync {
53    fn get_root_state_accumulator_for_epoch(
54        &self,
55        epoch: EpochId,
56    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>>;
57
58    fn get_root_state_accumulator_for_highest_epoch(
59        &self,
60    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>>;
61
62    fn insert_state_accumulator_for_epoch(
63        &self,
64        epoch: EpochId,
65        checkpoint_seq_num: &CheckpointSequenceNumber,
66        acc: &Accumulator,
67    ) -> IotaResult;
68
69    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_>;
70
71    fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
72        self.iter_live_object_set()
73    }
74}
75
76impl AccumulatorStore for InMemoryStorage {
77    fn get_root_state_accumulator_for_epoch(
78        &self,
79        _epoch: EpochId,
80    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
81        unreachable!("not used for testing")
82    }
83
84    fn get_root_state_accumulator_for_highest_epoch(
85        &self,
86    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
87        unreachable!("not used for testing")
88    }
89
90    fn insert_state_accumulator_for_epoch(
91        &self,
92        _epoch: EpochId,
93        _checkpoint_seq_num: &CheckpointSequenceNumber,
94        _acc: &Accumulator,
95    ) -> IotaResult {
96        unreachable!("not used for testing")
97    }
98
99    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
100        unreachable!("not used for testing")
101    }
102}
103
104/// Serializable representation of the ObjectRef of an
105/// object that has been wrapped
106/// TODO: This can be replaced with ObjectKey.
107#[derive(Serialize, Debug)]
108pub struct WrappedObject {
109    id: ObjectID,
110    wrapped_at: SequenceNumber,
111    digest: ObjectDigest,
112}
113
114impl WrappedObject {
115    pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
116        Self {
117            id,
118            wrapped_at,
119            digest: ObjectDigest::OBJECT_DIGEST_WRAPPED,
120        }
121    }
122}
123
124fn accumulate_effects(effects: &[TransactionEffects]) -> Accumulator {
125    let mut acc = Accumulator::default();
126
127    // process insertions to the set
128    acc.insert_all(
129        effects
130            .iter()
131            .flat_map(|fx| {
132                fx.all_changed_objects()
133                    .into_iter()
134                    .map(|(object_ref, _, _)| object_ref.2)
135            })
136            .collect::<Vec<ObjectDigest>>(),
137    );
138
139    // process modified objects to the set
140    acc.remove_all(
141        effects
142            .iter()
143            .flat_map(|fx| {
144                fx.old_object_metadata()
145                    .into_iter()
146                    .map(|(object_ref, _owner)| object_ref.2)
147            })
148            .collect::<Vec<ObjectDigest>>(),
149    );
150
151    acc
152}
153
154impl StateAccumulator {
155    pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
156        Self { store, metrics }
157    }
158
159    pub fn new_for_tests(store: Arc<dyn AccumulatorStore>) -> Self {
160        Self::new(store, StateAccumulatorMetrics::new(&Registry::new()))
161    }
162
163    pub fn metrics(&self) -> Arc<StateAccumulatorMetrics> {
164        self.metrics.clone()
165    }
166
167    pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
168        self.metrics
169            .inconsistent_state
170            .set(is_inconsistent_state as i64);
171    }
172
173    /// Accumulates the effects of a single checkpoint and persists the
174    /// accumulator.
175    pub fn accumulate_checkpoint(
176        &self,
177        effects: &[TransactionEffects],
178        checkpoint_seq_num: CheckpointSequenceNumber,
179        epoch_store: &AuthorityPerEpochStore,
180    ) -> IotaResult<Accumulator> {
181        let _scope = monitored_scope("AccumulateCheckpoint");
182        if let Some(acc) = epoch_store.get_state_hash_for_checkpoint(&checkpoint_seq_num)? {
183            return Ok(acc);
184        }
185
186        let acc = self.accumulate_effects(effects);
187
188        epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
189        debug!("Accumulated checkpoint {}", checkpoint_seq_num);
190
191        epoch_store
192            .checkpoint_state_notify_read
193            .notify(&checkpoint_seq_num, &acc);
194
195        Ok(acc)
196    }
197
198    pub fn accumulate_cached_live_object_set_for_testing(&self) -> Accumulator {
199        Self::accumulate_live_object_set_impl(self.store.iter_cached_live_object_set_for_testing())
200    }
201
202    /// Returns the result of accumulating the live object set, without side
203    /// effects
204    pub fn accumulate_live_object_set(&self) -> Accumulator {
205        Self::accumulate_live_object_set_impl(self.store.iter_live_object_set())
206    }
207
208    fn accumulate_live_object_set_impl(iter: impl Iterator<Item = LiveObject>) -> Accumulator {
209        let mut acc = Accumulator::default();
210        iter.for_each(|live_object| {
211            Self::accumulate_live_object(&mut acc, &live_object);
212        });
213        acc
214    }
215
216    pub fn accumulate_live_object(acc: &mut Accumulator, live_object: &LiveObject) {
217        match live_object {
218            LiveObject::Normal(object) => {
219                acc.insert(object.compute_object_reference().2);
220            }
221            LiveObject::Wrapped(key) => {
222                acc.insert(
223                    bcs::to_bytes(&WrappedObject::new(key.0, key.1))
224                        .expect("Failed to serialize WrappedObject"),
225                );
226            }
227        }
228    }
229
230    pub fn digest_live_object_set(&self) -> ECMHLiveObjectSetDigest {
231        let acc = self.accumulate_live_object_set();
232        acc.digest().into()
233    }
234
235    pub async fn digest_epoch(
236        &self,
237        epoch_store: Arc<AuthorityPerEpochStore>,
238        last_checkpoint_of_epoch: CheckpointSequenceNumber,
239    ) -> IotaResult<ECMHLiveObjectSetDigest> {
240        Ok(self
241            .accumulate_epoch(epoch_store, last_checkpoint_of_epoch)?
242            .digest()
243            .into())
244    }
245
246    pub async fn wait_for_previous_running_root(
247        &self,
248        epoch_store: &AuthorityPerEpochStore,
249        checkpoint_seq_num: CheckpointSequenceNumber,
250    ) -> IotaResult {
251        assert!(checkpoint_seq_num > 0);
252
253        // Check if this is the first checkpoint of the new epoch, in which case
254        // there is nothing to wait for.
255        if self
256            .store
257            .get_root_state_accumulator_for_highest_epoch()?
258            .map(|(_, (last_checkpoint_prev_epoch, _))| last_checkpoint_prev_epoch)
259            == Some(checkpoint_seq_num - 1)
260        {
261            return Ok(());
262        }
263
264        // There is an edge case here where checkpoint_seq_num is 1. This means the
265        // previous checkpoint is the genesis checkpoint. CheckpointExecutor is
266        // guaranteed to execute and accumulate the genesis checkpoint, so this
267        // will resolve.
268        epoch_store
269            .notify_read_running_root(checkpoint_seq_num - 1)
270            .await?;
271        Ok(())
272    }
273
274    fn get_prior_root(
275        &self,
276        epoch_store: &AuthorityPerEpochStore,
277        checkpoint_seq_num: CheckpointSequenceNumber,
278    ) -> IotaResult<Accumulator> {
279        if checkpoint_seq_num == 0 {
280            return Ok(Accumulator::default());
281        }
282
283        if let Some((prev_epoch, (last_checkpoint_prev_epoch, prev_acc))) =
284            self.store.get_root_state_accumulator_for_highest_epoch()?
285        {
286            assert_eq!(prev_epoch + 1, epoch_store.epoch());
287            if last_checkpoint_prev_epoch == checkpoint_seq_num - 1 {
288                return Ok(prev_acc);
289            }
290        }
291
292        let Some(prior_running_root) =
293            epoch_store.get_running_root_accumulator(checkpoint_seq_num - 1)?
294        else {
295            fatal!(
296                "Running root accumulator must exist for checkpoint {}",
297                checkpoint_seq_num - 1
298            );
299        };
300
301        Ok(prior_running_root)
302    }
303
304    // Accumulate the running root.
305    // The previous checkpoint must be accumulated before calling this function, or
306    // it will panic.
307    pub fn accumulate_running_root(
308        &self,
309        epoch_store: &AuthorityPerEpochStore,
310        checkpoint_seq_num: CheckpointSequenceNumber,
311        checkpoint_acc: Option<Accumulator>,
312    ) -> IotaResult {
313        let _scope = monitored_scope("AccumulateRunningRoot");
314        tracing::debug!(
315            "accumulating running root for checkpoint {}",
316            checkpoint_seq_num
317        );
318
319        // Idempotency.
320        if epoch_store
321            .get_running_root_accumulator(checkpoint_seq_num)?
322            .is_some()
323        {
324            debug!(
325                "accumulate_running_root {:?} {:?} already exists",
326                epoch_store.epoch(),
327                checkpoint_seq_num
328            );
329            return Ok(());
330        }
331
332        let mut running_root = self.get_prior_root(epoch_store, checkpoint_seq_num)?;
333
334        let checkpoint_acc = checkpoint_acc.unwrap_or_else(|| {
335            epoch_store
336                .get_state_hash_for_checkpoint(&checkpoint_seq_num)
337                .expect("Failed to get checkpoint accumulator from disk")
338                .expect("Expected checkpoint accumulator to exist")
339        });
340        running_root.union(&checkpoint_acc);
341        epoch_store.insert_running_root_accumulator(&checkpoint_seq_num, &running_root)?;
342        debug!(
343            "Accumulated checkpoint {} to running root accumulator",
344            checkpoint_seq_num,
345        );
346        Ok(())
347    }
348
349    pub fn accumulate_epoch(
350        &self,
351        epoch_store: Arc<AuthorityPerEpochStore>,
352        last_checkpoint_of_epoch: CheckpointSequenceNumber,
353    ) -> IotaResult<Accumulator> {
354        let _scope = monitored_scope("AccumulateEpoch");
355        let running_root = epoch_store
356            .get_running_root_accumulator(last_checkpoint_of_epoch)?
357            .expect("Expected running root accumulator to exist up to last checkpoint of epoch");
358
359        self.store.insert_state_accumulator_for_epoch(
360            epoch_store.epoch(),
361            &last_checkpoint_of_epoch,
362            &running_root,
363        )?;
364        debug!(
365            "Finalized root state hash for epoch {} (up to checkpoint {})",
366            epoch_store.epoch(),
367            last_checkpoint_of_epoch
368        );
369        Ok(running_root.clone())
370    }
371
372    pub fn accumulate_effects(&self, effects: &[TransactionEffects]) -> Accumulator {
373        accumulate_effects(effects)
374    }
375}