iota_core/execution_cache/
object_locks.rs1use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
6use iota_common::debug_fatal;
7use iota_types::{
8 base_types::{ObjectID, ObjectRef},
9 error::{IotaError, IotaResult, UserInputError},
10 object::Object,
11 storage::ObjectStore,
12 transaction::VerifiedSignedTransaction,
13};
14use tracing::{debug, info, instrument, trace};
15
16use super::writeback_cache::WritebackCache;
17use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};
18
19type RefCount = usize;
20
21pub(super) struct ObjectLocks {
22 locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
33}
34
35impl ObjectLocks {
36 pub fn new() -> Self {
37 Self {
38 locked_transactions: DashMap::new(),
39 }
40 }
41
42 pub(crate) fn get_transaction_lock(
43 &self,
44 obj_ref: &ObjectRef,
45 epoch_store: &AuthorityPerEpochStore,
46 ) -> IotaResult<Option<LockDetails>> {
47 epoch_store.tables()?.get_locked_transaction(obj_ref)
51 }
52
53 pub(crate) fn try_set_transaction_lock(
58 &self,
59 obj_ref: &ObjectRef,
60 new_lock: LockDetails,
61 epoch_store: &AuthorityPerEpochStore,
62 ) -> IotaResult {
63 let entry = self.locked_transactions.entry(*obj_ref);
65
66 let prev_lock = match entry {
84 DashMapEntry::Vacant(vacant) => {
85 let tables = epoch_store.tables()?;
86 if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
87 trace!("read lock from db: {:?}", lock_details);
88 vacant.insert((1, lock_details));
89 lock_details
90 } else {
91 trace!("set lock: {:?}", new_lock);
92 vacant.insert((1, new_lock));
93 new_lock
94 }
95 }
96 DashMapEntry::Occupied(mut occupied) => {
97 occupied.get_mut().0 += 1;
98 occupied.get().1
99 }
100 };
101
102 if prev_lock != new_lock {
103 debug!(
104 "lock conflict detected for {:?}: {:?} != {:?}",
105 obj_ref, prev_lock, new_lock
106 );
107 Err(IotaError::ObjectLockConflict {
108 obj_ref: *obj_ref,
109 pending_transaction: prev_lock,
110 })
111 } else {
112 Ok(())
113 }
114 }
115
116 pub(crate) fn clear(&self) {
117 info!("clearing old transaction locks");
118 self.locked_transactions.clear();
119 }
120
121 fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> IotaResult {
122 debug_assert_eq!(obj_ref.0, live_object.id());
123 if obj_ref.1 != live_object.version() {
124 debug!(
125 "object version unavailable for consumption: {:?} (current: {})",
126 obj_ref,
127 live_object.version()
128 );
129 return Err(IotaError::UserInput {
130 error: UserInputError::ObjectVersionUnavailableForConsumption {
131 provided_obj_ref: *obj_ref,
132 current_version: live_object.version(),
133 },
134 });
135 }
136
137 let live_digest = live_object.digest();
138 if obj_ref.2 != live_digest {
139 return Err(IotaError::UserInput {
140 error: UserInputError::InvalidObjectDigest {
141 object_id: obj_ref.0,
142 expected_digest: live_digest,
143 },
144 });
145 }
146
147 Ok(())
148 }
149
150 fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
151 for (obj_ref, lock) in locks {
152 let entry = self.locked_transactions.entry(*obj_ref);
153 let mut occupied = match entry {
154 DashMapEntry::Vacant(_) => {
155 debug_fatal!("lock must exist for object: {:?}", obj_ref);
156 continue;
157 }
158 DashMapEntry::Occupied(occupied) => occupied,
159 };
160
161 if occupied.get().1 == *lock {
162 occupied.get_mut().0 -= 1;
163 if occupied.get().0 == 0 {
164 trace!("clearing lock: {:?}", lock);
165 occupied.remove();
166 }
167 } else {
168 panic!("lock was changed since we set it");
172 }
173 }
174 }
175
176 fn multi_get_objects_must_exist(
177 cache: &WritebackCache,
178 object_ids: &[ObjectID],
179 ) -> IotaResult<Vec<Object>> {
180 let objects = cache.try_multi_get_objects(object_ids)?;
181 let mut result = Vec::with_capacity(objects.len());
182 for (i, object) in objects.into_iter().enumerate() {
183 if let Some(object) = object {
184 result.push(object);
185 } else {
186 return Err(IotaError::UserInput {
187 error: UserInputError::ObjectNotFound {
188 object_id: object_ids[i],
189 version: None,
190 },
191 });
192 }
193 }
194 Ok(result)
195 }
196
197 #[instrument(level = "debug", skip_all)]
198 pub(crate) fn acquire_transaction_locks(
199 &self,
200 cache: &WritebackCache,
201 epoch_store: &AuthorityPerEpochStore,
202 owned_input_objects: &[ObjectRef],
203 transaction: VerifiedSignedTransaction,
204 ) -> IotaResult {
205 let tx_digest = *transaction.digest();
206
207 let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
208 let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;
209
210 for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
212 Self::verify_live_object(obj_ref, live_object)?;
213 }
214
215 let mut locks_to_write: Vec<(_, LockDetails)> =
216 Vec::with_capacity(owned_input_objects.len());
217
218 let owned_input_objects = {
229 let mut o = owned_input_objects.to_vec();
230 o.sort_by_key(|o| o.0);
231 o
232 };
233
234 for obj_ref in owned_input_objects.iter() {
241 match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
242 Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
243 Err(e) => {
244 self.clear_cached_locks(&locks_to_write);
250 return Err(e);
251 }
252 }
253 }
254
255 epoch_store
257 .tables()?
258 .write_transaction_locks(transaction, locks_to_write.iter().cloned())?;
259
260 self.clear_cached_locks(&locks_to_write);
262
263 Ok(())
264 }
265}
266
267#[cfg(test)]
268mod tests {
269 use crate::execution_cache::{
270 ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
271 };
272
273 #[tokio::test]
274 async fn test_transaction_locks_are_exclusive() {
275 telemetry_subscribers::init_for_testing();
276 Scenario::iterate(|mut s| async move {
277 s.with_created(&[1, 2, 3]);
278 s.do_tx().await;
279
280 s.with_mutated(&[1, 2, 3]);
281 s.do_tx().await;
282
283 let new1 = s.obj_ref(1);
284 let new2 = s.obj_ref(2);
285 let new3 = s.obj_ref(3);
286
287 s.with_mutated(&[1, 2, 3]); let outputs = s.take_outputs();
289
290 let tx1 = s.make_signed_transaction(&outputs.transaction);
291
292 s.cache
293 .try_acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx1)
294 .expect("locks should be available");
295
296 s.with_created(&[4, 5]);
299 let tx2 = s.take_outputs().transaction.clone();
300 let tx2 = s.make_signed_transaction(&tx2);
301
302 s.cache
304 .try_acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2.clone())
305 .unwrap_err();
306
307 s.cache
309 .try_acquire_transaction_locks(&s.epoch_store, &[new3, new2], tx2.clone())
310 .unwrap_err();
311
312 s.cache
314 .try_acquire_transaction_locks(&s.epoch_store, &[new3], tx2)
315 .expect("new3 should be unlocked");
316 })
317 .await;
318 }
319
320 #[tokio::test]
321 async fn test_transaction_locks_are_durable() {
322 telemetry_subscribers::init_for_testing();
323 Scenario::iterate(|mut s| async move {
324 s.with_created(&[1, 2]);
325 s.do_tx().await;
326
327 let old2 = s.obj_ref(2);
328
329 s.with_mutated(&[1, 2]);
330 s.do_tx().await;
331
332 let new1 = s.obj_ref(1);
333 let new2 = s.obj_ref(2);
334
335 s.with_mutated(&[1, 2]); let outputs = s.take_outputs();
337
338 let tx = s.make_signed_transaction(&outputs.transaction);
339
340 s.cache
342 .try_acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx.clone())
343 .unwrap_err();
344
345 s.cache
348 .try_acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx)
349 .expect("new1 should be unlocked after revert");
350 })
351 .await;
352 }
353
354 #[tokio::test]
355 async fn test_acquire_transaction_locks_revert() {
356 telemetry_subscribers::init_for_testing();
357 Scenario::iterate(|mut s| async move {
358 s.with_created(&[1, 2]);
359 s.do_tx().await;
360
361 let old2 = s.obj_ref(2);
362
363 s.with_mutated(&[1, 2]);
364 s.do_tx().await;
365
366 let new1 = s.obj_ref(1);
367 let new2 = s.obj_ref(2);
368
369 s.with_mutated(&[1, 2]); let outputs = s.take_outputs();
371
372 let tx = s.make_signed_transaction(&outputs.transaction);
373
374 s.cache
376 .try_acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx)
377 .unwrap_err();
378
379 s.with_created(&[4, 5]);
382 let tx2 = s.take_outputs().transaction.clone();
383 let tx2 = s.make_signed_transaction(&tx2);
384
385 s.cache
388 .try_acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2)
389 .expect("new1 should be unlocked after revert");
390 })
391 .await;
392 }
393
394 #[tokio::test]
395 async fn test_acquire_transaction_locks_is_sync() {
396 telemetry_subscribers::init_for_testing();
397 Scenario::iterate(|mut s| async move {
398 s.with_created(&[1, 2]);
399 s.do_tx().await;
400
401 let objects: Vec<_> = vec![s.object(1), s.object(2)]
402 .into_iter()
403 .map(|o| o.compute_object_reference())
404 .collect();
405
406 s.with_mutated(&[1, 2]);
407 let outputs = s.take_outputs();
408
409 let tx2 = s.make_signed_transaction(&outputs.transaction);
410 s.cache
413 .try_acquire_transaction_locks(&s.epoch_store, &objects, tx2)
414 .unwrap();
415 })
416 .await;
417 }
418}