iota_core/execution_cache/object_locks.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

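//! In-memory, reference-counted table of transaction locks on owned object
//! references, used to provide atomic test-and-set semantics on top of the
//! per-epoch lock tables while locks are being acquired and written to the db.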
use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
use iota_common::debug_fatal;
use iota_types::{
    base_types::{ObjectID, ObjectRef},
    error::{IotaError, IotaResult, UserInputError},
    object::Object,
    storage::ObjectStore,
    transaction::VerifiedSignedTransaction,
};
use tracing::{debug, info, instrument, trace};

use super::writeback_cache::WritebackCache;
use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};

type RefCount = usize;

pub(super) struct ObjectLocks {
    // When acquiring transaction locks, lock entries are briefly inserted into this map. The map
    // exists to provide atomic test-and-set operations on the locks. After all locks have been
    // inserted into the map, they are written to the db, and then all locks are removed from
    // the map.
    //
    // After a transaction has been executed, newly created objects are available to be locked.
    // But, because of crash recovery, we cannot rule out that a lock may already exist in the db
    // for those objects. Therefore, we do a db read for each object we are locking.
    //
    // TODO: find a strategy to allow us to avoid db reads for each object.
    locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
}

impl ObjectLocks {
    pub fn new() -> Self {
        Self {
            locked_transactions: DashMap::new(),
        }
    }

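    /// Returns the lock recorded in the epoch db for the given object
    /// reference, if any.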
    pub(crate) fn get_transaction_lock(
        &self,
        obj_ref: &ObjectRef,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaResult<Option<LockDetails>> {
        // We don't consult the in-memory state here. We are only interested in state
        // that has been committed to the db. This is because the in-memory state is
        // reverted if the transaction is not successfully locked.
        epoch_store.tables()?.get_locked_transaction(obj_ref)
    }

    /// Attempts to atomically test-and-set a transaction lock on an object.
    /// If the lock is already set to a conflicting transaction, an error is
    /// returned. If the lock is not set, or is already set to the same
    /// transaction, the lock is set.
    pub(crate) fn try_set_transaction_lock(
        &self,
        obj_ref: &ObjectRef,
        new_lock: LockDetails,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaResult {
        // entry holds a lock on the dashmap shard, so this function operates atomically
        let entry = self.locked_transactions.entry(*obj_ref);

        // TODO: currently, the common case for this code is that we will miss the cache
        // and read from the db. It is difficult to implement negative caching, since we
        // may have restarted, in which case there could be locks in the db that we do
        // not have in the cache. We may want to explore strategies for proving there
        // cannot be a lock in the db that we do not know about. Two possibilities are:
        //
        // 1. Read all locks into memory at startup (and keep them there). The lifetime
        //    of locks is relatively short in the common case, so this might be
        //    feasible.
        // 2. Find some strategy to distinguish between the cases where we are
        //    re-executing old transactions after restarting vs executing transactions
        //    that we have never seen before. The output objects of novel transactions
        //    cannot previously have been locked on this validator.
        //
        // Solving this is not terribly important, as it is not in the execution path,
        // and hence only improves the latency of transaction signing, not of
        // transaction execution.
        let prev_lock = match entry {
            DashMapEntry::Vacant(vacant) => {
                let tables = epoch_store.tables()?;
                if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
                    trace!("read lock from db: {:?}", lock_details);
                    vacant.insert((1, lock_details));
                    lock_details
                } else {
                    trace!("set lock: {:?}", new_lock);
                    vacant.insert((1, new_lock));
                    new_lock
                }
            }
            DashMapEntry::Occupied(mut occupied) => {
                occupied.get_mut().0 += 1;
                occupied.get().1
            }
        };

        if prev_lock != new_lock {
            debug!("lock conflict detected: {:?} != {:?}", prev_lock, new_lock);
            Err(IotaError::ObjectLockConflict {
                obj_ref: *obj_ref,
                pending_transaction: prev_lock,
            })
        } else {
            Ok(())
        }
    }

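    /// Clears all in-memory lock entries. Locks that have already been written
    /// to the db are unaffected.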
    pub(crate) fn clear(&self) {
        info!("clearing old transaction locks");
        self.locked_transactions.clear();
    }

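    /// Checks that the object reference being locked matches the current live
    /// version and digest of the object, returning a `UserInputError` otherwise.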
    fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> IotaResult {
        debug_assert_eq!(obj_ref.0, live_object.id());
        if obj_ref.1 != live_object.version() {
            debug!(
                "object version unavailable for consumption: {:?} (current: {})",
                obj_ref,
                live_object.version()
            );
            return Err(IotaError::UserInput {
                error: UserInputError::ObjectVersionUnavailableForConsumption {
                    provided_obj_ref: *obj_ref,
                    current_version: live_object.version(),
                },
            });
        }

        let live_digest = live_object.digest();
        if obj_ref.2 != live_digest {
            return Err(IotaError::UserInput {
                error: UserInputError::InvalidObjectDigest {
                    object_id: obj_ref.0,
                    expected_digest: live_digest,
                },
            });
        }

        Ok(())
    }

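    /// Decrements the refcount of each given lock entry, removing entries whose
    /// refcount reaches zero. Used both after locks have been committed to the
    /// db and when reverting after a failed lock acquisition.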
    fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
        for (obj_ref, lock) in locks {
            let entry = self.locked_transactions.entry(*obj_ref);
            let mut occupied = match entry {
                DashMapEntry::Vacant(_) => {
                    debug_fatal!("lock must exist for object: {:?}", obj_ref);
                    continue;
                }
                DashMapEntry::Occupied(occupied) => occupied,
            };

            if occupied.get().1 == *lock {
                occupied.get_mut().0 -= 1;
                if occupied.get().0 == 0 {
                    trace!("clearing lock: {:?}", lock);
                    occupied.remove();
                }
            } else {
                // This is impossible, because the only case in which we overwrite a
                // lock is when the lock is from a previous epoch. But we are holding
                // execution_lock, so the epoch cannot have changed.
                panic!("lock was changed since we set it");
            }
        }
    }

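    /// Fetches the given objects from the cache, returning
    /// `UserInputError::ObjectNotFound` if any of them is missing.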
    fn multi_get_objects_must_exist(
        cache: &WritebackCache,
        object_ids: &[ObjectID],
    ) -> IotaResult<Vec<Object>> {
        let objects = cache.multi_get_objects(object_ids)?;
        let mut result = Vec::with_capacity(objects.len());
        for (i, object) in objects.into_iter().enumerate() {
            if let Some(object) = object {
                result.push(object);
            } else {
                return Err(IotaError::UserInput {
                    error: UserInputError::ObjectNotFound {
                        object_id: object_ids[i],
                        version: None,
                    },
                });
            }
        }
        Ok(result)
    }

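    /// Acquires transaction locks for all owned input objects of `transaction`.
    /// The operation is all-or-nothing: on any conflict, the in-memory entries
    /// added so far are reverted and the error is returned; on success, the
    /// locks are written to the epoch db and the temporary in-memory entries
    /// are removed.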
    #[instrument(level = "debug", skip_all)]
    pub(crate) async fn acquire_transaction_locks(
        &self,
        cache: &WritebackCache,
        epoch_store: &AuthorityPerEpochStore,
        owned_input_objects: &[ObjectRef],
        transaction: VerifiedSignedTransaction,
    ) -> IotaResult {
        let tx_digest = *transaction.digest();

        let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
        let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;

        // Only live objects can be locked
        for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
            Self::verify_live_object(obj_ref, live_object)?;
        }

        let mut locks_to_write: Vec<(_, LockDetails)> =
            Vec::with_capacity(owned_input_objects.len());

        // Sort the objects before locking. This is not required by the protocol (since
        // it's okay to reject any equivocating tx). However, this does prevent
        // a confusing error on the client. Consider the case:
        //   TX1: [o1, o2];
        //   TX2: [o2, o1];
        // If two threads race to acquire these locks, they might each acquire their first
        // object, then error when trying to acquire the second. The error
        // returned to the client would say that there is a conflicting tx on
        // that object, but in fact neither object was locked and the tx was never
        // signed. If one client then retries, they will succeed (counterintuitively).
        let owned_input_objects = {
            let mut o = owned_input_objects.to_vec();
            o.sort_by_key(|o| o.0);
            o
        };

        // Note that this function does not have to operate atomically. If there are two
        // racing threads, then they are either trying to lock the same
        // transaction (in which case both will succeed), or they are trying to
        // lock the same object in two different transactions, in which case the
        // sender has equivocated, and we are under no obligation to help them form a
        // cert.
        for obj_ref in owned_input_objects.iter() {
            match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
                Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
                Err(e) => {
                    // Revert all pending writes and return the error.
                    // Note that reverting is not required for liveness, since a well-formed and
                    // non-equivocating txn cannot fail to acquire locks.
                    // However, reverting is easy enough to do in this implementation that we do it
                    // anyway.
                    self.clear_cached_locks(&locks_to_write);
                    return Err(e);
                }
            }
        }

        // commit all writes to DB
        epoch_store
            .tables()?
            .write_transaction_locks(transaction, locks_to_write.iter().cloned())?;

        // remove pending locks from unbounded storage
        self.clear_cached_locks(&locks_to_write);

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use futures::FutureExt;

    use crate::execution_cache::{
        ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
    };

    #[tokio::test]
    async fn test_transaction_locks_are_exclusive() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2, 3]);
            s.do_tx().await;

            s.with_mutated(&[1, 2, 3]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);
            let new3 = s.obj_ref(3);

            s.with_mutated(&[1, 2, 3]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();

            let tx1 = s.make_signed_transaction(&outputs.transaction);

            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx1)
                .await
                .expect("locks should be available");

            // this tx doesn't use the actual objects in question, but we just need
            // something to insert into the table.
            s.with_created(&[4, 5]);
            let tx2 = s.take_outputs().transaction.clone();
            let tx2 = s.make_signed_transaction(&tx2);

            // both locks are held by tx1, so this should fail
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2.clone())
                .await
                .unwrap_err();

            // new3 is lockable, but new2 is not, so this should fail
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new3, new2], tx2.clone())
                .await
                .unwrap_err();

            // new3 is unlocked
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new3], tx2)
                .await
                .expect("new3 should be unlocked");
        })
        .await;
    }

    #[tokio::test]
    async fn test_transaction_locks_are_durable() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let old2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();

            let tx = s.make_signed_transaction(&outputs.transaction);

            // fails because we are referring to an old object
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx.clone())
                .await
                .unwrap_err();

            // succeeds because the above call releases the lock on new1 after failing
            // to get the lock on old2
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx)
                .await
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    #[tokio::test]
    async fn test_acquire_transaction_locks_revert() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let old2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();

            let tx = s.make_signed_transaction(&outputs.transaction);

            // fails because we are referring to an old object
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx)
                .await
                .unwrap_err();

            // this tx doesn't use the actual objects in question, but we just need
            // something to insert into the table.
            s.with_created(&[4, 5]);
            let tx2 = s.take_outputs().transaction.clone();
            let tx2 = s.make_signed_transaction(&tx2);

            // succeeds because the above call releases the lock on new1 after failing
            // to get the lock on old2
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2)
                .await
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    #[tokio::test]
    async fn test_acquire_transaction_locks_is_sync() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let objects: Vec<_> = vec![s.object(1), s.object(2)]
                .into_iter()
                .map(|o| o.compute_object_reference())
                .collect();

            s.with_mutated(&[1, 2]);
            let outputs = s.take_outputs();

            let tx2 = s.make_signed_transaction(&outputs.transaction);
            // assert that acquire_transaction_locks is sync in non-simtest, which causes
            // the fail_point_async! macros above to be elided
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &objects, tx2)
                .now_or_never()
                .unwrap()
                .unwrap();
        })
        .await;
    }
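
    // Illustrative sketch, not part of the original test suite: the comment in
    // `acquire_transaction_locks` notes that two racers locking the same
    // transaction should both succeed. Assuming `write_transaction_locks` is
    // idempotent for an identical (object, transaction) pair, re-acquiring the
    // same locks for the same signed transaction should likewise not be
    // treated as a conflict.
    #[tokio::test]
    async fn test_same_transaction_relock_is_not_a_conflict() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();
            let tx = s.make_signed_transaction(&outputs.transaction);

            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx.clone())
                .await
                .expect("locks should be available");

            // locking the same objects again for the same transaction matches the
            // lock already in the db, so this should also succeed
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx)
                .await
                .expect("re-locking for the same transaction should succeed");
        })
        .await;
    }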
}