1#![allow(dead_code)]
6
7#[cfg(test)]
8mod tests;
9
10pub mod reader;
11pub mod uploader;
12mod writer;
13
14use std::{
15 path::PathBuf,
16 sync::{
17 Arc,
18 atomic::{AtomicU64, Ordering},
19 },
20 time::Duration,
21};
22
23use anyhow::Result;
24use fastcrypto::hash::MultisetHash;
25use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
26use iota_core::{
27 authority::{
28 authority_store_tables::{AuthorityPerpetualTables, LiveObject},
29 epoch_start_configuration::{EpochFlag, EpochStartConfiguration},
30 },
31 checkpoints::CheckpointStore,
32 epoch::committee_store::CommitteeStore,
33 state_accumulator::WrappedObject,
34};
35use iota_storage::{
36 FileCompression, SHA3_BYTES, compute_sha3_checksum, object_store::util::path_to_filesystem,
37};
38use iota_types::{
39 accumulator::Accumulator,
40 base_types::ObjectID,
41 iota_system_state::{
42 IotaSystemStateTrait, epoch_start_iota_system_state::EpochStartSystemStateTrait,
43 get_iota_system_state,
44 },
45 messages_checkpoint::ECMHLiveObjectSetDigest,
46};
47use num_enum::{IntoPrimitive, TryFromPrimitive};
48use object_store::path::Path;
49use serde::{Deserialize, Serialize};
50use tokio::time::Instant;
51
/// Magic number associated with object files.
/// NOTE(review): presumably written as a leading marker that the reader
/// validates; confirm against the `reader`/`writer` modules.
const OBJECT_FILE_MAGIC: u32 = 0x00B7EC75;
/// Magic number associated with reference files (same caveat as above).
const REFERENCE_FILE_MAGIC: u32 = 0xDEADBEEF;
/// Magic number associated with the manifest file (same caveat as above).
const MANIFEST_FILE_MAGIC: u32 = 0x00C0FFEE;
/// Size in bytes of a magic number; each magic constant is a `u32`.
const MAGIC_BYTES: usize = 4;
/// Size in bytes of the snapshot version (`ManifestV1::snapshot_version` is a `u8`).
const SNAPSHOT_VERSION_BYTES: usize = 1;
/// Size in bytes of the address length (`ManifestV1::address_length` is a `u64`).
const ADDRESS_LENGTH_BYTES: usize = 8;
/// Extra bytes counted in the manifest header.
/// NOTE(review): presumably alignment padding; with it the header totals 16 bytes.
const PADDING_BYTES: usize = 3;
/// Total manifest header size: magic + version + address length + padding.
const MANIFEST_FILE_HEADER_BYTES: usize =
    MAGIC_BYTES + SNAPSHOT_VERSION_BYTES + ADDRESS_LENGTH_BYTES + PADDING_BYTES;
/// 128 MiB.
/// NOTE(review): presumably the size cap for a single snapshot file; confirm in the writer.
const FILE_MAX_BYTES: usize = 128 * 1024 * 1024;
/// Size in bytes of an object ID, taken from `ObjectID::LENGTH`.
const OBJECT_ID_BYTES: usize = ObjectID::LENGTH;
/// Size in bytes of a sequence number.
const SEQUENCE_NUM_BYTES: usize = 8;
/// Size in bytes of an object digest.
const OBJECT_DIGEST_BYTES: usize = 32;
/// Size in bytes of an object reference: ID + sequence number + digest.
const OBJECT_REF_BYTES: usize = OBJECT_ID_BYTES + SEQUENCE_NUM_BYTES + OBJECT_DIGEST_BYTES;
/// Size in bytes of an encoded `FileType` (it is `#[repr(u8)]`).
const FILE_TYPE_BYTES: usize = 1;
/// Size in bytes of a bucket number (`FileMetadata::bucket_num` is a `u32`).
const BUCKET_BYTES: usize = 4;
/// Size in bytes of a partition number (`FileMetadata::part_num` is a `u32`).
const BUCKET_PARTITION_BYTES: usize = 4;
/// Size in bytes of an encoded compression type.
const COMPRESSION_TYPE_BYTES: usize = 1;
/// Total size in bytes of one file metadata record: file type + bucket +
/// partition + compression type + SHA3 digest.
const FILE_METADATA_BYTES: usize =
    FILE_TYPE_BYTES + BUCKET_BYTES + BUCKET_PARTITION_BYTES + COMPRESSION_TYPE_BYTES + SHA3_BYTES;
