iota_core/
transaction_input_loader.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, sync::Arc};
6
7use iota_common::fatal;
8use iota_types::{
9    base_types::{EpochId, ObjectRef, TransactionDigest},
10    error::{IotaError, IotaResult, UserInputError},
11    storage::{GetSharedLocks, ObjectKey},
12    transaction::{
13        InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
14        ReceivingObjectReadResult, ReceivingObjectReadResultKind, ReceivingObjects, TransactionKey,
15    },
16};
17use itertools::izip;
18use once_cell::unsync::OnceCell;
19use tracing::instrument;
20
21use crate::{
22    authority::authority_per_epoch_store::CertLockGuard, execution_cache::ObjectCacheRead,
23};
24
25pub(crate) struct TransactionInputLoader {
26    cache: Arc<dyn ObjectCacheRead>,
27}
28
29impl TransactionInputLoader {
30    pub fn new(cache: Arc<dyn ObjectCacheRead>) -> Self {
31        Self { cache }
32    }
33}
34
35impl TransactionInputLoader {
36    /// Read the inputs for a transaction that the validator was asked to sign.
37    ///
38    /// tx_digest is provided so that the inputs can be cached with the
39    /// tx_digest and returned with a single hash map lookup when
40    /// notify_read_objects_for_execution is called later. TODO: implement
41    /// this caching
42    #[instrument(level = "trace", skip_all)]
43    pub fn read_objects_for_signing(
44        &self,
45        _tx_digest_for_caching: Option<&TransactionDigest>,
46        input_object_kinds: &[InputObjectKind],
47        receiving_objects: &[ObjectRef],
48        epoch_id: EpochId,
49    ) -> IotaResult<(InputObjects, ReceivingObjects)> {
50        // Length of input_object_kinds have been checked via validity_check() for
51        // ProgrammableTransaction.
52        let mut input_results = vec![None; input_object_kinds.len()];
53        let mut object_refs = Vec::with_capacity(input_object_kinds.len());
54        let mut fetch_indices = Vec::with_capacity(input_object_kinds.len());
55
56        for (i, kind) in input_object_kinds.iter().enumerate() {
57            match kind {
58                // Packages are loaded one at a time via the cache
59                InputObjectKind::MovePackage(id) => {
60                    let Some(package) = self.cache.get_package_object(id)?.map(|o| o.into()) else {
61                        return Err(IotaError::from(kind.object_not_found_error()));
62                    };
63                    input_results[i] = Some(ObjectReadResult {
64                        input_object_kind: *kind,
65                        object: ObjectReadResultKind::Object(package),
66                    });
67                }
68                InputObjectKind::SharedMoveObject { id, .. } => match self.cache.get_object(id)? {
69                    Some(object) => {
70                        input_results[i] = Some(ObjectReadResult::new(*kind, object.into()))
71                    }
72                    None => {
73                        if let Some((version, digest)) = self
74                            .cache
75                            .get_last_shared_object_deletion_info(id, epoch_id)?
76                        {
77                            input_results[i] = Some(ObjectReadResult {
78                                input_object_kind: *kind,
79                                object: ObjectReadResultKind::DeletedSharedObject(version, digest),
80                            });
81                        } else {
82                            return Err(IotaError::from(kind.object_not_found_error()));
83                        }
84                    }
85                },
86                InputObjectKind::ImmOrOwnedMoveObject(objref) => {
87                    object_refs.push(*objref);
88                    fetch_indices.push(i);
89                }
90            }
91        }
92
93        let objects = self
94            .cache
95            .multi_get_objects_with_more_accurate_error_return(&object_refs)?;
96        assert_eq!(objects.len(), object_refs.len());
97        for (index, object) in fetch_indices.into_iter().zip(objects.into_iter()) {
98            input_results[index] = Some(ObjectReadResult {
99                input_object_kind: input_object_kinds[index],
100                object: ObjectReadResultKind::Object(object),
101            });
102        }
103
104        let receiving_results =
105            self.read_receiving_objects_for_signing(receiving_objects, epoch_id)?;
106
107        Ok((
108            input_results
109                .into_iter()
110                .map(Option::unwrap)
111                .collect::<Vec<_>>()
112                .into(),
113            receiving_results,
114        ))
115    }
116
117    /// Read the inputs for a transaction that is ready to be executed.
118    ///
119    /// shared_lock_store is used to resolve the versions of any shared input
120    /// objects.
121    ///
122    /// This function panics if any inputs are not available, as
123    /// TransactionManager should already have verified that the transaction
124    /// is ready to be executed.
125    ///
126    /// The tx_digest is provided here to support the following optimization
127    /// (not yet implemented): All the owned input objects will likely have
128    /// been loaded during transaction signing, and can be stored as a group
129    /// with the transaction_digest as the key, allowing the lookup to
130    /// proceed with only a single hash map lookup. (additional lookups may be
131    /// necessary for shared inputs, since the versions are not known at
132    /// signing time). Receiving objects could be cached, but only with
133    /// appropriate invalidation logic for when an object is received by a
134    /// different tx first.
135    #[instrument(level = "trace", skip_all)]
136    pub fn read_objects_for_execution(
137        &self,
138        shared_lock_store: &impl GetSharedLocks,
139        tx_key: &TransactionKey,
140        // Important to hold the _tx_lock, otherwise it would be possible for a concurrent
141        // execution of the same tx to enter this point after the first execution has
142        // finished and the shared locks have been deleted.
143        _tx_lock: &CertLockGuard,
144        input_object_kinds: &[InputObjectKind],
145        epoch_id: EpochId,
146    ) -> IotaResult<InputObjects> {
147        let shared_locks_cell: OnceCell<Option<HashMap<_, _>>> = OnceCell::new();
148
149        let mut results = vec![None; input_object_kinds.len()];
150        let mut object_keys = Vec::with_capacity(input_object_kinds.len());
151        let mut fetches = Vec::with_capacity(input_object_kinds.len());
152
153        for (i, input) in input_object_kinds.iter().enumerate() {
154            match input {
155                InputObjectKind::MovePackage(id) => {
156                    let package = self.cache.get_package_object(id)?.unwrap_or_else(|| {
157                        panic!("Executable transaction {tx_key:?} depends on non-existent package {id:?}")
158                    });
159
160                    results[i] = Some(ObjectReadResult {
161                        input_object_kind: *input,
162                        object: ObjectReadResultKind::Object(package.into()),
163                    });
164                    continue;
165                }
166                InputObjectKind::ImmOrOwnedMoveObject(objref) => {
167                    object_keys.push(objref.into());
168                    fetches.push((i, input));
169                }
170                InputObjectKind::SharedMoveObject { id, .. } => {
171                    let shared_locks = shared_locks_cell
172                        .get_or_init(|| {
173                            shared_lock_store
174                                .get_shared_locks(tx_key)
175                                .expect("loading shared locks should not fail")
176                                .map(|locks| locks.into_iter().collect())
177                        })
178                        .as_ref()
179                        .unwrap_or_else(|| {
180                            // _tx_lock is held, so this should not happen
181                            fatal!("Failed to get shared locks for transaction {tx_key:?}");
182                        });
183                    // If we find a set of locks but an object is missing, it indicates a serious
184                    // inconsistency:
185                    let version = shared_locks.get(id).unwrap_or_else(|| {
186                        panic!("Shared object locks should have been set. key: {tx_key:?}, obj id: {id:?}")
187                    });
188                    if version.is_cancelled() {
189                        // Do not need to fetch shared object for cancelled transaction.
190                        results[i] = Some(ObjectReadResult {
191                            input_object_kind: *input,
192                            object: ObjectReadResultKind::CancelledTransactionSharedObject(
193                                *version,
194                            ),
195                        })
196                    } else {
197                        object_keys.push(ObjectKey(*id, *version));
198                        fetches.push((i, input));
199                    }
200                }
201            }
202        }
203
204        let objects = self.cache.multi_get_objects_by_key(&object_keys)?;
205
206        assert!(objects.len() == object_keys.len() && objects.len() == fetches.len());
207
208        for (object, key, (index, input)) in izip!(
209            objects.into_iter(),
210            object_keys.into_iter(),
211            fetches.into_iter()
212        ) {
213            results[index] = Some(match (object, input) {
214                (Some(obj), input_object_kind) => ObjectReadResult {
215                    input_object_kind: *input_object_kind,
216                    object: obj.into(),
217                },
218                (None, InputObjectKind::SharedMoveObject { id, .. }) => {
219                    assert!(key.1.is_valid());
220                    // Check if the object was deleted by a concurrently certified tx
221                    let version = key.1;
222                    if let Some(dependency) = self
223                        .cache
224                        .get_deleted_shared_object_previous_tx_digest(id, version, epoch_id)?
225                    {
226                        ObjectReadResult {
227                            input_object_kind: *input,
228                            object: ObjectReadResultKind::DeletedSharedObject(version, dependency),
229                        }
230                    } else {
231                        panic!(
232                            "All dependencies of tx {tx_key:?} should have been executed now, but Shared Object id: {}, version: {version} is absent in epoch {epoch_id}",
233                            *id
234                        );
235                    }
236                }
237                _ => panic!(
238                    "All dependencies of tx {tx_key:?} should have been executed now, but obj {key:?} is absent"
239                ),
240            });
241        }
242
243        Ok(results
244            .into_iter()
245            .map(Option::unwrap)
246            .collect::<Vec<_>>()
247            .into())
248    }
249}
250
251// private methods
252impl TransactionInputLoader {
253    fn read_receiving_objects_for_signing(
254        &self,
255        receiving_objects: &[ObjectRef],
256        epoch_id: EpochId,
257    ) -> IotaResult<ReceivingObjects> {
258        let mut receiving_results = Vec::with_capacity(receiving_objects.len());
259        for objref in receiving_objects {
260            // Note: the digest is checked later in check_transaction_input
261            let (object_id, version, _) = objref;
262
263            if self
264                .cache
265                .have_received_object_at_version(object_id, *version, epoch_id)?
266            {
267                receiving_results.push(ReceivingObjectReadResult::new(
268                    *objref,
269                    ReceivingObjectReadResultKind::PreviouslyReceivedObject,
270                ));
271                continue;
272            }
273
274            let Some(object) = self.cache.get_object(object_id)? else {
275                return Err(UserInputError::ObjectNotFound {
276                    object_id: *object_id,
277                    version: Some(*version),
278                }
279                .into());
280            };
281
282            receiving_results.push(ReceivingObjectReadResult::new(*objref, object.into()));
283        }
284        Ok(receiving_results.into())
285    }
286}