1use std::sync::Arc;
6
7use fastcrypto::hash::MultisetHash;
8use iota_common::fatal;
9use iota_metrics::monitored_scope;
10use iota_types::{
11 accumulator::Accumulator,
12 base_types::{ObjectID, SequenceNumber},
13 committee::EpochId,
14 digests::ObjectDigest,
15 effects::{TransactionEffects, TransactionEffectsAPI},
16 error::IotaResult,
17 in_memory_storage::InMemoryStorage,
18 messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest},
19 storage::ObjectStore,
20};
21use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
22use serde::Serialize;
23use tracing::debug;
24
25use crate::authority::{
26 authority_per_epoch_store::AuthorityPerEpochStore, authority_store_tables::LiveObject,
27};
28
29pub struct StateAccumulatorMetrics {
30 inconsistent_state: IntGauge,
31}
32
33impl StateAccumulatorMetrics {
34 pub fn new(registry: &Registry) -> Arc<Self> {
35 let this = Self {
36 inconsistent_state: register_int_gauge_with_registry!(
37 "accumulator_inconsistent_state",
38 "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
39 registry
40 )
41 .unwrap(),
42 };
43 Arc::new(this)
44 }
45}
46
47pub struct StateAccumulator {
48 store: Arc<dyn AccumulatorStore>,
49 metrics: Arc<StateAccumulatorMetrics>,
50}
51
52pub trait AccumulatorStore: ObjectStore + Send + Sync {
53 fn get_root_state_accumulator_for_epoch(
54 &self,
55 epoch: EpochId,
56 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>>;
57
58 fn get_root_state_accumulator_for_highest_epoch(
59 &self,
60 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>>;
61
62 fn insert_state_accumulator_for_epoch(
63 &self,
64 epoch: EpochId,
65 checkpoint_seq_num: &CheckpointSequenceNumber,
66 acc: &Accumulator,
67 ) -> IotaResult;
68
69 fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_>;
70
71 fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
72 self.iter_live_object_set()
73 }
74}
75
76impl AccumulatorStore for InMemoryStorage {
77 fn get_root_state_accumulator_for_epoch(
78 &self,
79 _epoch: EpochId,
80 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
81 unreachable!("not used for testing")
82 }
83
84 fn get_root_state_accumulator_for_highest_epoch(
85 &self,
86 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
87 unreachable!("not used for testing")
88 }
89
90 fn insert_state_accumulator_for_epoch(
91 &self,
92 _epoch: EpochId,
93 _checkpoint_seq_num: &CheckpointSequenceNumber,
94 _acc: &Accumulator,
95 ) -> IotaResult {
96 unreachable!("not used for testing")
97 }
98
99 fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
100 unreachable!("not used for testing")
101 }
102}
103
104#[derive(Serialize, Debug)]
108pub struct WrappedObject {
109 id: ObjectID,
110 wrapped_at: SequenceNumber,
111 digest: ObjectDigest,
112}
113
114impl WrappedObject {
115 pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
116 Self {
117 id,
118 wrapped_at,
119 digest: ObjectDigest::OBJECT_DIGEST_WRAPPED,
120 }
121 }
122}
123
124fn accumulate_effects(effects: &[TransactionEffects]) -> Accumulator {
125 let mut acc = Accumulator::default();
126
127 acc.insert_all(
129 effects
130 .iter()
131 .flat_map(|fx| {
132 fx.all_changed_objects()
133 .into_iter()
134 .map(|(object_ref, _, _)| object_ref.2)
135 })
136 .collect::<Vec<ObjectDigest>>(),
137 );
138
139 acc.remove_all(
141 effects
142 .iter()
143 .flat_map(|fx| {
144 fx.old_object_metadata()
145 .into_iter()
146 .map(|(object_ref, _owner)| object_ref.2)
147 })
148 .collect::<Vec<ObjectDigest>>(),
149 );
150
151 acc
152}
153
154impl StateAccumulator {
155 pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
156 Self { store, metrics }
157 }
158
159 pub fn new_for_tests(store: Arc<dyn AccumulatorStore>) -> Self {
160 Self::new(store, StateAccumulatorMetrics::new(&Registry::new()))
161 }
162
163 pub fn metrics(&self) -> Arc<StateAccumulatorMetrics> {
164 self.metrics.clone()
165 }
166
167 pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
168 self.metrics
169 .inconsistent_state
170 .set(is_inconsistent_state as i64);
171 }
172
173 pub fn accumulate_checkpoint(
176 &self,
177 effects: &[TransactionEffects],
178 checkpoint_seq_num: CheckpointSequenceNumber,
179 epoch_store: &AuthorityPerEpochStore,
180 ) -> IotaResult<Accumulator> {
181 let _scope = monitored_scope("AccumulateCheckpoint");
182 if let Some(acc) = epoch_store.get_state_hash_for_checkpoint(&checkpoint_seq_num)? {
183 return Ok(acc);
184 }
185
186 let acc = self.accumulate_effects(effects);
187
188 epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
189 debug!("Accumulated checkpoint {}", checkpoint_seq_num);
190
191 epoch_store
192 .checkpoint_state_notify_read
193 .notify(&checkpoint_seq_num, &acc);
194
195 Ok(acc)
196 }
197
198 pub fn accumulate_cached_live_object_set_for_testing(&self) -> Accumulator {
199 Self::accumulate_live_object_set_impl(self.store.iter_cached_live_object_set_for_testing())
200 }
201
202 pub fn accumulate_live_object_set(&self) -> Accumulator {
205 Self::accumulate_live_object_set_impl(self.store.iter_live_object_set())
206 }
207
208 fn accumulate_live_object_set_impl(iter: impl Iterator<Item = LiveObject>) -> Accumulator {
209 let mut acc = Accumulator::default();
210 iter.for_each(|live_object| {
211 Self::accumulate_live_object(&mut acc, &live_object);
212 });
213 acc
214 }
215
216 pub fn accumulate_live_object(acc: &mut Accumulator, live_object: &LiveObject) {
217 match live_object {
218 LiveObject::Normal(object) => {
219 acc.insert(object.compute_object_reference().2);
220 }
221 LiveObject::Wrapped(key) => {
222 acc.insert(
223 bcs::to_bytes(&WrappedObject::new(key.0, key.1))
224 .expect("Failed to serialize WrappedObject"),
225 );
226 }
227 }
228 }
229
230 pub fn digest_live_object_set(&self) -> ECMHLiveObjectSetDigest {
231 let acc = self.accumulate_live_object_set();
232 acc.digest().into()
233 }
234
235 pub async fn digest_epoch(
236 &self,
237 epoch_store: Arc<AuthorityPerEpochStore>,
238 last_checkpoint_of_epoch: CheckpointSequenceNumber,
239 ) -> IotaResult<ECMHLiveObjectSetDigest> {
240 Ok(self
241 .accumulate_epoch(epoch_store, last_checkpoint_of_epoch)?
242 .digest()
243 .into())
244 }
245
246 pub async fn wait_for_previous_running_root(
247 &self,
248 epoch_store: &AuthorityPerEpochStore,
249 checkpoint_seq_num: CheckpointSequenceNumber,
250 ) -> IotaResult {
251 assert!(checkpoint_seq_num > 0);
252
253 if self
256 .store
257 .get_root_state_accumulator_for_highest_epoch()?
258 .map(|(_, (last_checkpoint_prev_epoch, _))| last_checkpoint_prev_epoch)
259 == Some(checkpoint_seq_num - 1)
260 {
261 return Ok(());
262 }
263
264 epoch_store
269 .notify_read_running_root(checkpoint_seq_num - 1)
270 .await?;
271 Ok(())
272 }
273
274 fn get_prior_root(
275 &self,
276 epoch_store: &AuthorityPerEpochStore,
277 checkpoint_seq_num: CheckpointSequenceNumber,
278 ) -> IotaResult<Accumulator> {
279 if checkpoint_seq_num == 0 {
280 return Ok(Accumulator::default());
281 }
282
283 if let Some((prev_epoch, (last_checkpoint_prev_epoch, prev_acc))) =
284 self.store.get_root_state_accumulator_for_highest_epoch()?
285 {
286 assert_eq!(prev_epoch + 1, epoch_store.epoch());
287 if last_checkpoint_prev_epoch == checkpoint_seq_num - 1 {
288 return Ok(prev_acc);
289 }
290 }
291
292 let Some(prior_running_root) =
293 epoch_store.get_running_root_accumulator(checkpoint_seq_num - 1)?
294 else {
295 fatal!(
296 "Running root accumulator must exist for checkpoint {}",
297 checkpoint_seq_num - 1
298 );
299 };
300
301 Ok(prior_running_root)
302 }
303
304 pub fn accumulate_running_root(
308 &self,
309 epoch_store: &AuthorityPerEpochStore,
310 checkpoint_seq_num: CheckpointSequenceNumber,
311 checkpoint_acc: Option<Accumulator>,
312 ) -> IotaResult {
313 let _scope = monitored_scope("AccumulateRunningRoot");
314 tracing::debug!(
315 "accumulating running root for checkpoint {}",
316 checkpoint_seq_num
317 );
318
319 if epoch_store
321 .get_running_root_accumulator(checkpoint_seq_num)?
322 .is_some()
323 {
324 debug!(
325 "accumulate_running_root {:?} {:?} already exists",
326 epoch_store.epoch(),
327 checkpoint_seq_num
328 );
329 return Ok(());
330 }
331
332 let mut running_root = self.get_prior_root(epoch_store, checkpoint_seq_num)?;
333
334 let checkpoint_acc = checkpoint_acc.unwrap_or_else(|| {
335 epoch_store
336 .get_state_hash_for_checkpoint(&checkpoint_seq_num)
337 .expect("Failed to get checkpoint accumulator from disk")
338 .expect("Expected checkpoint accumulator to exist")
339 });
340 running_root.union(&checkpoint_acc);
341 epoch_store.insert_running_root_accumulator(&checkpoint_seq_num, &running_root)?;
342 debug!(
343 "Accumulated checkpoint {} to running root accumulator",
344 checkpoint_seq_num,
345 );
346 Ok(())
347 }
348
349 pub fn accumulate_epoch(
350 &self,
351 epoch_store: Arc<AuthorityPerEpochStore>,
352 last_checkpoint_of_epoch: CheckpointSequenceNumber,
353 ) -> IotaResult<Accumulator> {
354 let _scope = monitored_scope("AccumulateEpoch");
355 let running_root = epoch_store
356 .get_running_root_accumulator(last_checkpoint_of_epoch)?
357 .expect("Expected running root accumulator to exist up to last checkpoint of epoch");
358
359 self.store.insert_state_accumulator_for_epoch(
360 epoch_store.epoch(),
361 &last_checkpoint_of_epoch,
362 &running_root,
363 )?;
364 debug!(
365 "Finalized root state hash for epoch {} (up to checkpoint {})",
366 epoch_store.epoch(),
367 last_checkpoint_of_epoch
368 );
369 Ok(running_root.clone())
370 }
371
372 pub fn accumulate_effects(&self, effects: &[TransactionEffects]) -> Accumulator {
373 accumulate_effects(effects)
374 }
375}