iota_core/
transaction_input_loader.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, sync::Arc};
6
7use iota_common::fatal;
8use iota_types::{
9    base_types::{EpochId, ObjectRef, TransactionDigest},
10    error::{IotaError, IotaResult, UserInputError},
11    storage::ObjectKey,
12    transaction::{
13        InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
14        ReceivingObjectReadResult, ReceivingObjectReadResultKind, ReceivingObjects, TransactionKey,
15    },
16};
17use itertools::izip;
18use once_cell::unsync::OnceCell;
19use tracing::instrument;
20
21use crate::{
22    authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertLockGuard},
23    execution_cache::ObjectCacheRead,
24};
25
26pub(crate) struct TransactionInputLoader {
27    cache: Arc<dyn ObjectCacheRead>,
28}
29
30impl TransactionInputLoader {
31    pub fn new(cache: Arc<dyn ObjectCacheRead>) -> Self {
32        Self { cache }
33    }
34}
35
36impl TransactionInputLoader {
37    /// Read the inputs for a transaction that the validator was asked to sign.
38    ///
39    /// tx_digest is provided so that the inputs can be cached with the
40    /// tx_digest and returned with a single hash map lookup when
41    /// notify_read_objects_for_execution is called later. TODO: implement
42    /// this caching
43    #[instrument(level = "trace", skip_all)]
44    pub fn read_objects_for_signing(
45        &self,
46        _tx_digest_for_caching: Option<&TransactionDigest>,
47        input_object_kinds: &[InputObjectKind],
48        receiving_objects: &[ObjectRef],
49        epoch_id: EpochId,
50    ) -> IotaResult<(InputObjects, ReceivingObjects)> {
51        // Length of input_object_kinds have been checked via validity_check() for
52        // ProgrammableTransaction.
53        let mut input_results = vec![None; input_object_kinds.len()];
54        let mut object_refs = Vec::with_capacity(input_object_kinds.len());
55        let mut fetch_indices = Vec::with_capacity(input_object_kinds.len());
56
57        for (i, kind) in input_object_kinds.iter().enumerate() {
58            match kind {
59                // Packages are loaded one at a time via the cache
60                InputObjectKind::MovePackage(id) => {
61                    let Some(package) = self.cache.get_package_object(id)?.map(|o| o.into()) else {
62                        return Err(IotaError::from(kind.object_not_found_error()));
63                    };
64                    input_results[i] = Some(ObjectReadResult {
65                        input_object_kind: *kind,
66                        object: ObjectReadResultKind::Object(package),
67                    });
68                }
69                InputObjectKind::SharedMoveObject { id, .. } => match self.cache.get_object(id)? {
70                    Some(object) => {
71                        input_results[i] = Some(ObjectReadResult::new(*kind, object.into()))
72                    }
73                    None => {
74                        if let Some((version, digest)) = self
75                            .cache
76                            .get_last_shared_object_deletion_info(id, epoch_id)?
77                        {
78                            input_results[i] = Some(ObjectReadResult {
79                                input_object_kind: *kind,
80                                object: ObjectReadResultKind::DeletedSharedObject(version, digest),
81                            });
82                        } else {
83                            return Err(IotaError::from(kind.object_not_found_error()));
84                        }
85                    }
86                },
87                InputObjectKind::ImmOrOwnedMoveObject(objref) => {
88                    object_refs.push(*objref);
89                    fetch_indices.push(i);
90                }
91            }
92        }
93
94        let objects = self
95            .cache
96            .multi_get_objects_with_more_accurate_error_return(&object_refs)?;
97        assert_eq!(objects.len(), object_refs.len());
98        for (index, object) in fetch_indices.into_iter().zip(objects.into_iter()) {
99            input_results[index] = Some(ObjectReadResult {
100                input_object_kind: input_object_kinds[index],
101                object: ObjectReadResultKind::Object(object),
102            });
103        }
104
105        let receiving_results =
106            self.read_receiving_objects_for_signing(receiving_objects, epoch_id)?;
107
108        Ok((
109            input_results
110                .into_iter()
111                .map(Option::unwrap)
112                .collect::<Vec<_>>()
113                .into(),
114            receiving_results,
115        ))
116    }
117
118    /// Read the inputs for a transaction that is ready to be executed.
119    ///
120    /// epoch_store is used to resolve the versions of any shared input objects.
121    ///
122    /// This function panics if any inputs are not available, as
123    /// TransactionManager should already have verified that the transaction
124    /// is ready to be executed.
125    ///
126    /// The tx_digest is provided here to support the following optimization
127    /// (not yet implemented): All the owned input objects will likely have
128    /// been loaded during transaction signing, and can be stored as a group
129    /// with the transaction_digest as the key, allowing the lookup to
130    /// proceed with only a single hash map lookup. (additional lookups may be
131    /// necessary for shared inputs, since the versions are not known at
132    /// signing time). Receiving objects could be cached, but only with
133    /// appropriate invalidation logic for when an object is received by a
134    /// different tx first.
135    #[instrument(level = "trace", skip_all)]
136    pub fn read_objects_for_execution(
137        &self,
138        epoch_store: &Arc<AuthorityPerEpochStore>,
139        tx_key: &TransactionKey,
140        // Important to hold the _tx_lock, otherwise it would be possible for a concurrent
141        // execution of the same tx to enter this point after the first execution has
142        // finished and the shared locks have been deleted.
143        _tx_lock: &CertLockGuard,
144        input_object_kinds: &[InputObjectKind],
145        epoch_id: EpochId,
146    ) -> IotaResult<InputObjects> {
147        let assigned_shared_versions_cell: OnceCell<Option<HashMap<_, _>>> = OnceCell::new();
148
149        let mut results = vec![None; input_object_kinds.len()];
150        let mut object_keys = Vec::with_capacity(input_object_kinds.len());
151        let mut fetches = Vec::with_capacity(input_object_kinds.len());
152
153        for (i, input) in input_object_kinds.iter().enumerate() {
154            match input {
155                InputObjectKind::MovePackage(id) => {
156                    let package = self.cache.get_package_object(id)?.unwrap_or_else(|| {
157                        panic!("Executable transaction {tx_key:?} depends on non-existent package {id:?}")
158                    });
159
160                    results[i] = Some(ObjectReadResult {
161                        input_object_kind: *input,
162                        object: ObjectReadResultKind::Object(package.into()),
163                    });
164                    continue;
165                }
166                InputObjectKind::ImmOrOwnedMoveObject(objref) => {
167                    object_keys.push(objref.into());
168                    fetches.push((i, input));
169                }
170                InputObjectKind::SharedMoveObject { id, .. } => {
171                    let assigned_shared_versions = assigned_shared_versions_cell
172                        .get_or_init(|| {
173                            epoch_store
174                                .get_assigned_shared_object_versions(tx_key)
175                                .expect("loading assigned shared versions should not fail")
176                                .map(|versions| versions.into_iter().collect())
177                        })
178                        .as_ref()
179                        .unwrap_or_else(|| {
180                            // Important to hold the _tx_lock here - otherwise it would be possible
181                            // for a concurrent execution of the same tx to enter this point after
182                            // the first execution has finished and the assigned shared versions
183                            // have been deleted.
184                            fatal!(
185                                "Failed to get assigned shared versions for transaction {tx_key:?}"
186                            );
187                        });
188                    // If we find a set of assigned versions but an object is missing, it indicates
189                    // a serious inconsistency:
190                    let version = assigned_shared_versions.get(id).unwrap_or_else(|| {
191                        panic!("Shared object version should have been assigned. key: {tx_key:?}, obj id: {id:?}")
192                    });
193                    if version.is_cancelled() {
194                        // Do not need to fetch shared object for cancelled transaction.
195                        results[i] = Some(ObjectReadResult {
196                            input_object_kind: *input,
197                            object: ObjectReadResultKind::CancelledTransactionSharedObject(
198                                *version,
199                            ),
200                        })
201                    } else {
202                        object_keys.push(ObjectKey(*id, *version));
203                        fetches.push((i, input));
204                    }
205                }
206            }
207        }
208
209        let objects = self.cache.multi_get_objects_by_key(&object_keys)?;
210
211        assert!(objects.len() == object_keys.len() && objects.len() == fetches.len());
212
213        for (object, key, (index, input)) in izip!(
214            objects.into_iter(),
215            object_keys.into_iter(),
216            fetches.into_iter()
217        ) {
218            results[index] = Some(match (object, input) {
219                (Some(obj), input_object_kind) => ObjectReadResult {
220                    input_object_kind: *input_object_kind,
221                    object: obj.into(),
222                },
223                (None, InputObjectKind::SharedMoveObject { id, .. }) => {
224                    assert!(key.1.is_valid());
225                    // Check if the object was deleted by a concurrently certified tx
226                    let version = key.1;
227                    if let Some(dependency) = self
228                        .cache
229                        .get_deleted_shared_object_previous_tx_digest(id, version, epoch_id)?
230                    {
231                        ObjectReadResult {
232                            input_object_kind: *input,
233                            object: ObjectReadResultKind::DeletedSharedObject(version, dependency),
234                        }
235                    } else {
236                        panic!(
237                            "All dependencies of tx {tx_key:?} should have been executed now, but Shared Object id: {}, version: {version} is absent in epoch {epoch_id}",
238                            *id
239                        );
240                    }
241                }
242                _ => panic!(
243                    "All dependencies of tx {tx_key:?} should have been executed now, but obj {key:?} is absent"
244                ),
245            });
246        }
247
248        Ok(results
249            .into_iter()
250            .map(Option::unwrap)
251            .collect::<Vec<_>>()
252            .into())
253    }
254}
255
256// private methods
257impl TransactionInputLoader {
258    fn read_receiving_objects_for_signing(
259        &self,
260        receiving_objects: &[ObjectRef],
261        epoch_id: EpochId,
262    ) -> IotaResult<ReceivingObjects> {
263        let mut receiving_results = Vec::with_capacity(receiving_objects.len());
264        for objref in receiving_objects {
265            // Note: the digest is checked later in check_transaction_input
266            let (object_id, version, _) = objref;
267
268            if self
269                .cache
270                .have_received_object_at_version(object_id, *version, epoch_id)?
271            {
272                receiving_results.push(ReceivingObjectReadResult::new(
273                    *objref,
274                    ReceivingObjectReadResultKind::PreviouslyReceivedObject,
275                ));
276                continue;
277            }
278
279            let Some(object) = self.cache.get_object(object_id)? else {
280                return Err(UserInputError::ObjectNotFound {
281                    object_id: *object_id,
282                    version: Some(*version),
283                }
284                .into());
285            };
286
287            receiving_results.push(ReceivingObjectReadResult::new(*objref, object.into()));
288        }
289        Ok(receiving_results.into())
290    }
291}