iota_core/execution_cache/
object_locks.rs1use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
6use iota_common::debug_fatal;
7use iota_types::{
8 base_types::{ObjectID, ObjectRef},
9 error::{IotaError, IotaResult, UserInputError},
10 object::Object,
11 storage::ObjectStore,
12 transaction::VerifiedSignedTransaction,
13};
14use tracing::{debug, info, instrument, trace};
15
16use super::writeback_cache::WritebackCache;
17use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};
18
/// Counter stored next to each cached lock in `ObjectLocks::locked_transactions`.
///
/// `try_set_transaction_lock` bumps it when a caller attaches to an entry and
/// `clear_cached_locks` decrements it, removing the entry once it reaches zero.
type RefCount = usize;
20
/// In-memory table of transaction locks on owned object references.
///
/// The table is used by `try_set_transaction_lock` to test-and-set a lock for
/// an object reference, and by `clear_cached_locks` to drop entries again. The
/// locks themselves are persisted through the epoch store's tables (see
/// `acquire_transaction_locks`).
pub(super) struct ObjectLocks {
    /// Maps an object reference to the lock currently associated with it in
    /// memory, together with the number of callers that still reference the
    /// entry.
    locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
}
34
35impl ObjectLocks {
36 pub fn new() -> Self {
37 Self {
38 locked_transactions: DashMap::new(),
39 }
40 }
41
42 pub(crate) fn get_transaction_lock(
43 &self,
44 obj_ref: &ObjectRef,
45 epoch_store: &AuthorityPerEpochStore,
46 ) -> IotaResult<Option<LockDetails>> {
47 epoch_store.tables()?.get_locked_transaction(obj_ref)
51 }
52
53 pub(crate) fn try_set_transaction_lock(
58 &self,
59 obj_ref: &ObjectRef,
60 new_lock: LockDetails,
61 epoch_store: &AuthorityPerEpochStore,
62 ) -> IotaResult {
63 let entry = self.locked_transactions.entry(*obj_ref);
65
66 let prev_lock = match entry {
84 DashMapEntry::Vacant(vacant) => {
85 let tables = epoch_store.tables()?;
86 if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
87 trace!("read lock from db: {:?}", lock_details);
88 vacant.insert((1, lock_details));
89 lock_details
90 } else {
91 trace!("set lock: {:?}", new_lock);
92 vacant.insert((1, new_lock));
93 new_lock
94 }
95 }
96 DashMapEntry::Occupied(mut occupied) => {
97 occupied.get_mut().0 += 1;
98 occupied.get().1
99 }
100 };
101
102 if prev_lock != new_lock {
103 debug!("lock conflict detected: {:?} != {:?}", prev_lock, new_lock);
104 Err(IotaError::ObjectLockConflict {
105 obj_ref: *obj_ref,
106 pending_transaction: prev_lock,
107 })
108 } else {
109 Ok(())
110 }
111 }
112
113 pub(crate) fn clear(&self) {
114 info!("clearing old transaction locks");
115 self.locked_transactions.clear();
116 }
117
118 fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> IotaResult {
119 debug_assert_eq!(obj_ref.0, live_object.id());
120 if obj_ref.1 != live_object.version() {
121 debug!(
122 "object version unavailable for consumption: {:?} (current: {})",
123 obj_ref,
124 live_object.version()
125 );
126 return Err(IotaError::UserInput {
127 error: UserInputError::ObjectVersionUnavailableForConsumption {
128 provided_obj_ref: *obj_ref,
129 current_version: live_object.version(),
130 },
131 });
132 }
133
134 let live_digest = live_object.digest();
135 if obj_ref.2 != live_digest {
136 debug!("object digest mismatch: {:?} vs {:?}", obj_ref, live_digest);
137 return Err(IotaError::UserInput {
138 error: UserInputError::InvalidObjectDigest {
139 object_id: obj_ref.0,
140 expected_digest: live_digest,
141 },
142 });
143 }
144
145 Ok(())
146 }
147
148 fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
149 for (obj_ref, lock) in locks {
150 let entry = self.locked_transactions.entry(*obj_ref);
151 let mut occupied = match entry {
152 DashMapEntry::Vacant(_) => {
153 debug_fatal!("lock must exist for object: {:?}", obj_ref);
154 continue;
155 }
156 DashMapEntry::Occupied(occupied) => occupied,
157 };
158
159 if occupied.get().1 == *lock {
160 occupied.get_mut().0 -= 1;
161 if occupied.get().0 == 0 {
162 trace!("clearing lock: {:?}", lock);
163 occupied.remove();
164 }
165 } else {
166 panic!("lock was changed since we set it");
170 }
171 }
172 }
173
174 fn multi_get_objects_must_exist(
175 cache: &WritebackCache,
176 object_ids: &[ObjectID],
177 ) -> IotaResult<Vec<Object>> {
178 let objects = cache.multi_get_objects(object_ids)?;
179 let mut result = Vec::with_capacity(objects.len());
180 for (i, object) in objects.into_iter().enumerate() {
181 if let Some(object) = object {
182 result.push(object);
183 } else {
184 return Err(IotaError::UserInput {
185 error: UserInputError::ObjectNotFound {
186 object_id: object_ids[i],
187 version: None,
188 },
189 });
190 }
191 }
192 Ok(result)
193 }
194
195 #[instrument(level = "debug", skip_all)]
196 pub(crate) async fn acquire_transaction_locks(
197 &self,
198 cache: &WritebackCache,
199 epoch_store: &AuthorityPerEpochStore,
200 owned_input_objects: &[ObjectRef],
201 transaction: VerifiedSignedTransaction,
202 ) -> IotaResult {
203 let tx_digest = *transaction.digest();
204
205 let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
206 let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;
207
208 for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
210 Self::verify_live_object(obj_ref, live_object)?;
211 }
212
213 let mut locks_to_write: Vec<(_, LockDetails)> =
214 Vec::with_capacity(owned_input_objects.len());
215
216 let owned_input_objects = {
227 let mut o = owned_input_objects.to_vec();
228 o.sort_by_key(|o| o.0);
229 o
230 };
231
232 for obj_ref in owned_input_objects.iter() {
239 match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
240 Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
241 Err(e) => {
242 self.clear_cached_locks(&locks_to_write);
248 return Err(e);
249 }
250 }
251 }
252
253 epoch_store
255 .tables()?
256 .write_transaction_locks(transaction, locks_to_write.iter().cloned())?;
257
258 self.clear_cached_locks(&locks_to_write);
260
261 Ok(())
262 }
263}
264
265#[cfg(test)]
266mod tests {
267 use futures::FutureExt;
268
269 use crate::execution_cache::{
270 ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
271 };
272
273 #[tokio::test]
274 async fn test_transaction_locks_are_exclusive() {
275 telemetry_subscribers::init_for_testing();
276 Scenario::iterate(|mut s| async move {
277 s.with_created(&[1, 2, 3]);
278 s.do_tx().await;
279
280 s.with_mutated(&[1, 2, 3]);
281 s.do_tx().await;
282
283 let new1 = s.obj_ref(1);
284 let new2 = s.obj_ref(2);
285 let new3 = s.obj_ref(3);
286
287 s.with_mutated(&[1, 2, 3]); let outputs = s.take_outputs();
289
290 let tx1 = s.make_signed_transaction(&outputs.transaction);
291
292 s.cache
293 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx1)
294 .await
295 .expect("locks should be available");
296
297 s.with_created(&[4, 5]);
300 let tx2 = s.take_outputs().transaction.clone();
301 let tx2 = s.make_signed_transaction(&tx2);
302
303 s.cache
305 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2.clone())
306 .await
307 .unwrap_err();
308
309 s.cache
311 .acquire_transaction_locks(&s.epoch_store, &[new3, new2], tx2.clone())
312 .await
313 .unwrap_err();
314
315 s.cache
317 .acquire_transaction_locks(&s.epoch_store, &[new3], tx2)
318 .await
319 .expect("new3 should be unlocked");
320 })
321 .await;
322 }
323
324 #[tokio::test]
325 async fn test_transaction_locks_are_durable() {
326 telemetry_subscribers::init_for_testing();
327 Scenario::iterate(|mut s| async move {
328 s.with_created(&[1, 2]);
329 s.do_tx().await;
330
331 let old2 = s.obj_ref(2);
332
333 s.with_mutated(&[1, 2]);
334 s.do_tx().await;
335
336 let new1 = s.obj_ref(1);
337 let new2 = s.obj_ref(2);
338
339 s.with_mutated(&[1, 2]); let outputs = s.take_outputs();
341
342 let tx = s.make_signed_transaction(&outputs.transaction);
343
344 s.cache
346 .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx.clone())
347 .await
348 .unwrap_err();
349
350 s.cache
353 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx)
354 .await
355 .expect("new1 should be unlocked after revert");
356 })
357 .await;
358 }
359
360 #[tokio::test]
361 async fn test_acquire_transaction_locks_revert() {
362 telemetry_subscribers::init_for_testing();
363 Scenario::iterate(|mut s| async move {
364 s.with_created(&[1, 2]);
365 s.do_tx().await;
366
367 let old2 = s.obj_ref(2);
368
369 s.with_mutated(&[1, 2]);
370 s.do_tx().await;
371
372 let new1 = s.obj_ref(1);
373 let new2 = s.obj_ref(2);
374
375 s.with_mutated(&[1, 2]); let outputs = s.take_outputs();
377
378 let tx = s.make_signed_transaction(&outputs.transaction);
379
380 s.cache
382 .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx)
383 .await
384 .unwrap_err();
385
386 s.with_created(&[4, 5]);
389 let tx2 = s.take_outputs().transaction.clone();
390 let tx2 = s.make_signed_transaction(&tx2);
391
392 s.cache
395 .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2)
396 .await
397 .expect("new1 should be unlocked after revert");
398 })
399 .await;
400 }
401
402 #[tokio::test]
403 async fn test_acquire_transaction_locks_is_sync() {
404 telemetry_subscribers::init_for_testing();
405 Scenario::iterate(|mut s| async move {
406 s.with_created(&[1, 2]);
407 s.do_tx().await;
408
409 let objects: Vec<_> = vec![s.object(1), s.object(2)]
410 .into_iter()
411 .map(|o| o.compute_object_reference())
412 .collect();
413
414 s.with_mutated(&[1, 2]);
415 let outputs = s.take_outputs();
416
417 let tx2 = s.make_signed_transaction(&outputs.transaction);
418 s.cache
421 .acquire_transaction_locks(&s.epoch_store, &objects, tx2)
422 .now_or_never()
423 .unwrap()
424 .unwrap();
425 })
426 .await;
427 }
428}