iota_core/checkpoints/checkpoint_executor/
data_ingestion_handler.rs1use std::{collections::HashMap, path::PathBuf, sync::Arc};
6
7use iota_storage::blob::{Blob, BlobEncoding};
8use iota_types::{
9 digests::TransactionDigest,
10 effects::TransactionEffectsAPI,
11 error::{IotaError, IotaResult, UserInputError},
12 full_checkpoint_content::{CheckpointData, CheckpointTransaction},
13 messages_checkpoint::VerifiedCheckpoint,
14 storage::ObjectKey,
15};
16
17use crate::{
18 checkpoints::CheckpointStore,
19 execution_cache::{ObjectCacheRead, TransactionCacheRead},
20};
21
22pub(crate) fn load_checkpoint_data(
23 checkpoint: VerifiedCheckpoint,
24 object_cache_reader: &dyn ObjectCacheRead,
25 transaction_cache_reader: &dyn TransactionCacheRead,
26 checkpoint_store: Arc<CheckpointStore>,
27 transaction_digests: &[TransactionDigest],
28) -> IotaResult<CheckpointData> {
29 let checkpoint_contents = checkpoint_store
30 .get_checkpoint_contents(&checkpoint.content_digest)?
31 .expect("checkpoint content has to be stored");
32
33 let transactions = transaction_cache_reader
34 .multi_get_transaction_blocks(transaction_digests)?
35 .into_iter()
36 .zip(transaction_digests)
37 .map(|(tx, digest)| tx.ok_or(IotaError::TransactionNotFound { digest: *digest }))
38 .collect::<IotaResult<Vec<_>>>()?;
39
40 let effects = transaction_cache_reader
41 .multi_get_executed_effects(transaction_digests)?
42 .into_iter()
43 .zip(transaction_digests)
44 .map(|(effects, &digest)| effects.ok_or(IotaError::TransactionNotFound { digest }))
45 .collect::<IotaResult<Vec<_>>>()?;
46
47 let event_digests = effects
48 .iter()
49 .flat_map(|fx| fx.events_digest().copied())
50 .collect::<Vec<_>>();
51
52 let events = transaction_cache_reader
53 .multi_get_events(&event_digests)?
54 .into_iter()
55 .zip(&event_digests)
56 .map(|(event, digest)| {
57 event.ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
58 })
59 .collect::<IotaResult<Vec<_>>>()?;
60
61 let events: HashMap<_, _> = event_digests.into_iter().zip(events).collect();
62 let mut full_transactions = Vec::with_capacity(transactions.len());
63 for (tx, fx) in transactions.into_iter().zip(effects) {
64 let events = fx.events_digest().map(|event_digest| {
65 events
66 .get(event_digest)
67 .cloned()
68 .expect("event was already checked to be present")
69 });
70
71 let input_object_keys = fx
72 .modified_at_versions()
73 .into_iter()
74 .map(|(object_id, version)| ObjectKey(object_id, version))
75 .collect::<Vec<_>>();
76
77 let input_objects = object_cache_reader
78 .multi_get_objects_by_key(&input_object_keys)?
79 .into_iter()
80 .zip(&input_object_keys)
81 .map(|(object, object_key)| {
82 object.ok_or(IotaError::UserInput {
83 error: UserInputError::ObjectNotFound {
84 object_id: object_key.0,
85 version: Some(object_key.1),
86 },
87 })
88 })
89 .collect::<IotaResult<Vec<_>>>()?;
90
91 let output_object_keys = fx
92 .all_changed_objects()
93 .into_iter()
94 .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
95 .collect::<Vec<_>>();
96
97 let output_objects = object_cache_reader
98 .multi_get_objects_by_key(&output_object_keys)?
99 .into_iter()
100 .zip(&output_object_keys)
101 .map(|(object, object_key)| {
102 object.ok_or(IotaError::UserInput {
103 error: UserInputError::ObjectNotFound {
104 object_id: object_key.0,
105 version: Some(object_key.1),
106 },
107 })
108 })
109 .collect::<IotaResult<Vec<_>>>()?;
110
111 let full_transaction = CheckpointTransaction {
112 transaction: (*tx).clone().into(),
113 effects: fx,
114 events,
115 input_objects,
116 output_objects,
117 };
118 full_transactions.push(full_transaction);
119 }
120 let checkpoint_data = CheckpointData {
121 checkpoint_summary: checkpoint.into(),
122 checkpoint_contents,
123 transactions: full_transactions,
124 };
125 Ok(checkpoint_data)
126}
127
128pub(crate) fn store_checkpoint_locally(
129 path: PathBuf,
130 checkpoint_data: &CheckpointData,
131) -> IotaResult {
132 let file_name = format!("{}.chk", checkpoint_data.checkpoint_summary.sequence_number);
133
134 std::fs::create_dir_all(&path).map_err(|err| {
135 IotaError::FileIO(format!(
136 "failed to save full checkpoint content locally {:?}",
137 err
138 ))
139 })?;
140
141 Blob::encode(&checkpoint_data, BlobEncoding::Bcs)
142 .map_err(|_| IotaError::TransactionSerialization {
143 error: "failed to serialize full checkpoint content".to_string(),
144 }) .and_then(|blob| {
146 std::fs::write(path.join(file_name), blob.to_bytes()).map_err(|_| {
147 IotaError::FileIO("failed to save full checkpoint content locally".to_string())
148 })
149 })?;
150
151 Ok(())
152}