148
/// Kind of file a [`FileMetadata`] entry describes.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`/`IntoPrimitive`,
/// so it converts to and from a single byte.
#[derive(
    Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, TryFromPrimitive, IntoPrimitive,
)]
#[repr(u8)]
pub enum FileType {
    /// Discriminant `0`; such files are named `<bucket>_<part>.obj`.
    Object = 0,
    /// Discriminant `1` (implicit); such files are named `<bucket>_<part>.ref`.
    Reference,
}
157
/// Metadata describing a single snapshot file.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct FileMetadata {
    /// Whether this is an object file or a reference file; selects the file
    /// extension used by `file_path`.
    pub file_type: FileType,
    /// Bucket number; first component of the file name.
    pub bucket_num: u32,
    /// Partition number within the bucket; second component of the file name.
    pub part_num: u32,
    /// Compression associated with the file (`create_file_metadata` calls
    /// `compress` with it before computing the digest).
    pub file_compression: FileCompression,
    /// SHA3 digest of the file, as returned by `compute_sha3_checksum`.
    pub sha3_digest: [u8; 32],
}
167
168impl FileMetadata {
169 pub fn file_path(&self, dir_path: &Path) -> Path {
170 match self.file_type {
171 FileType::Object => {
172 dir_path.child(&*format!("{}_{}.obj", self.bucket_num, self.part_num))
173 }
174 FileType::Reference => {
175 dir_path.child(&*format!("{}_{}.ref", self.bucket_num, self.part_num))
176 }
177 }
178 }
179 pub fn local_file_path(&self, root_path: &std::path::Path, dir_path: &Path) -> Result<PathBuf> {
180 path_to_filesystem(root_path.to_path_buf(), &self.file_path(dir_path))
181 }
182}
183
/// Version 1 of the snapshot manifest contents.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct ManifestV1 {
    /// Snapshot format version.
    pub snapshot_version: u8,
    /// Address length.
    /// NOTE(review): presumably the byte length of addresses/object IDs in
    /// the snapshot; confirm against the writer.
    pub address_length: u64,
    /// One metadata entry per file listed in the manifest.
    pub file_metadata: Vec<FileMetadata>,
    /// Epoch number stored in the manifest.
    /// NOTE(review): presumably the epoch the snapshot was taken at; confirm.
    pub epoch: u64,
}
191
/// Versioned snapshot manifest; each variant wraps the contents for one
/// manifest version.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq)]
pub enum Manifest {
    /// Manifest with version 1 contents.
    V1(ManifestV1),
}
196
197impl Manifest {
198 pub fn snapshot_version(&self) -> u8 {
199 match self {
200 Self::V1(manifest) => manifest.snapshot_version,
201 }
202 }
203 pub fn address_length(&self) -> u64 {
204 match self {
205 Self::V1(manifest) => manifest.address_length,
206 }
207 }
208 pub fn file_metadata(&self) -> &Vec<FileMetadata> {
209 match self {
210 Self::V1(manifest) => &manifest.file_metadata,
211 }
212 }
213 pub fn epoch(&self) -> u64 {
214 match self {
215 Self::V1(manifest) => manifest.epoch,
216 }
217 }
218}
219
220pub fn create_file_metadata(
223 file_path: &std::path::Path,
224 file_compression: FileCompression,
225 file_type: FileType,
226 bucket_num: u32,
227 part_num: u32,
228) -> Result<FileMetadata> {
229 file_compression.compress(file_path)?;
231 let sha3_digest = compute_sha3_checksum(file_path)?;
233 let file_metadata = FileMetadata {
234 file_type,
235 bucket_num,
236 part_num,
237 file_compression,
238 sha3_digest,
239 };
240 Ok(file_metadata)
241}
242
243pub async fn setup_db_state(
244 epoch: u64,
245 accumulator: Accumulator,
246 perpetual_db: Arc<AuthorityPerpetualTables>,
247 checkpoint_store: Arc<CheckpointStore>,
248 committee_store: Arc<CommitteeStore>,
249 verify: bool,
250 num_live_objects: u64,
251 m: MultiProgress,
252) -> Result<()> {
253 let system_state_object = get_iota_system_state(&perpetual_db)?;
256 let new_epoch_start_state = system_state_object.into_epoch_start_state();
257 let next_epoch_committee = new_epoch_start_state.get_iota_committee();
258 let root_digest: ECMHLiveObjectSetDigest = accumulator.digest().into();
259 let last_checkpoint = checkpoint_store
260 .get_epoch_last_checkpoint(epoch)
261 .expect("Error loading last checkpoint for current epoch")
262 .expect("Could not load last checkpoint for current epoch");
263 let flags = EpochFlag::default_for_no_config();
264 let epoch_start_configuration = EpochStartConfiguration::new(
265 new_epoch_start_state,
266 *last_checkpoint.digest(),
267 &perpetual_db,
268 flags,
269 )
270 .unwrap();
271 perpetual_db.set_epoch_start_configuration(&epoch_start_configuration)?;
272 perpetual_db.insert_root_state_hash(epoch, last_checkpoint.sequence_number, accumulator)?;
273 perpetual_db.set_highest_pruned_checkpoint_without_wb(last_checkpoint.sequence_number)?;
274 committee_store.insert_new_committee(&next_epoch_committee)?;
275 checkpoint_store.update_highest_executed_checkpoint(&last_checkpoint)?;
276
277 if verify {
278 let iter = perpetual_db.iter_live_object_set();
279 let local_digest = ECMHLiveObjectSetDigest::from(
280 accumulate_live_object_iter(Box::new(iter), m.clone(), num_live_objects)
281 .await
282 .digest(),
283 );
284 assert_eq!(
285 root_digest, local_digest,
286 "End of epoch {} root state digest {} does not match \
287 local root state hash {} after restoring db from formal snapshot",
288 epoch, root_digest.digest, local_digest.digest,
289 );
290 println!("DB live object state verification completed successfully!");
291 }
292
293 Ok(())
294}
295
296pub async fn accumulate_live_object_iter(
297 iter: Box<dyn Iterator<Item = LiveObject> + '_>,
298 m: MultiProgress,
299 num_live_objects: u64,
300) -> Accumulator {
301 let accum_progress_bar = m.add(ProgressBar::new(num_live_objects).with_style(
303 ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len} ({msg})").unwrap(),
304 ));
305 let accum_counter = Arc::new(AtomicU64::new(0));
306 let cloned_accum_counter = accum_counter.clone();
307 let cloned_progress_bar = accum_progress_bar.clone();
308 let handle = tokio::spawn(async move {
309 let a_instant = Instant::now();
310 loop {
311 if cloned_progress_bar.is_finished() {
312 break;
313 }
314 let num_accumulated = cloned_accum_counter.load(Ordering::Relaxed);
315 assert!(
316 num_accumulated <= num_live_objects,
317 "Accumulated more objects (at least {num_accumulated}) than expected ({num_live_objects})"
318 );
319 let accumulations_per_sec = num_accumulated as f64 / a_instant.elapsed().as_secs_f64();
320 cloned_progress_bar.set_position(num_accumulated);
321 cloned_progress_bar.set_message(format!(
322 "DB live obj accumulations per sec: {}",
323 accumulations_per_sec
324 ));
325 tokio::time::sleep(Duration::from_secs(1)).await;
326 }
327 });
328
329 let mut acc = Accumulator::default();
331 for live_object in iter {
332 match live_object {
333 LiveObject::Normal(object) => {
334 acc.insert(object.compute_object_reference().2);
335 }
336 LiveObject::Wrapped(key) => {
337 acc.insert(
338 bcs::to_bytes(&WrappedObject::new(key.0, key.1))
339 .expect("Failed to serialize WrappedObject"),
340 );
341 }
342 }
343 accum_counter.fetch_add(1, Ordering::Relaxed);
344 }
345 accum_progress_bar.finish_with_message("DB live object accumulation completed");
346 handle
347 .await
348 .expect("Failed to join live object accumulation progress monitor");
349 acc
350}