iota_core/execution_cache/object_locks.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use dashmap::{DashMap, mapref::entry::Entry as DashMapEntry};
6use iota_common::debug_fatal;
7use iota_types::{
8    base_types::{ObjectID, ObjectRef},
9    error::{IotaError, IotaResult, UserInputError},
10    object::Object,
11    storage::ObjectStore,
12    transaction::VerifiedSignedTransaction,
13};
14use tracing::{debug, info, instrument, trace};
15
16use super::writeback_cache::WritebackCache;
17use crate::authority::authority_per_epoch_store::{AuthorityPerEpochStore, LockDetails};
18
19type RefCount = usize;
20
/// In-memory table of transient, refcounted signing locks on owned object
/// refs, layered over the durable per-epoch lock tables.
pub(super) struct ObjectLocks {
    // When acquiring transaction locks, lock entries are briefly inserted into this map. The map
    // exists to provide atomic test-and-set operations on the locks. After all locks have been
    // inserted into the map, they are written to the db, and then all locks are removed from
    // the map.
    //
    // After a transaction has been executed, newly created objects are available to be locked.
    // But, because of crash recovery, we cannot rule out that a lock may already exist in the db
    // for those objects. Therefore we do a db read for each object we are locking.
    //
    // TODO: find a strategy to allow us to avoid db reads for each object.
    //
    // Map value is (refcount, lock): the refcount tracks how many in-flight
    // lock acquisitions currently reference the entry, so the entry can be
    // removed once the last one releases it.
    locked_transactions: DashMap<ObjectRef, (RefCount, LockDetails)>,
}
34
35impl ObjectLocks {
36    pub fn new() -> Self {
37        Self {
38            locked_transactions: DashMap::new(),
39        }
40    }
41
    /// Returns the transaction (if any) that durably holds the signing lock on
    /// `obj_ref`, reading only from the epoch db tables.
    ///
    /// # Errors
    /// Fails if the epoch tables are no longer accessible (e.g. the epoch has
    /// ended).
    pub(crate) fn get_transaction_lock(
        &self,
        obj_ref: &ObjectRef,
        epoch_store: &AuthorityPerEpochStore,
    ) -> IotaResult<Option<LockDetails>> {
        // We don't consult the in-memory state here. We are only interested in state
        // that has been committed to the db. This is because in memory state is
        // reverted if the transaction is not successfully locked.
        epoch_store.tables()?.get_locked_transaction(obj_ref)
    }
52
53    /// Attempts to atomically test-and-set a transaction lock on an object.
54    /// If the lock is already set to a conflicting transaction, an error is
55    /// returned. If the lock is not set, or is already set to the same
56    /// transaction, the lock is set.
57    pub(crate) fn try_set_transaction_lock(
58        &self,
59        obj_ref: &ObjectRef,
60        new_lock: LockDetails,
61        epoch_store: &AuthorityPerEpochStore,
62    ) -> IotaResult {
63        // entry holds a lock on the dashmap shard, so this function operates atomically
64        let entry = self.locked_transactions.entry(*obj_ref);
65
66        // TODO: currently, the common case for this code is that we will miss the cache
67        // and read from the db. It is difficult to implement negative caching, since we
68        // may have restarted, in which case there could be locks in the db that we do
69        // not have in the cache. We may want to explore strategies for proving there
70        // cannot be a lock in the db that we do not know about. Two possibilities are:
71        //
72        // 1. Read all locks into memory at startup (and keep them there). The lifetime
73        //    of locks is relatively short in the common case, so this might be
74        //    feasible.
75        // 2. Find some strategy to distinguish between the cases where we are
76        //    re-executing old transactions after restarting vs executing transactions
77        //    that we have never seen before. The output objects of novel transactions
78        //    cannot previously have been locked on this validator.
79        //
80        // Solving this is not terribly important as it is not in the execution path,
81        // and hence only improves the latency of transaction signing, not
82        // transaction execution
83        let prev_lock = match entry {
84            DashMapEntry::Vacant(vacant) => {
85                let tables = epoch_store.tables()?;
86                if let Some(lock_details) = tables.get_locked_transaction(obj_ref)? {
87                    trace!("read lock from db: {:?}", lock_details);
88                    vacant.insert((1, lock_details));
89                    lock_details
90                } else {
91                    trace!("set lock: {:?}", new_lock);
92                    vacant.insert((1, new_lock));
93                    new_lock
94                }
95            }
96            DashMapEntry::Occupied(mut occupied) => {
97                occupied.get_mut().0 += 1;
98                occupied.get().1
99            }
100        };
101
102        if prev_lock != new_lock {
103            debug!("lock conflict detected: {:?} != {:?}", prev_lock, new_lock);
104            Err(IotaError::ObjectLockConflict {
105                obj_ref: *obj_ref,
106                pending_transaction: prev_lock,
107            })
108        } else {
109            Ok(())
110        }
111    }
112
    /// Drops all cached lock entries. Intended for reconfiguration, when
    /// locks from the previous epoch are no longer relevant.
    pub(crate) fn clear(&self) {
        info!("clearing old transaction locks");
        self.locked_transactions.clear();
    }
117
118    fn verify_live_object(obj_ref: &ObjectRef, live_object: &Object) -> IotaResult {
119        debug_assert_eq!(obj_ref.0, live_object.id());
120        if obj_ref.1 != live_object.version() {
121            debug!(
122                "object version unavailable for consumption: {:?} (current: {})",
123                obj_ref,
124                live_object.version()
125            );
126            return Err(IotaError::UserInput {
127                error: UserInputError::ObjectVersionUnavailableForConsumption {
128                    provided_obj_ref: *obj_ref,
129                    current_version: live_object.version(),
130                },
131            });
132        }
133
134        let live_digest = live_object.digest();
135        if obj_ref.2 != live_digest {
136            debug!("object digest mismatch: {:?} vs {:?}", obj_ref, live_digest);
137            return Err(IotaError::UserInput {
138                error: UserInputError::InvalidObjectDigest {
139                    object_id: obj_ref.0,
140                    expected_digest: live_digest,
141                },
142            });
143        }
144
145        Ok(())
146    }
147
148    fn clear_cached_locks(&self, locks: &[(ObjectRef, LockDetails)]) {
149        for (obj_ref, lock) in locks {
150            let entry = self.locked_transactions.entry(*obj_ref);
151            let mut occupied = match entry {
152                DashMapEntry::Vacant(_) => {
153                    debug_fatal!("lock must exist for object: {:?}", obj_ref);
154                    continue;
155                }
156                DashMapEntry::Occupied(occupied) => occupied,
157            };
158
159            if occupied.get().1 == *lock {
160                occupied.get_mut().0 -= 1;
161                if occupied.get().0 == 0 {
162                    trace!("clearing lock: {:?}", lock);
163                    occupied.remove();
164                }
165            } else {
166                // this is impossible because the only case in which we overwrite a
167                // lock is when the lock is from a previous epoch. but we are holding
168                // execution_lock, so the epoch cannot have changed.
169                panic!("lock was changed since we set it");
170            }
171        }
172    }
173
174    fn multi_get_objects_must_exist(
175        cache: &WritebackCache,
176        object_ids: &[ObjectID],
177    ) -> IotaResult<Vec<Object>> {
178        let objects = cache.multi_get_objects(object_ids)?;
179        let mut result = Vec::with_capacity(objects.len());
180        for (i, object) in objects.into_iter().enumerate() {
181            if let Some(object) = object {
182                result.push(object);
183            } else {
184                return Err(IotaError::UserInput {
185                    error: UserInputError::ObjectNotFound {
186                        object_id: object_ids[i],
187                        version: None,
188                    },
189                });
190            }
191        }
192        Ok(result)
193    }
194
    /// Attempts to lock all `owned_input_objects` to `transaction` for
    /// signing, committing the locks to the epoch db on success.
    ///
    /// Fails if any object is not live at the exact (version, digest) given,
    /// or is already locked by a different transaction. On failure, in-memory
    /// locks taken by this call are reverted and nothing is written to the db.
    #[instrument(level = "debug", skip_all)]
    pub(crate) async fn acquire_transaction_locks(
        &self,
        cache: &WritebackCache,
        epoch_store: &AuthorityPerEpochStore,
        owned_input_objects: &[ObjectRef],
        transaction: VerifiedSignedTransaction,
    ) -> IotaResult {
        let tx_digest = *transaction.digest();

        let object_ids = owned_input_objects.iter().map(|o| o.0).collect::<Vec<_>>();
        let live_objects = Self::multi_get_objects_must_exist(cache, &object_ids)?;

        // Only live objects can be locked
        for (obj_ref, live_object) in owned_input_objects.iter().zip(live_objects.iter()) {
            Self::verify_live_object(obj_ref, live_object)?;
        }

        let mut locks_to_write: Vec<(_, LockDetails)> =
            Vec::with_capacity(owned_input_objects.len());

        // Sort the objects before locking. This is not required by the protocol (since
        // it's okay to reject any equivocating tx). However, this does prevent
        // a confusing error on the client. Consider the case:
        //   TX1: [o1, o2];
        //   TX2: [o2, o1];
        // If two threads race to acquire these locks, they might both acquire the first
        // object, then error when trying to acquire the second. The error
        // returned to the client would say that there is a conflicting tx on
        // that object, but in fact neither object was locked and the tx was never
        // signed. If one client then retries, they will succeed (counterintuitively).
        let owned_input_objects = {
            let mut o = owned_input_objects.to_vec();
            o.sort_by_key(|o| o.0);
            o
        };

        // Note that this function does not have to operate atomically. If there are two
        // racing threads, then they are either trying to lock the same
        // transaction (in which case both will succeed), or they are trying to
        // lock the same object in two different transactions, in which case the
        // sender has equivocated, and we are under no obligation to help them form a
        // cert.
        for obj_ref in owned_input_objects.iter() {
            match self.try_set_transaction_lock(obj_ref, tx_digest, epoch_store) {
                Ok(()) => locks_to_write.push((*obj_ref, tx_digest)),
                Err(e) => {
                    // revert all pending writes and return error
                    // Note that reverting is not required for liveness, since a well formed and
                    // un-equivocating txn cannot fail to acquire locks.
                    // However, reverting is easy enough to do in this implementation that we do it
                    // anyway.
                    self.clear_cached_locks(&locks_to_write);
                    return Err(e);
                }
            }
        }

        // commit all writes to DB
        epoch_store
            .tables()?
            .write_transaction_locks(transaction, locks_to_write.iter().cloned())?;

        // remove pending locks from unbounded storage
        self.clear_cached_locks(&locks_to_write);

        Ok(())
    }
263}
264
265#[cfg(test)]
266mod tests {
267    use futures::FutureExt;
268
269    use crate::execution_cache::{
270        ExecutionCacheWrite, writeback_cache::writeback_cache_tests::Scenario,
271    };
272
    /// A lock held by one transaction must block any other transaction from
    /// acquiring a lock on the same object ref.
    #[tokio::test]
    async fn test_transaction_locks_are_exclusive() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            // Create three objects, then mutate them so fresh versions exist.
            s.with_created(&[1, 2, 3]);
            s.do_tx().await;

            s.with_mutated(&[1, 2, 3]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);
            let new3 = s.obj_ref(3);

            s.with_mutated(&[1, 2, 3]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();

            let tx1 = s.make_signed_transaction(&outputs.transaction);

            // tx1 takes locks on new1 and new2.
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx1)
                .await
                .expect("locks should be available");

            // this tx doesn't use the actual objects in question, but we just need
            // something to insert into the table.
            s.with_created(&[4, 5]);
            let tx2 = s.take_outputs().transaction.clone();
            let tx2 = s.make_signed_transaction(&tx2);

            // both locks are held by tx1, so this should fail
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2.clone())
                .await
                .unwrap_err();

            // new3 is lockable, but new2 is not, so this should fail
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new3, new2], tx2.clone())
                .await
                .unwrap_err();

            // new3 is unlocked
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new3], tx2)
                .await
                .expect("new3 should be unlocked");
        })
        .await;
    }
