1use std::sync::Arc;
6
7use fastcrypto::hash::MultisetHash;
8use iota_metrics::monitored_scope;
9use iota_types::{
10 accumulator::Accumulator,
11 base_types::{ObjectID, SequenceNumber},
12 committee::EpochId,
13 digests::ObjectDigest,
14 effects::{TransactionEffects, TransactionEffectsAPI},
15 error::IotaResult,
16 in_memory_storage::InMemoryStorage,
17 messages_checkpoint::{CheckpointSequenceNumber, ECMHLiveObjectSetDigest},
18 storage::ObjectStore,
19};
20use prometheus::{IntGauge, Registry, register_int_gauge_with_registry};
21use serde::Serialize;
22use tracing::debug;
23
24use crate::authority::{
25 authority_per_epoch_store::AuthorityPerEpochStore, authority_store_tables::LiveObject,
26};
27
28pub struct StateAccumulatorMetrics {
29 inconsistent_state: IntGauge,
30}
31
32impl StateAccumulatorMetrics {
33 pub fn new(registry: &Registry) -> Arc<Self> {
34 let this = Self {
35 inconsistent_state: register_int_gauge_with_registry!(
36 "accumulator_inconsistent_state",
37 "1 if accumulated live object set differs from StateAccumulator root state hash for the previous epoch",
38 registry
39 )
40 .unwrap(),
41 };
42 Arc::new(this)
43 }
44}
45
46pub enum StateAccumulator {
47 V1(StateAccumulatorV1),
48}
49
50pub struct StateAccumulatorV1 {
51 store: Arc<dyn AccumulatorStore>,
52 metrics: Arc<StateAccumulatorMetrics>,
53}
54
55pub trait AccumulatorStore: ObjectStore + Send + Sync {
56 fn get_root_state_accumulator_for_epoch(
57 &self,
58 epoch: EpochId,
59 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>>;
60
61 fn get_root_state_accumulator_for_highest_epoch(
62 &self,
63 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>>;
64
65 fn insert_state_accumulator_for_epoch(
66 &self,
67 epoch: EpochId,
68 checkpoint_seq_num: &CheckpointSequenceNumber,
69 acc: &Accumulator,
70 ) -> IotaResult;
71
72 fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_>;
73
74 fn iter_cached_live_object_set_for_testing(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
75 self.iter_live_object_set()
76 }
77}
78
79impl AccumulatorStore for InMemoryStorage {
80 fn get_root_state_accumulator_for_epoch(
81 &self,
82 _epoch: EpochId,
83 ) -> IotaResult<Option<(CheckpointSequenceNumber, Accumulator)>> {
84 unreachable!("not used for testing")
85 }
86
87 fn get_root_state_accumulator_for_highest_epoch(
88 &self,
89 ) -> IotaResult<Option<(EpochId, (CheckpointSequenceNumber, Accumulator))>> {
90 unreachable!("not used for testing")
91 }
92
93 fn insert_state_accumulator_for_epoch(
94 &self,
95 _epoch: EpochId,
96 _checkpoint_seq_num: &CheckpointSequenceNumber,
97 _acc: &Accumulator,
98 ) -> IotaResult {
99 unreachable!("not used for testing")
100 }
101
102 fn iter_live_object_set(&self) -> Box<dyn Iterator<Item = LiveObject> + '_> {
103 unreachable!("not used for testing")
104 }
105}
106
107#[derive(Serialize, Debug)]
111pub struct WrappedObject {
112 id: ObjectID,
113 wrapped_at: SequenceNumber,
114 digest: ObjectDigest,
115}
116
117impl WrappedObject {
118 pub fn new(id: ObjectID, wrapped_at: SequenceNumber) -> Self {
119 Self {
120 id,
121 wrapped_at,
122 digest: ObjectDigest::OBJECT_DIGEST_WRAPPED,
123 }
124 }
125}
126
127fn accumulate_effects(effects: Vec<TransactionEffects>) -> Accumulator {
128 let mut acc = Accumulator::default();
129
130 acc.insert_all(
132 effects
133 .iter()
134 .flat_map(|fx| {
135 fx.all_changed_objects()
136 .into_iter()
137 .map(|(object_ref, _, _)| object_ref.2)
138 })
139 .collect::<Vec<ObjectDigest>>(),
140 );
141
142 acc.remove_all(
144 effects
145 .iter()
146 .flat_map(|fx| {
147 fx.old_object_metadata()
148 .into_iter()
149 .map(|(object_ref, _owner)| object_ref.2)
150 })
151 .collect::<Vec<ObjectDigest>>(),
152 );
153
154 acc
155}
156
157impl StateAccumulator {
158 pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
159 StateAccumulator::V1(StateAccumulatorV1::new(store, metrics))
160 }
161
162 pub fn new_for_tests(store: Arc<dyn AccumulatorStore>) -> Self {
163 Self::new(store, StateAccumulatorMetrics::new(&Registry::new()))
164 }
165
166 pub fn metrics(&self) -> Arc<StateAccumulatorMetrics> {
167 match self {
168 StateAccumulator::V1(impl_v1) => impl_v1.metrics.clone(),
169 }
170 }
171
172 pub fn set_inconsistent_state(&self, is_inconsistent_state: bool) {
173 match self {
174 StateAccumulator::V1(impl_v1) => &impl_v1.metrics,
175 }
176 .inconsistent_state
177 .set(is_inconsistent_state as i64);
178 }
179
180 pub fn accumulate_checkpoint(
183 &self,
184 effects: Vec<TransactionEffects>,
185 checkpoint_seq_num: CheckpointSequenceNumber,
186 epoch_store: &Arc<AuthorityPerEpochStore>,
187 ) -> IotaResult<Accumulator> {
188 let _scope = monitored_scope("AccumulateCheckpoint");
189 if let Some(acc) = epoch_store.get_state_hash_for_checkpoint(&checkpoint_seq_num)? {
190 return Ok(acc);
191 }
192
193 let acc = self.accumulate_effects(effects);
194
195 epoch_store.insert_state_hash_for_checkpoint(&checkpoint_seq_num, &acc)?;
196 debug!("Accumulated checkpoint {}", checkpoint_seq_num);
197
198 epoch_store
199 .checkpoint_state_notify_read
200 .notify(&checkpoint_seq_num, &acc);
201
202 Ok(acc)
203 }
204
205 pub async fn accumulate_running_root(
206 &self,
207 epoch_store: &AuthorityPerEpochStore,
208 checkpoint_seq_num: CheckpointSequenceNumber,
209 checkpoint_acc: Option<Accumulator>,
210 ) -> IotaResult {
211 match self {
212 StateAccumulator::V1(impl_v1) => {
213 impl_v1
214 .accumulate_running_root(epoch_store, checkpoint_seq_num, checkpoint_acc)
215 .await
216 }
217 }
218 }
219
220 pub async fn accumulate_epoch(
221 &self,
222 epoch_store: Arc<AuthorityPerEpochStore>,
223 last_checkpoint_of_epoch: CheckpointSequenceNumber,
224 ) -> IotaResult<Accumulator> {
225 match self {
226 StateAccumulator::V1(impl_v1) => {
227 impl_v1.accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
228 }
229 }
230 }
231
232 pub fn accumulate_cached_live_object_set_for_testing(&self) -> Accumulator {
233 match self {
234 StateAccumulator::V1(impl_v1) => Self::accumulate_live_object_set_impl(
235 impl_v1.store.iter_cached_live_object_set_for_testing(),
236 ),
237 }
238 }
239
240 pub fn accumulate_live_object_set(&self) -> Accumulator {
243 match self {
244 StateAccumulator::V1(impl_v1) => {
245 Self::accumulate_live_object_set_impl(impl_v1.store.iter_live_object_set())
246 }
247 }
248 }
249
250 pub fn accumulate_effects(&self, effects: Vec<TransactionEffects>) -> Accumulator {
253 match self {
254 StateAccumulator::V1(impl_v1) => impl_v1.accumulate_effects(effects),
255 }
256 }
257
258 fn accumulate_live_object_set_impl(iter: impl Iterator<Item = LiveObject>) -> Accumulator {
259 let mut acc = Accumulator::default();
260 iter.for_each(|live_object| {
261 Self::accumulate_live_object(&mut acc, &live_object);
262 });
263 acc
264 }
265
266 pub fn accumulate_live_object(acc: &mut Accumulator, live_object: &LiveObject) {
267 match live_object {
268 LiveObject::Normal(object) => {
269 acc.insert(object.compute_object_reference().2);
270 }
271 LiveObject::Wrapped(key) => {
272 acc.insert(
273 bcs::to_bytes(&WrappedObject::new(key.0, key.1))
274 .expect("Failed to serialize WrappedObject"),
275 );
276 }
277 }
278 }
279
280 pub fn digest_live_object_set(&self) -> ECMHLiveObjectSetDigest {
281 let acc = self.accumulate_live_object_set();
282 acc.digest().into()
283 }
284
285 pub async fn digest_epoch(
286 &self,
287 epoch_store: Arc<AuthorityPerEpochStore>,
288 last_checkpoint_of_epoch: CheckpointSequenceNumber,
289 ) -> IotaResult<ECMHLiveObjectSetDigest> {
290 Ok(self
291 .accumulate_epoch(epoch_store, last_checkpoint_of_epoch)
292 .await?
293 .digest()
294 .into())
295 }
296}
297
298impl StateAccumulatorV1 {
299 pub fn new(store: Arc<dyn AccumulatorStore>, metrics: Arc<StateAccumulatorMetrics>) -> Self {
300 Self { store, metrics }
301 }
302
303 pub async fn accumulate_running_root(
304 &self,
305 epoch_store: &AuthorityPerEpochStore,
306 checkpoint_seq_num: CheckpointSequenceNumber,
307 checkpoint_acc: Option<Accumulator>,
308 ) -> IotaResult {
309 let _scope = monitored_scope("AccumulateRunningRoot");
310 tracing::debug!(
311 "accumulating running root for checkpoint {}",
312 checkpoint_seq_num
313 );
314
315 if epoch_store
323 .get_running_root_accumulator(&checkpoint_seq_num)?
324 .is_some()
325 {
326 debug!(
327 "accumulate_running_root {:?} {:?} already exists",
328 epoch_store.epoch(),
329 checkpoint_seq_num
330 );
331 return Ok(());
332 }
333
334 let mut running_root = if checkpoint_seq_num == 0 {
335 Accumulator::default()
337 } else if epoch_store
338 .get_highest_running_root_accumulator()?
339 .is_none()
340 {
341 if let Some((prev_epoch, (last_checkpoint_prev_epoch, prev_acc))) =
346 self.store.get_root_state_accumulator_for_highest_epoch()?
347 {
348 if last_checkpoint_prev_epoch != checkpoint_seq_num - 1 {
349 epoch_store
350 .notify_read_running_root(checkpoint_seq_num - 1)
351 .await?
352 } else {
353 assert_eq!(
354 prev_epoch + 1,
355 epoch_store.epoch(),
356 "Expected highest existing root state hash to be for previous epoch",
357 );
358 prev_acc
359 }
360 } else {
361 assert_eq!(
365 epoch_store.epoch(),
366 0,
367 "Expected epoch to be 0 if previous root state hash does not exist"
368 );
369 epoch_store
370 .notify_read_running_root(checkpoint_seq_num - 1)
371 .await?
372 }
373 } else {
374 epoch_store
375 .notify_read_running_root(checkpoint_seq_num - 1)
376 .await?
377 };
378
379 let checkpoint_acc = checkpoint_acc.unwrap_or_else(|| {
380 epoch_store
381 .get_state_hash_for_checkpoint(&checkpoint_seq_num)
382 .expect("Failed to get checkpoint accumulator from disk")
383 .expect("Expected checkpoint accumulator to exist")
384 });
385 running_root.union(&checkpoint_acc);
386 epoch_store.insert_running_root_accumulator(&checkpoint_seq_num, &running_root)?;
387 debug!(
388 "Accumulated checkpoint {} to running root accumulator",
389 checkpoint_seq_num,
390 );
391 Ok(())
392 }
393
394 pub fn accumulate_epoch(
395 &self,
396 epoch_store: Arc<AuthorityPerEpochStore>,
397 last_checkpoint_of_epoch: CheckpointSequenceNumber,
398 ) -> IotaResult<Accumulator> {
399 let _scope = monitored_scope("AccumulateEpoch");
400 let running_root = epoch_store
401 .get_running_root_accumulator(&last_checkpoint_of_epoch)?
402 .expect("Expected running root accumulator to exist up to last checkpoint of epoch");
403
404 self.store.insert_state_accumulator_for_epoch(
405 epoch_store.epoch(),
406 &last_checkpoint_of_epoch,
407 &running_root,
408 )?;
409 debug!(
410 "Finalized root state hash for epoch {} (up to checkpoint {})",
411 epoch_store.epoch(),
412 last_checkpoint_of_epoch
413 );
414 Ok(running_root.clone())
415 }
416
417 pub fn accumulate_effects(&self, effects: Vec<TransactionEffects>) -> Accumulator {
418 accumulate_effects(effects)
419 }
420}