1use std::collections::{BTreeMap, BTreeSet, HashMap};
6
7use iota_types::{
8 base_types::TransactionDigest,
9 effects::{InputSharedObject, TransactionEffects, TransactionEffectsAPI},
10 storage::ObjectKey,
11};
12use tracing::trace;
13
14pub struct CausalOrder {
15 not_seen: BTreeMap<TransactionDigest, TransactionDependencies>,
16 output: Vec<TransactionEffects>,
17}
18
19impl CausalOrder {
20 pub fn causal_sort(effects: Vec<TransactionEffects>) -> Vec<TransactionEffects> {
31 let mut this = Self::from_vec(effects);
32 while let Some(item) = this.pop_first() {
33 this.insert(item);
34 }
35 this.into_list()
36 }
37
38 fn from_vec(effects: Vec<TransactionEffects>) -> Self {
39 let rwlock_builder = RWLockDependencyBuilder::from_effects(&effects);
40 let dependencies: Vec<_> = effects
41 .into_iter()
42 .map(|e| TransactionDependencies::from_effects(e, &rwlock_builder))
43 .collect();
44 let output = Vec::with_capacity(dependencies.len() * 2);
45 let not_seen = dependencies.into_iter().map(|e| (e.digest, e)).collect();
46 Self { not_seen, output }
47 }
48
49 fn pop_first(&mut self) -> Option<TransactionDependencies> {
50 let key = *self.not_seen.keys().next()?;
53 Some(self.not_seen.remove(&key).unwrap())
54 }
55
56 fn insert(&mut self, transaction: TransactionDependencies) {
58 let initial_state = InsertState::new(transaction);
59 let mut states = vec![initial_state];
60
61 while let Some(state) = states.last_mut() {
62 if let Some(new_state) = state.process(self) {
63 states.push(new_state);
66 } else {
67 states.pop().expect("Should contain an element");
69 }
70 }
71 }
72
73 fn into_list(self) -> Vec<TransactionEffects> {
74 self.output
75 }
76}
77
78struct TransactionDependencies {
79 digest: TransactionDigest,
80 dependencies: BTreeSet<TransactionDigest>,
81 effects: TransactionEffects,
82}
83
84impl TransactionDependencies {
85 fn from_effects(effects: TransactionEffects, rwlock_builder: &RWLockDependencyBuilder) -> Self {
86 let mut dependencies: BTreeSet<_> = effects.dependencies().iter().cloned().collect();
87 rwlock_builder.add_dependencies_for(*effects.transaction_digest(), &mut dependencies);
88 Self {
89 digest: *effects.transaction_digest(),
90 dependencies,
91 effects,
92 }
93 }
94}
95
96struct RWLockDependencyBuilder {
125 read_version: HashMap<ObjectKey, Vec<TransactionDigest>>,
126 overwrite_versions: HashMap<TransactionDigest, Vec<ObjectKey>>,
127}
128
129impl RWLockDependencyBuilder {
130 pub fn from_effects(effects: &[TransactionEffects]) -> Self {
131 let mut read_version: HashMap<ObjectKey, Vec<TransactionDigest>> = Default::default();
132 let mut overwrite_versions: HashMap<TransactionDigest, Vec<ObjectKey>> = Default::default();
133 for effect in effects {
134 for kind in effect.input_shared_objects() {
135 match kind {
136 InputSharedObject::ReadOnly(obj_ref) => {
137 let obj_key = obj_ref.into();
138 read_version
140 .entry(obj_key)
141 .or_default()
142 .push(*effect.transaction_digest());
143 }
144 InputSharedObject::Mutate(obj_ref) => {
145 let obj_key = obj_ref.into();
146 overwrite_versions
148 .entry(*effect.transaction_digest())
149 .or_default()
150 .push(obj_key);
151 }
152 InputSharedObject::ReadDeleted(oid, version) => read_version
153 .entry(ObjectKey(oid, version))
154 .or_default()
155 .push(*effect.transaction_digest()),
156 InputSharedObject::MutateDeleted(oid, version) => overwrite_versions
157 .entry(*effect.transaction_digest())
158 .or_default()
159 .push(ObjectKey(oid, version)),
160 InputSharedObject::Cancelled(..) => (), }
167 }
168 }
169 Self {
170 read_version,
171 overwrite_versions,
172 }
173 }
174
175 pub fn add_dependencies_for(
176 &self,
177 digest: TransactionDigest,
178 v: &mut BTreeSet<TransactionDigest>,
179 ) {
180 let Some(overwrites) = self.overwrite_versions.get(&digest) else {
181 return;
182 };
183 for obj_ver in overwrites {
184 let Some(reads) = self.read_version.get(obj_ver) else {
185 continue;
186 };
187 for dep in reads {
188 trace!(
189 "Assuming additional dependency when constructing checkpoint {:?} -> {:?}",
190 digest, *dep
191 );
192 v.insert(*dep);
193 }
194 }
195 }
196}
197
198struct InsertState {
199 dependencies: Vec<TransactionDigest>,
200 transaction: Option<TransactionDependencies>,
201}
202
203impl InsertState {
204 pub fn new(transaction: TransactionDependencies) -> Self {
205 Self {
206 dependencies: transaction.dependencies.iter().cloned().collect(),
207 transaction: Some(transaction),
208 }
209 }
210
211 pub fn process(&mut self, causal_order: &mut CausalOrder) -> Option<InsertState> {
212 while let Some(dep) = self.dependencies.pop() {
213 if let Some(dep_transaction) = causal_order.not_seen.remove(&dep) {
214 return Some(InsertState::new(dep_transaction));
215 }
216 }
217 let transaction = self
218 .transaction
219 .take()
220 .expect("Can't use InsertState after it is finished");
221 causal_order.output.push(transaction.effects);
222 None
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use iota_types::{
229 base_types::{ObjectDigest, ObjectID, SequenceNumber},
230 effects::TransactionEffects,
231 };
232
233 use super::*;
234
235 #[test]
236 pub fn test_causal_order() {
237 let e1 = e(d(1), vec![d(2), d(3)]);
238 let e2 = e(d(2), vec![d(3), d(4)]);
239 let e3 = e(d(3), vec![]);
240 let e4 = e(d(4), vec![]);
241
242 let r = extract(CausalOrder::causal_sort(vec![
243 e1.clone(),
244 e2,
245 e3,
246 e4.clone(),
247 ]));
248 assert_eq!(r, vec![3, 4, 2, 1]);
249
250 let r = extract(CausalOrder::causal_sort(vec![e1.clone(), e4.clone()]));
252 assert_eq!(r, vec![1, 4]);
253 let r = extract(CausalOrder::causal_sort(vec![e4, e1]));
254 assert_eq!(r, vec![1, 4]);
255 }
256
257 #[test]
258 pub fn test_causal_order_rw_locks() {
259 let mut e5 = e(d(5), vec![]);
260 let mut e2 = e(d(2), vec![]);
261 let mut e3 = e(d(3), vec![]);
262 let obj_digest = ObjectDigest::new(Default::default());
263 e5.unsafe_add_input_shared_object_for_testing(InputSharedObject::ReadOnly((
264 o(1),
265 SequenceNumber::from_u64(1),
266 obj_digest,
267 )));
268 e2.unsafe_add_input_shared_object_for_testing(InputSharedObject::ReadOnly((
269 o(1),
270 SequenceNumber::from_u64(1),
271 obj_digest,
272 )));
273 e3.unsafe_add_input_shared_object_for_testing(InputSharedObject::Mutate((
274 o(1),
275 SequenceNumber::from_u64(1),
276 obj_digest,
277 )));
278
279 let r = extract(CausalOrder::causal_sort(vec![e5, e2, e3]));
280 assert_eq!(r.len(), 3);
281 assert_eq!(*r.get(2).unwrap(), 3); assert!(r.contains(&5));
284 assert!(r.contains(&2));
285 }
286
287 fn extract(e: Vec<TransactionEffects>) -> Vec<u8> {
288 e.into_iter()
289 .map(|e| e.transaction_digest().inner()[0])
290 .collect()
291 }
292
293 fn d(i: u8) -> TransactionDigest {
294 let mut bytes: [u8; 32] = Default::default();
295 bytes[0] = i;
296 TransactionDigest::new(bytes)
297 }
298
299 fn o(i: u8) -> ObjectID {
300 let mut bytes: [u8; ObjectID::LENGTH] = Default::default();
301 bytes[0] = i;
302 ObjectID::new(bytes)
303 }
304
305 fn e(
306 transaction_digest: TransactionDigest,
307 dependencies: Vec<TransactionDigest>,
308 ) -> TransactionEffects {
309 let mut effects = TransactionEffects::default();
310 *effects.transaction_digest_mut_for_testing() = transaction_digest;
311 *effects.dependencies_mut_for_testing() = dependencies;
312 effects
313 }
314}