iota_core/execution_cache/
object_locks.rs1use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
6use iota_common::debug_fatal;
7use iota_types::{
8 base_types::{ObjectID, ObjectRef},
9 error::{IotaError, IotaResult, UserInputError},
10 object::Object,
11 storage::ObjectStore,
12 transaction::VerifiedSignedTransaction,
13};
14use tracing::{debug, info, instrument, trace};
15
16use super::writeback_cache::WritebackCache;
17use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};
18
19type RefCount = usize;
20
21pub(super) struct ObjectLocks {
22 locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
33}
34
35impl ObjectLocks {
36 pub fn new() -> Self {
37 Self {
38 locked_transactions: DashMap::new(),
39 }
40 }
41
42 pub(crate) fn get_transaction_lock(
43 &self,
44 obj_ref: &ObjectRef,
45 epoch_store: &AuthorityPerEpochStore,
46 ) -> IotaResult<Option<LockDetails>> {
47 epoch_store.tables()?.get_locked_transaction(obj_ref)
51 }
52
53 pub(crate) fn try_set_transaction_lock(
58 &self,
59 obj_ref: &ObjectRef,
60 new_lock: LockDetails,
61 epoch_store: &AuthorityPerEpochStore,
62 ) -> IotaResult {
63 let entry = self.locked_transactions.entry(*obj_ref);
65
66 let prev_lock = match entry {
84 DashMapEntry::Vacant(vacant) => {
85 let tables = epoch_store.tables()?;
86 if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
87 trace!("read lock from db: {:?}", lock_details);
88 vacant.insert((1, lock_details));
89 lock_details
90 } else {
91 trace!("set lock: {:?}", new_lock);
92 vacant.insert((1, new_lock));
93 new_lock
94 }
95 }
96 DashMapEntry::Occupied(mut occupied) => {
97 occupied.get_mut().0 += 1;
98 occupied.get().1
99 }
100 };
101
102 if prev_lock != new_lock {
103 debug!("lock conflict detected: {:?} != {:?}", prev_lock, new_lock);
104 Err(IotaError::ObjectLockConflict {
105 obj_ref: *obj_ref,
106 pending_transaction: prev_lock,
107 })
108 } else {
109 Ok(())
110 }
111 }
112
113 pub(crate) fn clear(&self) {
114 info!("clearing old transaction locks");
115 self.locked_transactions.clear();
116 }
117
118 fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> IotaResult {
119 debug_assert_eq!(obj_ref.0, live_object.id());
120 if obj_ref.1 != live_object.version() {
121 debug!(
122 "object version unavailable for consumption: {:?} (current: {})",
123 obj_ref,
124 live_object.version()
125 );
126 return Err(IotaError::UserInput {
127 error: UserInputError::ObjectVersionUnavailableForConsumption {
128 provided_obj_ref: *obj_ref,
129 current_version: live_object.version(),
130 },
131 });
132 }
133
134 let live_digest = live_object.digest();
135 if obj_ref.2 != live_digest {
136 return Err(IotaError::UserInput {
137 error: UserInputError::InvalidObjectDigest {
138 object_id: obj_ref.0,
139 expected_digest: live_digest,
140 },
141 });
142 }
143
144 Ok(())
145 }
146
147 fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
148 for (obj_ref, lock) in locks {
149 let entry = self.locked_transactions.entry(*obj_ref);
150 let mut occupied = match entry {
151 DashMapEntry::Vacant(_) => {
152 debug_fatal!("lock must exist for object: {:?}", obj_ref);
153 continue;
154 }
155 DashMapEntry::Occupied(occupied) => occupied,
156 };
157
158 if occupied.get().1 == *lock {
159 occupied.get_mut().0 -= 1;
160 if occupied.get().0 == 0 {
161 trace!("clearing lock: {:?}", lock);
162 occupied.remove();
163 }
164 } else {
165 panic!("lock was changed since we set it");
169 }
170 }
171 }
172
173 fn multi_get_objects_must_exist(
174 cache: &WritebackCache,
175 object_ids: &[ObjectID],
176 ) -> IotaResult<Vec<Object>> {
177 let objects = cache.multi_get_objects(object_ids)?;
178 let mut result = Vec::with_capacity(objects.len());
179 for (i, object) in objects.into_iter().enumerate() {
180 if let Some(object) = object {
181 result.push(object);
182 } else {
183 return Err(IotaError::UserInput {
184 error: UserInputError::ObjectNotFound {
185 object_id: object_ids[i],
186 version: None,
187 },
188 });
189 }
190 }
191 Ok(result)
192 }
193
194 #[instrument(level = "debug", skip_all)]
195 pub(crate) async fn acquire_transaction_locks(
196 &self,
197 cache: &WritebackCache,
198 epoch_store: &AuthorityPerEpochStore,
199 owned_input_objects: &[ObjectRef],
200 transaction: VerifiedSignedTransaction,
201 ) -> IotaResult {
202 let tx_digest = *transaction.digest();
203
204 let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
205 let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;
206
207 for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
209 Self::verify_live_object(obj_ref, live_object)?;
210 }
211
212 let mut locks_to_write: Vec<(_, LockDetails)> =
213 Vec::with_capacity(owned_input_objects.len());
214
215 let owned_input_objects = {
226 let mut o = owned_input_objects.to_vec();
227 o.sort_by_key(|o| o.0);
228 o
229 };
230
231 for obj_ref in owned_input_objects.iter() {
238 match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
239 Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
240 Err(e) => {
241 self.clear_cached_locks(&locks_to_write);
247 return Err(e);
248 }
249 }
250 }
251
252 epoch_store
254 .tables()?
255 .write_transaction_locks(transaction, locks_to_write.iter().cloned())?;
256
257 self.clear_cached_locks(&locks_to_write);
259
260 Ok(())
261 }
262}
263
264#[cfg(test)]
265mod tests {
266 use futures::FutureExt;
267
268 use crate::execution_cache::{
269 ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
270 };
271
272 #[tokio::test]
273 async fn test_transaction_locks_are_exclusive() {
274 telemetry_subscribers::init_for_testing();
275 Scenario::iterate(|mut s| async move {
276 s.with_created(&[1, 2, 3]);
277 s.do_tx().await;
278
279 s.with_mutated(&[1, 2, 3]);
280 s.do_tx().await;
281
282 let new1 = s.obj_ref(1);
283 let new2 = s.obj_ref(2);
284 let new3 = s.obj_ref(3);
285
286 s.with_mutated(&[1, 2, 3]); let outputs = s.take_outputs();
288
289 let tx1 = s.make_signed_transaction(&outputs.transaction);
290
291 s.cache
292 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx1)
293 .await
294 .expect("locks should be available");
295
296 s.with_created(&[4, 5]);
299 let tx2 = s.take_outputs().transaction.clone();
300 let tx2 = s.make_signed_transaction(&tx2);
301
302 s.cache
304 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2.clone())
305 .await
306 .unwrap_err();
307
308 s.cache
310 .acquire_transaction_locks(&s.epoch_store, &[new3, new2], tx2.clone())
311 .await
312 .unwrap_err();
313
314 s.cache
316 .acquire_transaction_locks(&s.epoch_store, &[new3], tx2)
317 .await
318 .expect("new3 should be unlocked");
319 })
320 .await;
321 }
322
323 #[tokio::test]
324 async fn test_transaction_locks_are_durable() {
325 telemetry_subscribers::init_for_testing();
326 Scenario::iterate(|mut s| async move {
327 s.with_created(&[1, 2]);
328 s.do_tx().await;
329
330 let old2 = s.obj_ref(2);
331
332 s.with_mutated(&[1, 2]);
333 s.do_tx().await;
334
335 let new1 = s.obj_ref(1);
336 let new2 = s.obj_ref(2);
337
338 s.with_mutated(&[1, 2]); let outputs = s.take_outputs();
340
341 let tx = s.make_signed_transaction(&outputs.transaction);
342
343 s.cache
345 .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx.clone())
346 .await
347 .unwrap_err();
348
349 s.cache
352 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx)
353 .await
354 .expect("new1 should be unlocked after revert");
355 })
356 .await;
357 }
358
359 #[tokio::test]
360 async fn test_acquire_transaction_locks_revert() {
361 telemetry_subscribers::init_for_testing();
362 Scenario::iterate(|mut s| async move {
363 s.with_created(&[1, 2]);
364 s.do_tx().await;
365
366 let old2 = s.obj_ref(2);
367
368 s.with_mutated(&[1, 2]);
369 s.do_tx().await;
370
371 let new1 = s.obj_ref(1);
372 let new2 = s.obj_ref(2);
373
374 s.with_mutated(&[1, 2]); let outputs = s.take_outputs();
376
377 let tx = s.make_signed_transaction(&outputs.transaction);
378
379 s.cache
381 .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx)
382 .await
383 .unwrap_err();
384
385 s.with_created(&[4, 5]);
388 let tx2 = s.take_outputs().transaction.clone();
389 let tx2 = s.make_signed_transaction(&tx2);
390
391 s.cache
394 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2)
395 .await
396 .expect("new1 should be unlocked after revert");
397 })
398 .await;
399 }
400
401 #[tokio::test]
402 async fn test_acquire_transaction_locks_is_sync() {
403 telemetry_subscribers::init_for_testing();
404 Scenario::iterate(|mut s| async move {
405 s.with_created(&[1, 2]);
406 s.do_tx().await;
407
408 let objects: Vec<_> = vec![s.object(1), s.object(2)]
409 .into_iter()
410 .map(|o| o.compute_object_reference())
411 .collect();
412
413 s.with_mutated(&[1, 2]);
414 let outputs = s.take_outputs();
415
416 let tx2 = s.make_signed_transaction(&outputs.transaction);
417 s.cache
420 .acquire_transaction_locks(&s.epoch_store, &objects, tx2)
421 .now_or_never()
422 .unwrap()
423 .unwrap();
424 })
425 .await;
426 }
427}