iota_core/
state_accumulator.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use fastcrypto::hash::MultisetHash;
8use iota_metrics::monitored_scope;
9use iota_types::{
10    accumulator::Accumulator,
11    base_types::{ObjectID, SequenceNumber},
12    committee::EpochId,
13    digests::ObjectDigest,
14    effects::{TransactionEffects, TransactionEffectsAPI},
15    error::IotaResult,
16    in_memory_storage::InMemoryStorage,
17    messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest},
18    storage::ObjectStore,
19};
20use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
21use serde::Serialize;
22use tracing::debug;
23
24use crate::authority::{
25    authority_per_epoch_store::AuthorityPerEpochStore, authority_store_tables::LiveObject,
26};
27
28pub struct StateAccumulatorMetrics {
29    inconsistent_state: IntGauge,
30}
31
32impl StateAccumulatorMetrics {
33    pub fn new(registry: &Registry) -> Arc<Self> {
34        let this = Self {
35            inconsistent_state: register_int_gauge_with_registry!(
36                "accumulator_inconsistent_state",
37                "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
38                registry
39            )
40            .unwrap(),
41        };
42        Arc::new(this)
43    }
44}
45
46pub enum StateAccumulator {
47    V1(StateAccumulatorV1),
48}
49
50pub struct StateAccumulatorV1 {
51    store: Arc<dyn AccumulatorStore>,
52    metrics: Arc<StateAccumulatorMetrics>,
53}
54
55pub trait AccumulatorStore: ObjectStore + Send + Sync {
56    fn get_root_state_accumulator_for_epoch(
57        &self,
58        epoch: EpochId,
59    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>>;
60
61    fn get_root_state_accumulator_for_highest_epoch(
62        &self,
63    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>>;
64
65    fn insert_state_accumulator_for_epoch(
66        &self,
67        epoch: EpochId,
68        checkpoint_seq_num: &CheckpointSequenceNumber,
69        acc: &Accumulator,
70    ) -> IotaResult;
71
72    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_>;
73
74    fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
75        self.iter_live_object_set()
76    }
77}
78
79impl AccumulatorStore for InMemoryStorage {
80    fn get_root_state_accumulator_for_epoch(
81        &self,
82        _epoch: EpochId,
83    ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
84        unreachable!("not used for testing")
85    }
86
87    fn get_root_state_accumulator_for_highest_epoch(
88        &self,
89    ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
90        unreachable!("not used for testing")
91    }
92
93    fn insert_state_accumulator_for_epoch(
94        &self,
95        _epoch: EpochId,
96        _checkpoint_seq_num: &CheckpointSequenceNumber,
97        _acc: &Accumulator,
98    ) -> IotaResult {
99        unreachable!("not used for testing")
100    }
101
102    fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
103        unreachable!("not used for testing")
104    }
105}
106
107/// Serializable representation of the ObjectRef of an
108/// object that has been wrapped
109/// TODO: This can be replaced with ObjectKey.
110#[derive(Serialize, Debug)]
111pub struct WrappedObject {
112    id: ObjectID,
113    wrapped_at: SequenceNumber,
114    digest: ObjectDigest,
115}
116
117impl WrappedObject {
118    pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
119        Self {
120            id,
121            wrapped_at,
122            digest: ObjectDigest::OBJECT_DIGEST_WRAPPED,
123        }
124    }
125}
126
127fn accumulate_effects(effects: Vec<TransactionEffects>) -> Accumulator {
128    let mut acc = Accumulator::default();
129
130    // process insertions to the set
131    acc.insert_all(
132        effects
133            .iter()
134            .flat_map(|fx| {
135                fx.all_changed_objects()
136                    .into_iter()
137                    .map(|(object_ref, _, _)| object_ref.2)
138            })
139            .collect::<Vec<ObjectDigest>>(),
140    );
141
142    // process modified objects to the set
143    acc.remove_all(
144        effects
145            .iter()
146            .flat_map(|fx| {
147                fx.old_object_metadata()
148                    .into_iter()
149                    .map(|(object_ref, _owner)| object_ref.2)
150            })
151            .collect::<Vec<ObjectDigest>>(),
152    );
153
154    acc
155}
156
157impl StateAccumulator {
158    pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
159        StateAccumulator::V1(StateAccumulatorV1::new(store, metrics))
160    }
161
162    pub fn new_for_tests(store: Arc<dyn AccumulatorStore>) -> Self {
163        Self::new(store, StateAccumulatorMetrics::new(&Registry::new()))
164    }
165
166    pub fn metrics(&self) -> Arc<StateAccumulatorMetrics> {
167        match self {
168            StateAccumulator::V1(impl_v1) => impl_v1.metrics.clone(),
169        }
170    }
171
172    pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
173        match self {
174            StateAccumulator::V1(impl_v1) => &impl_v1.metrics,
175        }
176        .inconsistent_state
177        .set(is_inconsistent_state as i64);
178    }
179
180    /// Accumulates the effects of a single checkpoint and persists the
181    /// accumulator.
182    pub fn accumulate_checkpoint(
183        &self,
184        effects: Vec<TransactionEffects>,
185        checkpoint_seq_num: CheckpointSequenceNumber,
186        epoch_store: &Arc<AuthorityPerEpochStore>,
187    ) -> IotaResult<Accumulator> {
188        let _scope = monitored_scope("AccumulateCheckpoint");
189        if let Some(acc) = epoch_store.get_state_hash_for_checkpoint(&checkpoint_seq_num)? {
190            return Ok(acc);
191        }
192
193        let acc = self.accumulate_effects(effects);
194
195        epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
196        debug!("Accumulated checkpoint {}", checkpoint_seq_num);
197
198        epoch_store
199            .checkpoint_state_notify_read
200            .notify(&checkpoint_seq_num, &acc);
201
202        Ok(acc)
203    }
204
205    pub async fn accumulate_running_root(
206        &self,
207        epoch_store: &AuthorityPerEpochStore,
208        checkpoint_seq_num: CheckpointSequenceNumber,
209        checkpoint_acc: Option<Accumulator>,
210    ) -> IotaResult {
211        match self {
212            StateAccumulator::V1(impl_v1) => {
213                impl_v1
214                    .accumulate_running_root(epoch_store, checkpoint_seq_num, checkpoint_acc)
215                    .await
216            }
217        }
218    }
219
220    pub async fn accumulate_epoch(
221        &self,
222        epoch_store: Arc<AuthorityPerEpochStore>,
223        last_checkpoint_of_epoch: CheckpointSequenceNumber,
224    ) -> IotaResult<Accumulator> {
225        match self {
226            StateAccumulator::V1(impl_v1) => {
227                impl_v1.accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
228            }
229        }
230    }
231
232    pub fn accumulate_cached_live_object_set_for_testing(&self) -> Accumulator {
233        match self {
234            StateAccumulator::V1(impl_v1) => Self::accumulate_live_object_set_impl(
235                impl_v1.store.iter_cached_live_object_set_for_testing(),
236            ),
237        }
238    }
239
240    /// Returns the result of accumulating the live object set, without side
241    /// effects
242    pub fn accumulate_live_object_set(&self) -> Accumulator {
243        match self {
244            StateAccumulator::V1(impl_v1) => {
245                Self::accumulate_live_object_set_impl(impl_v1.store.iter_live_object_set())
246            }
247        }
248    }
249
250    /// Accumulates given effects and returns the accumulator without side
251    /// effects.
252    pub fn accumulate_effects(&self, effects: Vec<TransactionEffects>) -> Accumulator {
253        match self {
254            StateAccumulator::V1(impl_v1) => impl_v1.accumulate_effects(effects),
255        }
256    }
257
258    fn accumulate_live_object_set_impl(iter: impl Iterator<Item = LiveObject>) -> Accumulator {
259        let mut acc = Accumulator::default();
260        iter.for_each(|live_object| {
261            Self::accumulate_live_object(&mut acc, &live_object);
262        });
263        acc
264    }
265
266    pub fn accumulate_live_object(acc: &mut Accumulator, live_object: &LiveObject) {
267        match live_object {
268            LiveObject::Normal(object) => {
269                acc.insert(object.compute_object_reference().2);
270            }
271            LiveObject::Wrapped(key) => {
272                acc.insert(
273                    bcs::to_bytes(&WrappedObject::new(key.0, key.1))
274                        .expect("Failed to serialize WrappedObject"),
275                );
276            }
277        }
278    }
279
280    pub fn digest_live_object_set(&self) -> ECMHLiveObjectSetDigest {
281        let acc = self.accumulate_live_object_set();
282        acc.digest().into()
283    }
284
285    pub async fn digest_epoch(
286        &self,
287        epoch_store: Arc<AuthorityPerEpochStore>,
288        last_checkpoint_of_epoch: CheckpointSequenceNumber,
289    ) -> IotaResult<ECMHLiveObjectSetDigest> {
290        Ok(self
291            .accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
292            .await?
293            .digest()
294            .into())
295    }
296}
297
298impl StateAccumulatorV1 {
299    pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
300        Self { store, metrics }
301    }
302
303    pub async fn accumulate_running_root(
304        &self,
305        epoch_store: &AuthorityPerEpochStore,
306        checkpoint_seq_num: CheckpointSequenceNumber,
307        checkpoint_acc: Option<Accumulator>,
308    ) -> IotaResult {
309        let _scope = monitored_scope("AccumulateRunningRoot");
310        tracing::debug!(
311            "accumulating running root for checkpoint {}",
312            checkpoint_seq_num
313        );
314
315        // For the last checkpoint of the epoch, this function will be called once by
316        // the checkpoint builder, and again by checkpoint executor.
317        //
318        // Normally this is fine, since the notify_read_running_root(checkpoint_seq_num
319        // - 1) will work normally. But if there is only one checkpoint in the
320        // epoch, that call will hang forever, since the previous checkpoint
321        // belongs to the previous epoch.
322        if epoch_store
323            .get_running_root_accumulator(&checkpoint_seq_num)?
324            .is_some()
325        {
326            debug!(
327                "accumulate_running_root {:?} {:?} already exists",
328                epoch_store.epoch(),
329                checkpoint_seq_num
330            );
331            return Ok(());
332        }
333
334        let mut running_root = if checkpoint_seq_num == 0 {
335            // we're at genesis and need to start from scratch
336            Accumulator::default()
337        } else if epoch_store
338            .get_highest_running_root_accumulator()?
339            .is_none()
340        {
341            // we're at the beginning of a new epoch and need to
342            // bootstrap from the previous epoch's root state hash. Because this
343            // should only occur at beginning of epoch, we shouldn't have to worry
344            // about race conditions on reading the highest running root accumulator.
345            if let Some((prev_epoch, (last_checkpoint_prev_epoch, prev_acc))) =
346                self.store.get_root_state_accumulator_for_highest_epoch()?
347            {
348                if last_checkpoint_prev_epoch != checkpoint_seq_num - 1 {
349                    epoch_store
350                        .notify_read_running_root(checkpoint_seq_num - 1)
351                        .await?
352                } else {
353                    assert_eq!(
354                        prev_epoch + 1,
355                        epoch_store.epoch(),
356                        "Expected highest existing root state hash to be for previous epoch",
357                    );
358                    prev_acc
359                }
360            } else {
361                // Rare edge case where we manage to somehow lag in checkpoint execution from
362                // genesis such that the end of epoch checkpoint is built before
363                // we execute any checkpoints.
364                assert_eq!(
365                    epoch_store.epoch(),
366                    0,
367                    "Expected epoch to be 0 if previous root state hash does not exist"
368                );
369                epoch_store
370                    .notify_read_running_root(checkpoint_seq_num - 1)
371                    .await?
372            }
373        } else {
374            epoch_store
375                .notify_read_running_root(checkpoint_seq_num - 1)
376                .await?
377        };
378
379        let checkpoint_acc = checkpoint_acc.unwrap_or_else(|| {
380            epoch_store
381                .get_state_hash_for_checkpoint(&checkpoint_seq_num)
382                .expect("Failed to get checkpoint accumulator from disk")
383                .expect("Expected checkpoint accumulator to exist")
384        });
385        running_root.union(&checkpoint_acc);
386        epoch_store.insert_running_root_accumulator(&checkpoint_seq_num, &running_root)?;
387        debug!(
388            "Accumulated checkpoint {} to running root accumulator",
389            checkpoint_seq_num,
390        );
391        Ok(())
392    }
393
394    pub fn accumulate_epoch(
395        &self,
396        epoch_store: Arc<AuthorityPerEpochStore>,
397        last_checkpoint_of_epoch: CheckpointSequenceNumber,
398    ) -> IotaResult<Accumulator> {
399        let _scope = monitored_scope("AccumulateEpoch");
400        let running_root = epoch_store
401            .get_running_root_accumulator(&last_checkpoint_of_epoch)?
402            .expect("Expected running root accumulator to exist up to last checkpoint of epoch");
403
404        self.store.insert_state_accumulator_for_epoch(
405            epoch_store.epoch(),
406            &last_checkpoint_of_epoch,
407            &running_root,
408        )?;
409        debug!(
410            "Finalized root state hash for epoch {} (up to checkpoint {})",
411            epoch_store.epoch(),
412            last_checkpoint_of_epoch
413        );
414        Ok(running_root.clone())
415    }
416
417    pub fn accumulate_effects(&self, effects: Vec<TransactionEffects>) -> Accumulator {
418        accumulate_effects(effects)
419    }
420}