iota_core/checkpoints/checkpoint_executor/
data_ingestion_handler.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{collections::HashMap, path::PathBuf, sync::Arc};
6
7use iota_storage::blob::{Blob, BlobEncoding};
8use iota_types::{
9    digests::TransactionDigest,
10    effects::TransactionEffectsAPI,
11    error::{IotaError, IotaResult, UserInputError},
12    full_checkpoint_content::{CheckpointData, CheckpointTransaction},
13    messages_checkpoint::VerifiedCheckpoint,
14    storage::ObjectKey,
15};
16
17use crate::{
18    checkpoints::CheckpointStore,
19    execution_cache::{ObjectCacheRead, TransactionCacheRead},
20};
21
22pub(crate) fn load_checkpoint_data(
23    checkpoint: VerifiedCheckpoint,
24    object_cache_reader: &dyn ObjectCacheRead,
25    transaction_cache_reader: &dyn TransactionCacheRead,
26    checkpoint_store: Arc<CheckpointStore>,
27    transaction_digests: &[TransactionDigest],
28) -> IotaResult<CheckpointData> {
29    let checkpoint_contents = checkpoint_store
30        .get_checkpoint_contents(&checkpoint.content_digest)?
31        .expect("checkpoint content has to be stored");
32
33    let transactions = transaction_cache_reader
34        .multi_get_transaction_blocks(transaction_digests)?
35        .into_iter()
36        .zip(transaction_digests)
37        .map(|(tx, digest)| tx.ok_or(IotaError::TransactionNotFound { digest: *digest }))
38        .collect::<IotaResult<Vec<_>>>()?;
39
40    let effects = transaction_cache_reader
41        .multi_get_executed_effects(transaction_digests)?
42        .into_iter()
43        .zip(transaction_digests)
44        .map(|(effects, &digest)| effects.ok_or(IotaError::TransactionNotFound { digest }))
45        .collect::<IotaResult<Vec<_>>>()?;
46
47    let event_digests = effects
48        .iter()
49        .flat_map(|fx| fx.events_digest().copied())
50        .collect::<Vec<_>>();
51
52    let events = transaction_cache_reader
53        .multi_get_events(&event_digests)?
54        .into_iter()
55        .zip(&event_digests)
56        .map(|(event, digest)| {
57            event.ok_or(IotaError::TransactionEventsNotFound { digest: *digest })
58        })
59        .collect::<IotaResult<Vec<_>>>()?;
60
61    let events: HashMap<_, _> = event_digests.into_iter().zip(events).collect();
62    let mut full_transactions = Vec::with_capacity(transactions.len());
63    for (tx, fx) in transactions.into_iter().zip(effects) {
64        let events = fx.events_digest().map(|event_digest| {
65            events
66                .get(event_digest)
67                .cloned()
68                .expect("event was already checked to be present")
69        });
70
71        let input_object_keys = fx
72            .modified_at_versions()
73            .into_iter()
74            .map(|(object_id, version)| ObjectKey(object_id, version))
75            .collect::<Vec<_>>();
76
77        let input_objects = object_cache_reader
78            .multi_get_objects_by_key(&input_object_keys)?
79            .into_iter()
80            .zip(&input_object_keys)
81            .map(|(object, object_key)| {
82                object.ok_or(IotaError::UserInput {
83                    error: UserInputError::ObjectNotFound {
84                        object_id: object_key.0,
85                        version: Some(object_key.1),
86                    },
87                })
88            })
89            .collect::<IotaResult<Vec<_>>>()?;
90
91        let output_object_keys = fx
92            .all_changed_objects()
93            .into_iter()
94            .map(|(object_ref, _owner, _kind)| ObjectKey::from(object_ref))
95            .collect::<Vec<_>>();
96
97        let output_objects = object_cache_reader
98            .multi_get_objects_by_key(&output_object_keys)?
99            .into_iter()
100            .zip(&output_object_keys)
101            .map(|(object, object_key)| {
102                object.ok_or(IotaError::UserInput {
103                    error: UserInputError::ObjectNotFound {
104                        object_id: object_key.0,
105                        version: Some(object_key.1),
106                    },
107                })
108            })
109            .collect::<IotaResult<Vec<_>>>()?;
110
111        let full_transaction = CheckpointTransaction {
112            transaction: (*tx).clone().into(),
113            effects: fx,
114            events,
115            input_objects,
116            output_objects,
117        };
118        full_transactions.push(full_transaction);
119    }
120    let checkpoint_data = CheckpointData {
121        checkpoint_summary: checkpoint.into(),
122        checkpoint_contents,
123        transactions: full_transactions,
124    };
125    Ok(checkpoint_data)
126}
127
128pub(crate) fn store_checkpoint_locally(
129    path: PathBuf,
130    checkpoint_data: &CheckpointData,
131) -> IotaResult {
132    let file_name = format!("{}.chk", checkpoint_data.checkpoint_summary.sequence_number);
133
134    std::fs::create_dir_all(&path).map_err(|err| {
135        IotaError::FileIO(format!(
136            "failed to save full checkpoint content locally {:?}",
137            err
138        ))
139    })?;
140
141    Blob::encode(&checkpoint_data, BlobEncoding::Bcs)
142        .map_err(|_| IotaError::TransactionSerialization {
143            error: "failed to serialize full checkpoint content".to_string(),
144        }) // Map the first error
145        .and_then(|blob| {
146            std::fs::write(path.join(file_name), blob.to_bytes()).map_err(|_| {
147                IotaError::FileIO("failed to save full checkpoint content locally".to_string())
148            })
149        })?;
150
151    Ok(())
152}