1use std::{collections::HashMap, sync::Arc};
6
7use iota_common::fatal;
8use iota_types::{
9 base_types::{EpochId, ObjectRef, TransactionDigest},
10 error::{IotaError, IotaResult, UserInputError},
11 storage::{GetSharedLocks, ObjectKey},
12 transaction::{
13 InputObjectKind, InputObjects, ObjectReadResult, ObjectReadResultKind,
14 ReceivingObjectReadResult, ReceivingObjectReadResultKind, ReceivingObjects, TransactionKey,
15 },
16};
17use itertools::izip;
18use once_cell::unsync::OnceCell;
19use tracing::instrument;
20
21use crate::{
22 authority::authority_per_epoch_store::CertLockGuard, execution_cache::ObjectCacheRead,
23};
24
25pub(crate) struct TransactionInputLoader {
26 cache: Arc<dyn ObjectCacheRead>,
27}
28
29impl TransactionInputLoader {
30 pub fn new(cache: Arc<dyn ObjectCacheRead>) -> Self {
31 Self { cache }
32 }
33}
34
35impl TransactionInputLoader {
36 #[instrument(level = "trace", skip_all)]
43 pub fn read_objects_for_signing(
44 &self,
45 _tx_digest_for_caching: Option<&TransactionDigest>,
46 input_object_kinds: &[InputObjectKind],
47 receiving_objects: &[ObjectRef],
48 epoch_id: EpochId,
49 ) -> IotaResult<(InputObjects, ReceivingObjects)> {
50 let mut input_results = vec![None; input_object_kinds.len()];
53 let mut object_refs = Vec::with_capacity(input_object_kinds.len());
54 let mut fetch_indices = Vec::with_capacity(input_object_kinds.len());
55
56 for (i, kind) in input_object_kinds.iter().enumerate() {
57 match kind {
58 InputObjectKind::MovePackage(id) => {
60 let Some(package) = self.cache.get_package_object(id)?.map(|o| o.into()) else {
61 return Err(IotaError::from(kind.object_not_found_error()));
62 };
63 input_results[i] = Some(ObjectReadResult {
64 input_object_kind: *kind,
65 object: ObjectReadResultKind::Object(package),
66 });
67 }
68 InputObjectKind::SharedMoveObject { id, .. } => match self.cache.get_object(id)? {
69 Some(object) => {
70 input_results[i] = Some(ObjectReadResult::new(*kind, object.into()))
71 }
72 None => {
73 if let Some((version, digest)) = self
74 .cache
75 .get_last_shared_object_deletion_info(id, epoch_id)?
76 {
77 input_results[i] = Some(ObjectReadResult {
78 input_object_kind: *kind,
79 object: ObjectReadResultKind::DeletedSharedObject(version, digest),
80 });
81 } else {
82 return Err(IotaError::from(kind.object_not_found_error()));
83 }
84 }
85 },
86 InputObjectKind::ImmOrOwnedMoveObject(objref) => {
87 object_refs.push(*objref);
88 fetch_indices.push(i);
89 }
90 }
91 }
92
93 let objects = self
94 .cache
95 .multi_get_objects_with_more_accurate_error_return(&object_refs)?;
96 assert_eq!(objects.len(), object_refs.len());
97 for (index, object) in fetch_indices.into_iter().zip(objects.into_iter()) {
98 input_results[index] = Some(ObjectReadResult {
99 input_object_kind: input_object_kinds[index],
100 object: ObjectReadResultKind::Object(object),
101 });
102 }
103
104 let receiving_results =
105 self.read_receiving_objects_for_signing(receiving_objects, epoch_id)?;
106
107 Ok((
108 input_results
109 .into_iter()
110 .map(Option::unwrap)
111 .collect::<Vec<_>>()
112 .into(),
113 receiving_results,
114 ))
115 }
116
117 #[instrument(level = "trace", skip_all)]
136 pub fn read_objects_for_execution(
137 &self,
138 shared_lock_store: &impl GetSharedLocks,
139 tx_key: &TransactionKey,
140 _tx_lock: &CertLockGuard,
144 input_object_kinds: &[InputObjectKind],
145 epoch_id: EpochId,
146 ) -> IotaResult<InputObjects> {
147 let shared_locks_cell: OnceCell<Option<HashMap<_, _>>> = OnceCell::new();
148
149 let mut results = vec![None; input_object_kinds.len()];
150 let mut object_keys = Vec::with_capacity(input_object_kinds.len());
151 let mut fetches = Vec::with_capacity(input_object_kinds.len());
152
153 for (i, input) in input_object_kinds.iter().enumerate() {
154 match input {
155 InputObjectKind::MovePackage(id) => {
156 let package = self.cache.get_package_object(id)?.unwrap_or_else(|| {
157 panic!("Executable transaction {tx_key:?} depends on non-existent package {id:?}")
158 });
159
160 results[i] = Some(ObjectReadResult {
161 input_object_kind: *input,
162 object: ObjectReadResultKind::Object(package.into()),
163 });
164 continue;
165 }
166 InputObjectKind::ImmOrOwnedMoveObject(objref) => {
167 object_keys.push(objref.into());
168 fetches.push((i, input));
169 }
170 InputObjectKind::SharedMoveObject { id, .. } => {
171 let shared_locks = shared_locks_cell
172 .get_or_init(|| {
173 shared_lock_store
174 .get_shared_locks(tx_key)
175 .expect("loading shared locks should not fail")
176 .map(|locks| locks.into_iter().collect())
177 })
178 .as_ref()
179 .unwrap_or_else(|| {
180 fatal!("Failed to get shared locks for transaction {tx_key:?}");
182 });
183 let version = shared_locks.get(id).unwrap_or_else(|| {
186 panic!("Shared object locks should have been set. key: {tx_key:?}, obj id: {id:?}")
187 });
188 if version.is_cancelled() {
189 results[i] = Some(ObjectReadResult {
191 input_object_kind: *input,
192 object: ObjectReadResultKind::CancelledTransactionSharedObject(
193 *version,
194 ),
195 })
196 } else {
197 object_keys.push(ObjectKey(*id, *version));
198 fetches.push((i, input));
199 }
200 }
201 }
202 }
203
204 let objects = self.cache.multi_get_objects_by_key(&object_keys)?;
205
206 assert!(objects.len() == object_keys.len() && objects.len() == fetches.len());
207
208 for (object, key, (index, input)) in izip!(
209 objects.into_iter(),
210 object_keys.into_iter(),
211 fetches.into_iter()
212 ) {
213 results[index] = Some(match (object, input) {
214 (Some(obj), input_object_kind) => ObjectReadResult {
215 input_object_kind: *input_object_kind,
216 object: obj.into(),
217 },
218 (None, InputObjectKind::SharedMoveObject { id, .. }) => {
219 assert!(key.1.is_valid());
220 let version = key.1;
222 if let Some(dependency) = self
223 .cache
224 .get_deleted_shared_object_previous_tx_digest(id, version, epoch_id)?
225 {
226 ObjectReadResult {
227 input_object_kind: *input,
228 object: ObjectReadResultKind::DeletedSharedObject(version, dependency),
229 }
230 } else {
231 panic!(
232 "All dependencies of tx {tx_key:?} should have been executed now, but Shared Object id: {}, version: {version} is absent in epoch {epoch_id}",
233 *id
234 );
235 }
236 }
237 _ => panic!(
238 "All dependencies of tx {tx_key:?} should have been executed now, but obj {key:?} is absent"
239 ),
240 });
241 }
242
243 Ok(results
244 .into_iter()
245 .map(Option::unwrap)
246 .collect::<Vec<_>>()
247 .into())
248 }
249}
250
251impl TransactionInputLoader {
253 fn read_receiving_objects_for_signing(
254 &self,
255 receiving_objects: &[ObjectRef],
256 epoch_id: EpochId,
257 ) -> IotaResult<ReceivingObjects> {
258 let mut receiving_results = Vec::with_capacity(receiving_objects.len());
259 for objref in receiving_objects {
260 let (object_id, version, _) = objref;
262
263 if self
264 .cache
265 .have_received_object_at_version(object_id, *version, epoch_id)?
266 {
267 receiving_results.push(ReceivingObjectReadResult::new(
268 *objref,
269 ReceivingObjectReadResultKind::PreviouslyReceivedObject,
270 ));
271 continue;
272 }
273
274 let Some(object) = self.cache.get_object(object_id)? else {
275 return Err(UserInputError::ObjectNotFound {
276 object_id: *object_id,
277 version: Some(*version),
278 }
279 .into());
280 };
281
282 receiving_results.push(ReceivingObjectReadResult::new(*objref, object.into()));
283 }
284 Ok(receiving_results.into())
285 }
286}