323
    /// A failed acquisition (here: referring to a stale object version) must
    /// release any locks it took, so a later valid attempt by the same
    /// transaction succeeds.
    #[tokio::test]
    async fn test_transaction_locks_are_durable() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            // Keep a ref to the soon-to-be-stale version of object 2.
            let old2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();

            let tx = s.make_signed_transaction(&outputs.transaction);

            // fails because we are referring to an old object
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx.clone())
                .await
                .unwrap_err();

            // succeeds because the above call releases the lock on new1 after failing
            // to get the lock on old2
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx)
                .await
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }
359
    /// Like the durability test, but the retry comes from a *different*
    /// transaction: the revert must leave the objects lockable by anyone.
    #[tokio::test]
    async fn test_acquire_transaction_locks_revert() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            // Keep a ref to the soon-to-be-stale version of object 2.
            let old2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]);
            s.do_tx().await;

            let new1 = s.obj_ref(1);
            let new2 = s.obj_ref(2);

            s.with_mutated(&[1, 2]); // begin forming a tx but never execute it
            let outputs = s.take_outputs();

            let tx = s.make_signed_transaction(&outputs.transaction);

            // fails because we are referring to an old object
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, old2], tx)
                .await
                .unwrap_err();

            // this tx doesn't use the actual objects in question, but we just need
            // something to insert into the table.
            s.with_created(&[4, 5]);
            let tx2 = s.take_outputs().transaction.clone();
            let tx2 = s.make_signed_transaction(&tx2);

            // succeeds because the above call releases the lock on new1 after failing
            // to get the lock on old2
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &[new1, new2], tx2)
                .await
                .expect("new1 should be unlocked after revert");
        })
        .await;
    }
401
    /// Verifies that the lock-acquisition future completes without ever
    /// yielding in a non-simtest build (checked via `now_or_never`).
    #[tokio::test]
    async fn test_acquire_transaction_locks_is_sync() {
        telemetry_subscribers::init_for_testing();
        Scenario::iterate(|mut s| async move {
            s.with_created(&[1, 2]);
            s.do_tx().await;

            let objects: Vec<_> = vec![s.object(1), s.object(2)]
                .into_iter()
                .map(|o| o.compute_object_reference())
                .collect();

            s.with_mutated(&[1, 2]);
            let outputs = s.take_outputs();

            let tx2 = s.make_signed_transaction(&outputs.transaction);
            // assert that acquire_transaction_locks is sync in non-simtest, which causes
            // the fail_point_async! macros above to be elided
            s.cache
                .acquire_transaction_locks(&s.epoch_store, &objects, tx2)
                .now_or_never()
                .unwrap()
                .unwrap();
        })
        .await;
    }
428}