1use std::{collections::HashMap, sync::Arc};
6
7use iota_common::fatal;
8use iota_types::{
9 base_types::{EpochId, ObjectRef, TransactionDigest},
10 error::{IotaError, IotaResult, UserInputError},
11 storage::ObjectKey,
12 transaction::{
13 InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
14 ReceivingObjectReadResult, ReceivingObjectReadResultKind, ReceivingObjects, TransactionKey,
15 },
16};
17use itertools::izip;
18use once_cell::unsync::OnceCell;
19use tracing::instrument;
20
21use crate::{
22 authority::authority_per_epoch_store::{AuthorityPerEpochStore, CertLockGuard},
23 execution_cache::ObjectCacheRead,
24};
25
26pub(crate) struct TransactionInputLoader {
27 cache: Arc<dyn ObjectCacheRead>,
28}
29
30impl TransactionInputLoader {
31 pub fn new(cache: Arc<dyn ObjectCacheRead>) -> Self {
32 Self { cache }
33 }
34}
35
36impl TransactionInputLoader {
37 #[instrument(level = "trace", skip_all)]
44 pub fn read_objects_for_signing(
45 &self,
46 _tx_digest_for_caching: Option<&TransactionDigest>,
47 input_object_kinds: &[InputObjectKind],
48 receiving_objects: &[ObjectRef],
49 epoch_id: EpochId,
50 ) -> IotaResult<(InputObjects, ReceivingObjects)> {
51 let mut input_results = vec![None; input_object_kinds.len()];
54 let mut object_refs = Vec::with_capacity(input_object_kinds.len());
55 let mut fetch_indices = Vec::with_capacity(input_object_kinds.len());
56
57 for (i, kind) in input_object_kinds.iter().enumerate() {
58 match kind {
59 InputObjectKind::MovePackage(id) => {
61 let Some(package) = self.cache.get_package_object(id)?.map(|o| o.into()) else {
62 return Err(IotaError::from(kind.object_not_found_error()));
63 };
64 input_results[i] = Some(ObjectReadResult {
65 input_object_kind: *kind,
66 object: ObjectReadResultKind::Object(package),
67 });
68 }
69 InputObjectKind::SharedMoveObject { id, .. } => match self.cache.get_object(id)? {
70 Some(object) => {
71 input_results[i] = Some(ObjectReadResult::new(*kind, object.into()))
72 }
73 None => {
74 if let Some((version, digest)) = self
75 .cache
76 .get_last_shared_object_deletion_info(id, epoch_id)?
77 {
78 input_results[i] = Some(ObjectReadResult {
79 input_object_kind: *kind,
80 object: ObjectReadResultKind::DeletedSharedObject(version, digest),
81 });
82 } else {
83 return Err(IotaError::from(kind.object_not_found_error()));
84 }
85 }
86 },
87 InputObjectKind::ImmOrOwnedMoveObject(objref) => {
88 object_refs.push(*objref);
89 fetch_indices.push(i);
90 }
91 }
92 }
93
94 let objects = self
95 .cache
96 .multi_get_objects_with_more_accurate_error_return(&object_refs)?;
97 assert_eq!(objects.len(), object_refs.len());
98 for (index, object) in fetch_indices.into_iter().zip(objects.into_iter()) {
99 input_results[index] = Some(ObjectReadResult {
100 input_object_kind: input_object_kinds[index],
101 object: ObjectReadResultKind::Object(object),
102 });
103 }
104
105 let receiving_results =
106 self.read_receiving_objects_for_signing(receiving_objects, epoch_id)?;
107
108 Ok((
109 input_results
110 .into_iter()
111 .map(Option::unwrap)
112 .collect::<Vec<_>>()
113 .into(),
114 receiving_results,
115 ))
116 }
117
118 #[instrument(level = "trace", skip_all)]
136 pub fn read_objects_for_execution(
137 &self,
138 epoch_store: &Arc<AuthorityPerEpochStore>,
139 tx_key: &TransactionKey,
140 _tx_lock: &CertLockGuard,
144 input_object_kinds: &[InputObjectKind],
145 epoch_id: EpochId,
146 ) -> IotaResult<InputObjects> {
147 let assigned_shared_versions_cell: OnceCell<Option<HashMap<_, _>>> = OnceCell::new();
148
149 let mut results = vec![None; input_object_kinds.len()];
150 let mut object_keys = Vec::with_capacity(input_object_kinds.len());
151 let mut fetches = Vec::with_capacity(input_object_kinds.len());
152
153 for (i, input) in input_object_kinds.iter().enumerate() {
154 match input {
155 InputObjectKind::MovePackage(id) => {
156 let package = self.cache.get_package_object(id)?.unwrap_or_else(|| {
157 panic!("Executable transaction {tx_key:?} depends on non-existent package {id:?}")
158 });
159
160 results[i] = Some(ObjectReadResult {
161 input_object_kind: *input,
162 object: ObjectReadResultKind::Object(package.into()),
163 });
164 continue;
165 }
166 InputObjectKind::ImmOrOwnedMoveObject(objref) => {
167 object_keys.push(objref.into());
168 fetches.push((i, input));
169 }
170 InputObjectKind::SharedMoveObject { id, .. } => {
171 let assigned_shared_versions = assigned_shared_versions_cell
172 .get_or_init(|| {
173 epoch_store
174 .get_assigned_shared_object_versions(tx_key)
175 .expect("loading assigned shared versions should not fail")
176 .map(|versions| versions.into_iter().collect())
177 })
178 .as_ref()
179 .unwrap_or_else(|| {
180 fatal!(
185 "Failed to get assigned shared versions for transaction {tx_key:?}"
186 );
187 });
188 let version = assigned_shared_versions.get(id).unwrap_or_else(|| {
191 panic!("Shared object version should have been assigned. key: {tx_key:?}, obj id: {id:?}")
192 });
193 if version.is_cancelled() {
194 results[i] = Some(ObjectReadResult {
196 input_object_kind: *input,
197 object: ObjectReadResultKind::CancelledTransactionSharedObject(
198 *version,
199 ),
200 })
201 } else {
202 object_keys.push(ObjectKey(*id, *version));
203 fetches.push((i, input));
204 }
205 }
206 }
207 }
208
209 let objects = self.cache.multi_get_objects_by_key(&object_keys)?;
210
211 assert!(objects.len() == object_keys.len() && objects.len() == fetches.len());
212
213 for (object, key, (index, input)) in izip!(
214 objects.into_iter(),
215 object_keys.into_iter(),
216 fetches.into_iter()
217 ) {
218 results[index] = Some(match (object, input) {
219 (Some(obj), input_object_kind) => ObjectReadResult {
220 input_object_kind: *input_object_kind,
221 object: obj.into(),
222 },
223 (None, InputObjectKind::SharedMoveObject { id, .. }) => {
224 assert!(key.1.is_valid());
225 let version = key.1;
227 if let Some(dependency) = self
228 .cache
229 .get_deleted_shared_object_previous_tx_digest(id, version, epoch_id)?
230 {
231 ObjectReadResult {
232 input_object_kind: *input,
233 object: ObjectReadResultKind::DeletedSharedObject(version, dependency),
234 }
235 } else {
236 panic!(
237 "All dependencies of tx {tx_key:?} should have been executed now, but Shared Object id: {}, version: {version} is absent in epoch {epoch_id}",
238 *id
239 );
240 }
241 }
242 _ => panic!(
243 "All dependencies of tx {tx_key:?} should have been executed now, but obj {key:?} is absent"
244 ),
245 });
246 }
247
248 Ok(results
249 .into_iter()
250 .map(Option::unwrap)
251 .collect::<Vec<_>>()
252 .into())
253 }
254}
255
256impl TransactionInputLoader {
258 fn read_receiving_objects_for_signing(
259 &self,
260 receiving_objects: &[ObjectRef],
261 epoch_id: EpochId,
262 ) -> IotaResult<ReceivingObjects> {
263 let mut receiving_results = Vec::with_capacity(receiving_objects.len());
264 for objref in receiving_objects {
265 let (object_id, version, _) = objref;
267
268 if self
269 .cache
270 .have_received_object_at_version(object_id, *version, epoch_id)?
271 {
272 receiving_results.push(ReceivingObjectReadResult::new(
273 *objref,
274 ReceivingObjectReadResultKind::PreviouslyReceivedObject,
275 ));
276 continue;
277 }
278
279 let Some(object) = self.cache.get_object(object_id)? else {
280 return Err(UserInputError::ObjectNotFound {
281 object_id: *object_id,
282 version: Some(*version),
283 }
284 .into());
285 };
286
287 receiving_results.push(ReceivingObjectReadResult::new(*objref, object.into()));
288 }
289 Ok(receiving_results.into())
290 }
291}