iota_core/checkpoints/
causal_order.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use iota_types::{
8    base_types::TransactionDigest,
9    effects::{InputSharedObject, TransactionEffects, TransactionEffectsAPI},
10    storage::ObjectKey,
11};
12use tracing::trace;
13
14pub struct CausalOrder {
15    not_seen: BTreeMap<TransactionDigest, TransactionDependencies>,
16    output: Vec<TransactionEffects>,
17}
18
19impl CausalOrder {
20    /// Causally sort given vector of effects
21    ///
22    /// Returned list has effects that
23    ///
24    /// (a) Causally sorted
25    /// (b) Have deterministic order between transactions that are not causally
26    /// dependent
27    ///
28    /// The order of result list does not depend on order of effects in the
29    /// supplied vector
30    pub fn causal_sort(effects: Vec<TransactionEffects>) -> Vec<TransactionEffects> {
31        let mut this = Self::from_vec(effects);
32        while let Some(item) = this.pop_first() {
33            this.insert(item);
34        }
35        this.into_list()
36    }
37
38    fn from_vec(effects: Vec<TransactionEffects>) -> Self {
39        let rwlock_builder = RWLockDependencyBuilder::from_effects(&effects);
40        let dependencies: Vec<_> = effects
41            .into_iter()
42            .map(|e| TransactionDependencies::from_effects(e, &rwlock_builder))
43            .collect();
44        let output = Vec::with_capacity(dependencies.len() * 2);
45        let not_seen = dependencies.into_iter().map(|e| (e.digest, e)).collect();
46        Self { not_seen, output }
47    }
48
49    fn pop_first(&mut self) -> Option<TransactionDependencies> {
50        // Once map_first_last is stabilized this function can be rewritten as this:
51        // self.not_seen.pop_first()
52        let key = *self.not_seen.keys().next()?;
53        Some(self.not_seen.remove(&key).unwrap())
54    }
55
56    // effect is already removed from self.not_seen at this point
57    fn insert(&mut self, transaction: TransactionDependencies) {
58        let initial_state = InsertState::new(transaction);
59        let mut states = vec![initial_state];
60
61        while let Some(state) = states.last_mut() {
62            if let Some(new_state) = state.process(self) {
63                // This is essentially a 'recursive call' but using heap instead of stack to
64                // store state
65                states.push(new_state);
66            } else {
67                // Done with current state, remove it
68                states.pop().expect("Should contain an element");
69            }
70        }
71    }
72
73    fn into_list(self) -> Vec<TransactionEffects> {
74        self.output
75    }
76}
77
78struct TransactionDependencies {
79    digest: TransactionDigest,
80    dependencies: BTreeSet<TransactionDigest>,
81    effects: TransactionEffects,
82}
83
84impl TransactionDependencies {
85    fn from_effects(effects: TransactionEffects, rwlock_builder: &RWLockDependencyBuilder) -> Self {
86        let mut dependencies: BTreeSet<_> = effects.dependencies().iter().cloned().collect();
87        rwlock_builder.add_dependencies_for(*effects.transaction_digest(), &mut dependencies);
88        Self {
89            digest: *effects.transaction_digest(),
90            dependencies,
91            effects,
92        }
93    }
94}
95
96/// Supplies TransactionDependencies tree with additional edges from
97/// transactions that write shared locks object to transactions that read
98/// previous version of this object.
99///
100/// With RWLocks we can have multiple transaction that depend on shared object
101/// version N - many read transactions and single write transaction. Those
102/// transactions depend on transaction that has written N, but they do not
103/// depend on each other. And specifically, transaction that reads N and writes
104/// N+1 does not depend on read-only transactions that also read N.
105///
106/// We do not add such read transactions to TransactionEffects of shared object
107/// write transactions for next version to make sure TransactionEffects are not
108/// grow too large (and because you do not need read transactions to replay
109/// write transaction for next version).
110///
111/// However, when building checkpoints we supply transaction dependency tree
112/// with additional dependency edges to make it look like write transaction for
113/// next version causally depends on transactions that read previous versions,
114/// for two reasons:
115///
116/// (1) Without this addition we could have peculiar checkpoints where
117/// transaction reading version N appears after transaction that overwritten
118/// this object with version N+1. This does not affect how transaction is
119/// executed, but it is not something one would expect in causally ordered list.
120///
121/// (2) On the practical side it will allow to simplify pruner as it can now
122/// just tail checkpoints and delete objects in order they appear in
123/// TransactionEffects::modified_at_versions in checkpoint.
124struct RWLockDependencyBuilder {
125    read_version: HashMap<ObjectKey, Vec<TransactionDigest>>,
126    overwrite_versions: HashMap<TransactionDigest, Vec<ObjectKey>>,
127}
128
129impl RWLockDependencyBuilder {
130    pub fn from_effects(effects: &[TransactionEffects]) -> Self {
131        let mut read_version: HashMap<ObjectKey, Vec<TransactionDigest>> = Default::default();
132        let mut overwrite_versions: HashMap<TransactionDigest, Vec<ObjectKey>> = Default::default();
133        for effect in effects {
134            for kind in effect.input_shared_objects() {
135                match kind {
136                    InputSharedObject::ReadOnly(obj_ref) => {
137                        let obj_key = obj_ref.into();
138                        // Read only transaction
139                        read_version
140                            .entry(obj_key)
141                            .or_default()
142                            .push(*effect.transaction_digest());
143                    }
144                    InputSharedObject::Mutate(obj_ref) => {
145                        let obj_key = obj_ref.into();
146                        // write transaction
147                        overwrite_versions
148                            .entry(*effect.transaction_digest())
149                            .or_default()
150                            .push(obj_key);
151                    }
152                    InputSharedObject::ReadDeleted(oid, version) => read_version
153                        .entry(ObjectKey(oid, version))
154                        .or_default()
155                        .push(*effect.transaction_digest()),
156                    InputSharedObject::MutateDeleted(oid, version) => overwrite_versions
157                        .entry(*effect.transaction_digest())
158                        .or_default()
159                        .push(ObjectKey(oid, version)),
160                    InputSharedObject::Cancelled(..) => (), /* TODO: confirm that
161                                                             * consensus_commit_prologue is
162                                                             * always at the beginning of the
163                                                             * checkpoint, so that cancelled txn
164                                                             * don't need to worry about
165                                                             * dependency. */
166                }
167            }
168        }
169        Self {
170            read_version,
171            overwrite_versions,
172        }
173    }
174
175    pub fn add_dependencies_for(
176        &self,
177        digest: TransactionDigest,
178        v: &mut BTreeSet<TransactionDigest>,
179    ) {
180        let Some(overwrites) = self.overwrite_versions.get(&digest) else {
181            return;
182        };
183        for obj_ver in overwrites {
184            let Some(reads) = self.read_version.get(obj_ver) else {
185                continue;
186            };
187            for dep in reads {
188                trace!(
189                    "Assuming additional dependency when constructing checkpoint {:?} -> {:?}",
190                    digest, *dep
191                );
192                v.insert(*dep);
193            }
194        }
195    }
196}
197
198struct InsertState {
199    dependencies: Vec<TransactionDigest>,
200    transaction: Option<TransactionDependencies>,
201}
202
203impl InsertState {
204    pub fn new(transaction: TransactionDependencies) -> Self {
205        Self {
206            dependencies: transaction.dependencies.iter().cloned().collect(),
207            transaction: Some(transaction),
208        }
209    }
210
211    pub fn process(&mut self, causal_order: &mut CausalOrder) -> Option<InsertState> {
212        while let Some(dep) = self.dependencies.pop() {
213            if let Some(dep_transaction) = causal_order.not_seen.remove(&dep) {
214                return Some(InsertState::new(dep_transaction));
215            }
216        }
217        let transaction = self
218            .transaction
219            .take()
220            .expect("Can't use InsertState after it is finished");
221        causal_order.output.push(transaction.effects);
222        None
223    }
224}
225
226#[cfg(test)]
227mod tests {
228    use iota_types::{
229        base_types::{ObjectDigest, ObjectID, SequenceNumber},
230        effects::TransactionEffects,
231    };
232
233    use super::*;
234
235    #[test]
236    pub fn test_causal_order() {
237        let e1 = e(d(1), vec![d(2), d(3)]);
238        let e2 = e(d(2), vec![d(3), d(4)]);
239        let e3 = e(d(3), vec![]);
240        let e4 = e(d(4), vec![]);
241
242        let r = extract(CausalOrder::causal_sort(vec![
243            e1.clone(),
244            e2,
245            e3,
246            e4.clone(),
247        ]));
248        assert_eq!(r, vec![3, 4, 2, 1]);
249
250        // e1 and e4 are not (directly) causally dependent - ordered lexicographically
251        let r = extract(CausalOrder::causal_sort(vec![e1.clone(), e4.clone()]));
252        assert_eq!(r, vec![1, 4]);
253        let r = extract(CausalOrder::causal_sort(vec![e4, e1]));
254        assert_eq!(r, vec![1, 4]);
255    }
256
257    #[test]
258    pub fn test_causal_order_rw_locks() {
259        let mut e5 = e(d(5), vec![]);
260        let mut e2 = e(d(2), vec![]);
261        let mut e3 = e(d(3), vec![]);
262        let obj_digest = ObjectDigest::new(Default::default());
263        e5.unsafe_add_input_shared_object_for_testing(InputSharedObject::ReadOnly((
264            o(1),
265            SequenceNumber::from_u64(1),
266            obj_digest,
267        )));
268        e2.unsafe_add_input_shared_object_for_testing(InputSharedObject::ReadOnly((
269            o(1),
270            SequenceNumber::from_u64(1),
271            obj_digest,
272        )));
273        e3.unsafe_add_input_shared_object_for_testing(InputSharedObject::Mutate((
274            o(1),
275            SequenceNumber::from_u64(1),
276            obj_digest,
277        )));
278
279        let r = extract(CausalOrder::causal_sort(vec![e5, e2, e3]));
280        assert_eq!(r.len(), 3);
281        assert_eq!(*r.get(2).unwrap(), 3); // [3] is the last
282        // both [5] and [2] are present (but order is not fixed)
283        assert!(r.contains(&5));
284        assert!(r.contains(&2));
285    }
286
287    fn extract(e: Vec<TransactionEffects>) -> Vec<u8> {
288        e.into_iter()
289            .map(|e| e.transaction_digest().inner()[0])
290            .collect()
291    }
292
293    fn d(i: u8) -> TransactionDigest {
294        let mut bytes: [u8; 32] = Default::default();
295        bytes[0] = i;
296        TransactionDigest::new(bytes)
297    }
298
299    fn o(i: u8) -> ObjectID {
300        let mut bytes: [u8; ObjectID::LENGTH] = Default::default();
301        bytes[0] = i;
302        ObjectID::new(bytes)
303    }
304
305    fn e(
306        transaction_digest: TransactionDigest,
307        dependencies: Vec<TransactionDigest>,
308    ) -> TransactionEffects {
309        let mut effects = TransactionEffects::default();
310        *effects.transaction_digest_mut_for_testing() = transaction_digest;
311        *effects.dependencies_mut_for_testing() = dependencies;
312        effects
313    }
314}