iota_core/execution_cache/
object_locks.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
6use iota_common::debug_fatal;
7use iota_types::{
8    base_types::{ObjectID, ObjectRef},
9    error::{IotaError, IotaResult, UserInputError},
10    object::Object,
11    storage::ObjectStore,
12    transaction::VerifiedSignedTransaction,
13};
14use tracing::{debug, info, instrument, trace};
15
16use super::writeback_cache::WritebackCache;
17use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};
18
19type RefCount = usize;
20
21pub(super) struct ObjectLocks {
22    // When acquire transaction locks, lock entries are briefly inserted into this map. The map
23    // exists to provide atomic test-and-set operations on the locks. After all locks have been
24    // inserted into the map, they are written to the db, and then all locks are removed from
25    // the map.
26    //
27    // After a transaction has been executed, newly created objects are available to be locked.
28    // But, because of crash recovery, we cannot rule out that a lock may already exist in the db
29    // for those objects. Therefore we do a db read for each object we are locking.
30    //
31    // TODO: find a strategy to allow us to avoid db reads for each object.
32    locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
33}
34
35impl ObjectLocks {
36    pub fn new() -> Self {
37        Self {
38            locked_transactions: DashMap::new(),
39        }
40    }
41
42    pub(crate) fn get_transaction_lock(
43        &self,
44        obj_ref: &ObjectRef,
45        epoch_store: &AuthorityPerEpochStore,
46    ) -> IotaResult<Option<LockDetails>> {
47        // We don't consult the in-memory state here. We are only interested in state
48        // that has been committed to the db. This is because in memory state is
49        // reverted if the transaction is not successfully locked.
50        epoch_store.tables()?.get_locked_transaction(obj_ref)
51    }
52
53    /// Attempts to atomically test-and-set a transaction lock on an object.
54    /// If the lock is already set to a conflicting transaction, an error is
55    /// returned. If the lock is not set, or is already set to the same
56    /// transaction, the lock is set.
57    pub(crate) fn try_set_transaction_lock(
58        &self,
59        obj_ref: &ObjectRef,
60        new_lock: LockDetails,
61        epoch_store: &AuthorityPerEpochStore,
62    ) -> IotaResult {
63        // entry holds a lock on the dashmap shard, so this function operates atomically
64        let entry = self.locked_transactions.entry(*obj_ref);
65
66        // TODO: currently, the common case for this code is that we will miss the cache
67        // and read from the db. It is difficult to implement negative caching, since we
68        // may have restarted, in which case there could be locks in the db that we do
69        // not have in the cache. We may want to explore strategies for proving there
70        // cannot be a lock in the db that we do not know about. Two possibilities are:
71        //
72        // 1. Read all locks into memory at startup (and keep them there). The lifetime
73        //    of locks is relatively short in the common case, so this might be
74        //    feasible.
75        // 2. Find some strategy to distinguish between the cases where we are
76        //    re-executing old transactions after restarting vs executing transactions
77        //    that we have never seen before. The output objects of novel transactions
78        //    cannot previously have been locked on this validator.
79        //
80        // Solving this is not terribly important as it is not in the execution path,
81        // and hence only improves the latency of transaction signing, not
82        // transaction execution
83        let prev_lock = match entry {
84            DashMapEntry::Vacant(vacant) => {
85                let tables = epoch_store.tables()?;
86                if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
87                    trace!("read lock from db: {:?}", lock_details);
88                    vacant.insert((1, lock_details));
89                    lock_details
90                } else {
91                    trace!("set lock: {:?}", new_lock);
92                    vacant.insert((1, new_lock));
93                    new_lock
94                }
95            }
96            DashMapEntry::Occupied(mut occupied) => {
97                occupied.get_mut().0 += 1;
98                occupied.get().1
99            }
100        };
101
102        if prev_lock != new_lock {
103            debug!(
104                "lock conflict detected for {:?}: {:?} != {:?}",
105                obj_ref, prev_lock, new_lock
106            );
107            Err(IotaError::ObjectLockConflict {
108                obj_ref: *obj_ref,
109                pending_transaction: prev_lock,
110            })
111        } else {
112            Ok(())
113        }
114    }
115
116    pub(crate) fn clear(&self) {
117        info!("clearing old transaction locks");
118        self.locked_transactions.clear();
119    }
120
121    fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> IotaResult {
122        debug_assert_eq!(obj_ref.0, live_object.id());
123        if obj_ref.1 != live_object.version() {
124            debug!(
125                "object version unavailable for consumption: {:?} (current: {})",
126                obj_ref,
127                live_object.version()
128            );
129            return Err(IotaError::UserInput {
130                error: UserInputError::ObjectVersionUnavailableForConsumption {
131                    provided_obj_ref: *obj_ref,
132                    current_version: live_object.version(),
133                },
134            });
135        }
136
137        let live_digest = live_object.digest();
138        if obj_ref.2 != live_digest {
139            return Err(IotaError::UserInput {
140                error: UserInputError::InvalidObjectDigest {
141                    object_id: obj_ref.0,
142                    expected_digest: live_digest,
143                },
144            });
145        }
146
147        Ok(())
148    }
149
150    fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
151        for (obj_ref, lock) in locks {
152            let entry = self.locked_transactions.entry(*obj_ref);
153            let mut occupied = match entry {
154                DashMapEntry::Vacant(_) => {
155                    debug_fatal!("lock must exist for object: {:?}", obj_ref);
156                    continue;
157                }
158                DashMapEntry::Occupied(occupied) => occupied,
159            };
160
161            if occupied.get().1 == *lock {
162                occupied.get_mut().0 -= 1;
163                if occupied.get().0 == 0 {
164                    trace!("clearing lock: {:?}", lock);
165                    occupied.remove();
166                }
167            } else {
168                // this is impossible because the only case in which we overwrite a
169                // lock is when the lock is from a previous epoch. but we are holding
170                // execution_lock, so the epoch cannot have changed.
171                panic!("lock was changed since we set it");
172            }
173        }
174    }
175
176    fn multi_get_objects_must_exist(
177        cache: &WritebackCache,
178        object_ids: &[ObjectID],
179    ) -> IotaResult<Vec<Object>> {
180        let objects = cache.try_multi_get_objects(object_ids)?;
181        let mut result = Vec::with_capacity(objects.len());
182        for (i, object) in objects.into_iter().enumerate() {
183            if let Some(object) = object {
184                result.push(object);
185            } else {
186                return Err(IotaError::UserInput {
187                    error: UserInputError::ObjectNotFound {
188                        object_id: object_ids[i],
189                        version: None,
190                    },
191                });
192            }
193        }
194        Ok(result)
195    }
196
197    #[instrument(level = "debug", skip_all)]
198    pub(crate) fn acquire_transaction_locks(
199        &self,
200        cache: &WritebackCache,
201        epoch_store: &AuthorityPerEpochStore,
202        owned_input_objects: &[ObjectRef],
203        transaction: VerifiedSignedTransaction,
204    ) -> IotaResult {
205        let tx_digest = *transaction.digest();
206
207        let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
208        let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;
209
210        // Only live objects can be locked
211        for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
212            Self::verify_live_object(obj_ref, live_object)?;
213        }
214
215        let mut locks_to_write: Vec<(_, LockDetails)> =
216            Vec::with_capacity(owned_input_objects.len());
217
218        // Sort the objects before locking. This is not required by the protocol (since
219        // it's okay to reject any equivocating tx). However, this does prevent
220        // a confusing error on the client. Consider the case:
221        //   TX1: [o1, o2];
222        //   TX2: [o2, o1];
223        // If two threads race to acquire these locks, they might both acquire the first
224        // object, then error when trying to acquire the second. The error
225        // returned to the client would say that there is a conflicting tx on
226        // that object, but in fact neither object was locked and the tx was never
227        // signed. If one client then retries, they will succeed (counterintuitively).
228        let owned_input_objects = {
229            let mut o = owned_input_objects.to_vec();
230            o.sort_by_key(|o| o.0);
231            o
232        };
233
234        // Note that this function does not have to operate atomically. If there are two
235        // racing threads, then they are either trying to lock the same
236        // transaction (in which case both will succeed), or they are trying to
237        // lock the same object in two different transactions, in which case the
238        // sender has equivocated, and we are under no obligation to help them form a
239        // cert.
240        for obj_ref in owned_input_objects.iter() {
241            match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
242                Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
243                Err(e) => {
244                    // revert all pending writes and return error
245                    // Note that reverting is not required for liveness, since a well formed and
246                    // un-equivocating txn cannot fail to acquire locks.
247                    // However, reverting is easy enough to do in this implementation that we do it
248                    // anyway.
249                    self.clear_cached_locks(&locks_to_write);
250                    return Err(e);
251                }
252            }
253        }
254
255        // commit all writes to DB
256        epoch_store
257            .tables()?
258            .write_transaction_locks(transaction, locks_to_write.iter().cloned())?;
259
260        // remove pending locks from unbounded storage
261        self.clear_cached_locks(&locks_to_write);
262
263        Ok(())
264    }
265}
266
#[cfg(test)]
mod tests {
    use crate::execution_cache::{
        ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
    };

    /// Once one transaction holds a lock, any other transaction needing that object is
    /// rejected, and a rejected multi-object request leaves its other objects lockable.
    #[tokio::test]
    async fn test_transaction_locks_are_exclusive() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2, 3]);
            s.do_tx().await;

            s.with_mutated(&[1, 2, 3]);
            s.do_tx().await;

            let (obj1, obj2, obj3) = (s.obj_ref(1), s.obj_ref(2), s.obj_ref(3));

            // Build and sign a transaction that is never executed; it will hold the locks.
            s.with_mutated(&[1, 2, 3]);
            let holder_outputs = s.take_outputs();
            let holder = s.make_signed_transaction(&holder_outputs.transaction);

            s.cache
                .try_acquire_transaction_locks(&s.epoch_store, &[obj1, obj2], holder)
                .expect("locks should be available");

            // The contents of this transaction don't matter; it only has to be a distinct
            // signed transaction competing for the same locks.
            s.with_created(&[4, 5]);
            let contender_outputs = s.take_outputs();
            let contender = s.make_signed_transaction(&contender_outputs.transaction);

            // Both objects belong to `holder`.
            s.cache
                .try_acquire_transaction_locks(&s.epoch_store, &[obj1, obj2], contender.clone())
                .unwrap_err();

            // obj3 is free but obj2 is not, so the request as a whole is rejected.
            s.cache
                .try_acquire_transaction_locks(&s.epoch_store, &[obj3, obj2], contender.clone())
                .unwrap_err();

            // The rejected request above must not have left obj3 locked.
            s.cache
                .try_acquire_transaction_locks(&s.epoch_store, &[obj3], contender)
                .expect("new3 should be unlocked");
        })
        .await;
    }

    /// A request that names a stale object version is rejected, and the same transaction
    /// can then lock the live versions.
    #[tokio::test]
    async fn test_transaction_locks_are_durable() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;
            let stale2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;
            let (live1, live2) = (s.obj_ref(1), s.obj_ref(2));

            // Build and sign a transaction that is never executed.
            s.with_mutated(&[1, 2]);
            let pending = s.take_outputs();
            let tx = s.make_signed_transaction(&pending.transaction);

            // stale2 is no longer the live version of object 2, so this is rejected by the
            // live-object check before any lock is taken.
            s.cache
                .try_acquire_transaction_locks(&s.epoch_store, &[live1, stale2], tx.clone())
                .unwrap_err();

            // The rejected attempt left nothing locked, so the live refs can be locked.
            s.cache
                .try_acquire_transaction_locks(&s.epoch_store, &[live1, live2], tx)
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    /// After a request is rejected for naming a stale version, a different transaction can
    /// still lock the live objects.
    #[tokio::test]
    async fn test_acquire_transaction_locks_revert() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;
            let stale2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;
            let (live1, live2) = (s.obj_ref(1), s.obj_ref(2));

            // Build and sign a transaction that is never executed.
            s.with_mutated(&[1, 2]);
            let pending = s.take_outputs();
            let rejected = s.make_signed_transaction(&pending.transaction);

            // stale2 is no longer the live version of object 2.
            s.cache
                .try_acquire_transaction_locks(&s.epoch_store, &[live1, stale2], rejected)
                .unwrap_err();

            // The contents of this transaction don't matter; it only has to be a distinct
            // signed transaction.
            s.with_created(&[4, 5]);
            let other_outputs = s.take_outputs();
            let other = s.make_signed_transaction(&other_outputs.transaction);

            // Nothing was left locked by the rejected attempt.
            s.cache
                .try_acquire_transaction_locks(&s.epoch_store, &[live1, live2], other)
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }

    /// `try_acquire_transaction_locks` is called without `.await` below, which only compiles
    /// while the method is synchronous.
    #[tokio::test]
    async fn test_acquire_transaction_locks_is_sync() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let input_refs = [s.object(1), s.object(2)].map(|o| o.compute_object_reference());

            s.with_mutated(&[1, 2]);
            let pending = s.take_outputs();
            let tx = s.make_signed_transaction(&pending.transaction);

            s.cache
                .try_acquire_transaction_locks(&s.epoch_store, &input_refs, tx)
                .unwrap();
        })
        .await;
    }
}