1#![allow(dead_code)]
6
7use std::{
8 collections::{HashMap, hash_map::Entry::Vacant},
9 fs,
10 fs::{File, OpenOptions},
11 io::{BufWriter, Seek, SeekFrom, Write},
12 num::NonZeroUsize,
13 path::PathBuf,
14 sync::Arc,
15};
16
17use anyhow::{Context, Result};
18use byteorder::{BigEndian, ByteOrder};
19use fastcrypto::hash::MultisetHash;
20use futures::StreamExt;
21use integer_encoding::VarInt;
22use iota_config::object_storage_config::ObjectStoreConfig;
23use iota_core::{
24 authority::authority_store_tables::{AuthorityPerpetualTables, LiveObject},
25 state_accumulator::StateAccumulator,
26};
27use iota_storage::{
28 blob::{BLOB_ENCODING_BYTES, Blob, BlobEncoding},
29 object_store::util::{copy_file, delete_recursively, path_to_filesystem},
30};
31use iota_types::{
32 accumulator::Accumulator,
33 base_types::{ObjectID, ObjectRef},
34 messages_checkpoint::ECMHLiveObjectSetDigest,
35};
36use object_store::{DynObjectStore, path::Path};
37use tokio::{
38 sync::{
39 mpsc,
40 mpsc::{Receiver, Sender},
41 },
42 task::JoinHandle,
43};
44use tokio_stream::wrappers::ReceiverStream;
45use tracing::debug;
46
47use crate::{
48 FILE_MAX_BYTES, FileCompression, FileMetadata, FileType, MAGIC_BYTES, MANIFEST_FILE_MAGIC,
49 Manifest, ManifestV1, OBJECT_FILE_MAGIC, OBJECT_REF_BYTES, REFERENCE_FILE_MAGIC,
50 SEQUENCE_NUM_BYTES, compute_sha3_checksum, create_file_metadata,
51};
52
/// Writes the live objects of a single bucket into numbered part files.
///
/// Each part consists of a `{bucket_num}_{part_num}.obj` file (encoded objects) and a
/// matching `{bucket_num}_{part_num}.ref` file (object references). A new part is started
/// when the next object would push the object file past `FILE_MAX_BYTES`.
struct LiveObjectSetWriterV1 {
    // Directory in which the part files for this bucket are created.
    dir_path: PathBuf,
    // Bucket id; used as the first component of every part file name.
    bucket_num: u32,
    // Part number of the currently open `.obj`/`.ref` pair (starts at 1).
    current_part_num: u32,
    // Buffered writer for the current `.obj` part file.
    obj_wbuf: BufWriter<File>,
    // Buffered writer for the current `.ref` part file.
    ref_wbuf: BufWriter<File>,
    // Bytes in the current `.obj` part, including the magic header; drives part cutting.
    object_file_size: usize,
    // Metadata of every part file finalized so far by this writer.
    files: Vec<FileMetadata>,
    // Channel used to hand finalized part files to the uploader; cleared in `done()`.
    sender: Option<Sender<FileMetadata>>,
    // Compression recorded in the metadata of each finalized file.
    file_compression: FileCompression,
}
66
67impl LiveObjectSetWriterV1 {
68 fn new(
69 dir_path: PathBuf,
70 bucket_num: u32,
71 file_compression: FileCompression,
72 sender: Sender<FileMetadata>,
73 ) -> Result<Self> {
74 let part_num = 1;
75 let (n, obj_file) = Self::object_file(dir_path.clone(), bucket_num, part_num)?;
76 let ref_file = Self::ref_file(dir_path.clone(), bucket_num, part_num)?;
77 Ok(LiveObjectSetWriterV1 {
78 dir_path,
79 bucket_num,
80 current_part_num: part_num,
81 obj_wbuf: BufWriter::new(obj_file),
82 ref_wbuf: BufWriter::new(ref_file),
83 object_file_size: n,
84 files: vec![],
85 sender: Some(sender),
86 file_compression,
87 })
88 }
89
90 pub fn write(&mut self, object: &LiveObject) -> Result<()> {
93 let object_reference = object.object_reference();
94 self.write_object(object)?;
95 self.write_object_ref(&object_reference)?;
96 Ok(())
97 }
98
99 pub fn done(mut self) -> Result<Vec<FileMetadata>> {
102 self.finalize_obj()?;
103 self.finalize_ref()?;
104 self.sender = None;
105 Ok(self.files.clone())
106 }
107
108 fn object_file(dir_path: PathBuf, bucket_num: u32, part_num: u32) -> Result<(usize, File)> {
111 let next_part_file_path = dir_path.join(format!("{bucket_num}_{part_num}.obj"));
112 let next_part_file_tmp_path = dir_path.join(format!("{bucket_num}_{part_num}.obj.tmp"));
113 let mut f = File::create(next_part_file_tmp_path.clone())?;
114 let mut metab = [0u8; MAGIC_BYTES];
115 BigEndian::write_u32(&mut metab, OBJECT_FILE_MAGIC);
116 f.rewind()?;
117 let n = f.write(&metab)?;
118 drop(f);
119 fs::rename(next_part_file_tmp_path, next_part_file_path.clone())?;
120 let mut f = OpenOptions::new().append(true).open(next_part_file_path)?;
121 f.seek(SeekFrom::Start(n as u64))?;
122 Ok((n, f))
123 }
124
125 fn ref_file(dir_path: PathBuf, bucket_num: u32, part_num: u32) -> Result<File> {
129 let ref_path = dir_path.join(format!("{bucket_num}_{part_num}.ref"));
130 let ref_tmp_path = dir_path.join(format!("{bucket_num}_{part_num}.ref.tmp"));
131 let mut f = File::create(ref_tmp_path.clone())?;
132 f.rewind()?;
133 let mut metab = [0u8; MAGIC_BYTES];
134 BigEndian::write_u32(&mut metab, REFERENCE_FILE_MAGIC);
135 let n = f.write(&metab)?;
136 drop(f);
137 fs::rename(ref_tmp_path, ref_path.clone())?;
138 let mut f = OpenOptions::new().append(true).open(ref_path)?;
139 f.seek(SeekFrom::Start(n as u64))?;
140 Ok(f)
141 }
142
143 fn finalize_obj(&mut self) -> Result<()> {
146 self.obj_wbuf.flush()?;
148 self.obj_wbuf.get_ref().sync_data()?;
149 let off = self.obj_wbuf.get_ref().stream_position()?;
150 self.obj_wbuf.get_ref().set_len(off)?;
151 let file_path = self
152 .dir_path
153 .join(format!("{}_{}.obj", self.bucket_num, self.current_part_num));
154 let file_metadata = create_file_metadata(
155 &file_path,
156 self.file_compression,
157 FileType::Object,
158 self.bucket_num,
159 self.current_part_num,
160 )?;
161 self.files.push(file_metadata.clone());
162 if let Some(sender) = &self.sender {
163 sender.blocking_send(file_metadata)?;
164 }
165 Ok(())
166 }
167
168 fn finalize_ref(&mut self) -> Result<()> {
171 self.ref_wbuf.flush()?;
173 self.ref_wbuf.get_ref().sync_data()?;
174 let off = self.ref_wbuf.get_ref().stream_position()?;
175 self.ref_wbuf.get_ref().set_len(off)?;
176 let file_path = self
177 .dir_path
178 .join(format!("{}_{}.ref", self.bucket_num, self.current_part_num));
179 let file_metadata = create_file_metadata(
180 &file_path,
181 self.file_compression,
182 FileType::Reference,
183 self.bucket_num,
184 self.current_part_num,
185 )?;
186 self.files.push(file_metadata.clone());
187 if let Some(sender) = &self.sender {
188 sender.blocking_send(file_metadata)?;
189 }
190 Ok(())
191 }
192
193 fn cut(&mut self) -> Result<()> {
196 self.finalize_obj()?;
197 let (n, f) = Self::object_file(
198 self.dir_path.clone(),
199 self.bucket_num,
200 self.current_part_num + 1,
201 )?;
202 self.object_file_size = n;
203 self.obj_wbuf = BufWriter::new(f);
204 Ok(())
205 }
206
207 fn cut_reference_file(&mut self) -> Result<()> {
210 self.finalize_ref()?;
211 let f = Self::ref_file(
212 self.dir_path.clone(),
213 self.bucket_num,
214 self.current_part_num + 1,
215 )?;
216 self.ref_wbuf = BufWriter::new(f);
217 Ok(())
218 }
219
220 fn write_object(&mut self, object: &LiveObject) -> Result<()> {
223 let blob = Blob::encode(object, BlobEncoding::Bcs)?;
224 let mut blob_size = blob.data.len().required_space();
225 blob_size += BLOB_ENCODING_BYTES;
226 blob_size += blob.data.len();
227 let cut_new_part_file = (self.object_file_size + blob_size) > FILE_MAX_BYTES;
228 if cut_new_part_file {
229 self.cut()?;
230 self.cut_reference_file()?;
231 self.current_part_num += 1;
232 }
233 self.object_file_size += blob.write(&mut self.obj_wbuf)?;
234 Ok(())
235 }
236
237 fn write_object_ref(&mut self, object_ref: &ObjectRef) -> Result<()> {
239 let mut buf = [0u8; OBJECT_REF_BYTES];
240 buf[0..ObjectID::LENGTH].copy_from_slice(object_ref.0.as_ref());
241 BigEndian::write_u64(
242 &mut buf[ObjectID::LENGTH..OBJECT_REF_BYTES],
243 object_ref.1.value(),
244 );
245 buf[ObjectID::LENGTH + SEQUENCE_NUM_BYTES..OBJECT_REF_BYTES]
246 .copy_from_slice(object_ref.2.as_ref());
247 self.ref_wbuf.write_all(&buf)?;
248 Ok(())
249 }
250}
251
/// Writes an epoch's live object set as a state snapshot.
///
/// Files are staged in a local directory and copied to a remote object store.
pub struct StateSnapshotWriterV1 {
    // Local filesystem root under which `epoch_<N>` staging directories are created.
    local_staging_dir: PathBuf,
    // Compression recorded in the metadata of every snapshot file.
    file_compression: FileCompression,
    // Destination store that snapshot files are copied to.
    remote_object_store: Arc<DynObjectStore>,
    // Store used as the copy source for staged files; presumably backed by
    // `local_staging_dir` (TODO confirm).
    local_staging_store: Arc<DynObjectStore>,
    // Upload parallelism; also passed to the remote recursive delete. Always non-zero.
    concurrency: usize,
}
261
262impl StateSnapshotWriterV1 {
263 pub async fn new_from_store(
264 local_staging_path: &std::path::Path,
265 local_staging_store: &Arc<DynObjectStore>,
266 remote_object_store: &Arc<DynObjectStore>,
267 file_compression: FileCompression,
268 concurrency: NonZeroUsize,
269 ) -> Result<Self> {
270 Ok(StateSnapshotWriterV1 {
271 file_compression,
272 local_staging_dir: local_staging_path.to_path_buf(),
273 remote_object_store: remote_object_store.clone(),
274 local_staging_store: local_staging_store.clone(),
275 concurrency: concurrency.get(),
276 })
277 }
278
279 pub async fn new(
280 local_store_config: &ObjectStoreConfig,
281 remote_store_config: &ObjectStoreConfig,
282 file_compression: FileCompression,
283 concurrency: NonZeroUsize,
284 ) -> Result<Self> {
285 let remote_object_store = remote_store_config.make()?;
286 let local_staging_store = local_store_config.make()?;
287 let local_staging_dir = local_store_config
288 .directory
289 .as_ref()
290 .context("No local directory specified")?
291 .clone();
292 Ok(StateSnapshotWriterV1 {
293 local_staging_dir,
294 file_compression,
295 remote_object_store,
296 local_staging_store,
297 concurrency: concurrency.get(),
298 })
299 }
300
301 pub async fn write(
305 self,
306 epoch: u64,
307 perpetual_db: Arc<AuthorityPerpetualTables>,
308 root_state_hash: ECMHLiveObjectSetDigest,
309 ) -> Result<()> {
310 self.write_internal(epoch, perpetual_db, root_state_hash)
311 .await
312 }
313
314 pub(crate) async fn write_internal(
317 mut self,
318 epoch: u64,
319 perpetual_db: Arc<AuthorityPerpetualTables>,
320 root_state_hash: ECMHLiveObjectSetDigest,
321 ) -> Result<()> {
322 self.setup_epoch_dir(epoch).await?;
323
324 let manifest_file_path = self.epoch_dir(epoch).child("MANIFEST");
325 let local_staging_dir = self.local_staging_dir.clone();
326 let local_object_store = self.local_staging_store.clone();
327 let remote_object_store = self.remote_object_store.clone();
328
329 let (sender, receiver) = mpsc::channel::<FileMetadata>(1000);
330 let upload_handle = self.start_upload(epoch, receiver)?;
332 let write_handler = tokio::task::spawn_blocking(move || {
333 self.write_live_object_set(
334 epoch,
335 perpetual_db,
336 sender,
337 Self::bucket_func,
338 root_state_hash,
339 )
340 });
341 write_handler.await?.context(format!(
344 "Failed to write state snapshot for epoch: {}",
345 &epoch
346 ))?;
347
348 upload_handle.await?.context(format!(
350 "Failed to upload state snapshot for epoch: {}",
351 &epoch
352 ))?;
353
354 Self::sync_file_to_remote(
356 local_staging_dir,
357 manifest_file_path,
358 local_object_store,
359 remote_object_store,
360 )
361 .await?;
362 Ok(())
363 }
364
365 fn start_upload(
368 &self,
369 epoch: u64,
370 receiver: Receiver<FileMetadata>,
371 ) -> Result<JoinHandle<Result<Vec<()>, anyhow::Error>>> {
372 let remote_object_store = self.remote_object_store.clone();
373 let local_staging_store = self.local_staging_store.clone();
374 let local_dir_path = self.local_staging_dir.clone();
375 let epoch_dir = self.epoch_dir(epoch);
376 let upload_concurrency = self.concurrency;
377 let join_handle = tokio::spawn(async move {
378 let results: Vec<Result<(), anyhow::Error>> = ReceiverStream::new(receiver)
381 .map(|file_metadata| {
382 let file_path = file_metadata.file_path(&epoch_dir);
383 let remote_object_store = remote_object_store.clone();
384 let local_object_store = local_staging_store.clone();
385 let local_dir_path = local_dir_path.clone();
386 async move {
387 Self::sync_file_to_remote(
388 local_dir_path.clone(),
389 file_path.clone(),
390 local_object_store.clone(),
391 remote_object_store.clone(),
392 )
393 .await?;
394 Ok(())
395 }
396 })
397 .boxed()
398 .buffer_unordered(upload_concurrency)
399 .collect()
400 .await;
401 results
402 .into_iter()
403 .collect::<Result<Vec<()>, anyhow::Error>>()
404 });
405 Ok(join_handle)
406 }
407
408 fn write_live_object_set<F>(
412 &mut self,
413 epoch: u64,
414 perpetual_db: Arc<AuthorityPerpetualTables>,
415 sender: Sender<FileMetadata>,
416 bucket_func: F,
417 root_state_hash: ECMHLiveObjectSetDigest,
418 ) -> Result<()>
419 where
420 F: Fn(&LiveObject) -> u32,
421 {
422 let mut object_writers: HashMap<u32, LiveObjectSetWriterV1> = HashMap::new();
423 let local_staging_dir_path =
424 path_to_filesystem(self.local_staging_dir.clone(), &self.epoch_dir(epoch))?;
425 let mut acc = Accumulator::default();
426 for object in perpetual_db.iter_live_object_set() {
427 StateAccumulator::accumulate_live_object(&mut acc, &object);
428 let bucket_num = bucket_func(&object);
429 if let Vacant(entry) = object_writers.entry(bucket_num) {
431 entry.insert(LiveObjectSetWriterV1::new(
432 local_staging_dir_path.clone(),
433 bucket_num,
434 self.file_compression,
435 sender.clone(),
436 )?);
437 }
438 let writer = object_writers
439 .get_mut(&bucket_num)
440 .context("Unexpected missing bucket writer")?;
441 writer.write(&object)?;
442 }
443 assert_eq!(
444 ECMHLiveObjectSetDigest::from(acc.digest()),
445 root_state_hash,
446 "Root state hash mismatch!"
447 );
448 let mut files = vec![];
449 for (_, writer) in object_writers.into_iter() {
452 files.extend(writer.done()?);
453 }
454 self.write_manifest(epoch, files)?;
456 Ok(())
457 }
458
459 fn write_manifest(&mut self, epoch: u64, file_metadata: Vec<FileMetadata>) -> Result<()> {
462 let (f, manifest_file_path) = self.manifest_file(epoch)?;
463 let mut wbuf = BufWriter::new(f);
464 let manifest: Manifest = Manifest::V1(ManifestV1 {
465 snapshot_version: 1,
466 address_length: ObjectID::LENGTH as u64,
467 file_metadata,
468 epoch,
469 });
470 let serialized_manifest = bcs::to_bytes(&manifest)?;
471 wbuf.write_all(&serialized_manifest)?;
472 wbuf.flush()?;
473 wbuf.get_ref().sync_data()?;
474 let sha3_digest = compute_sha3_checksum(&manifest_file_path)?;
477 wbuf.write_all(&sha3_digest)?;
478 wbuf.flush()?;
479 wbuf.get_ref().sync_data()?;
480 let off = wbuf.get_ref().stream_position()?;
481 wbuf.get_ref().set_len(off)?;
482 Ok(())
483 }
484
485 fn manifest_file(&mut self, epoch: u64) -> Result<(File, PathBuf)> {
488 let manifest_file_path = path_to_filesystem(
489 self.local_staging_dir.clone(),
490 &self.epoch_dir(epoch).child("MANIFEST"),
491 )?;
492 let manifest_file_tmp_path = path_to_filesystem(
493 self.local_staging_dir.clone(),
494 &self.epoch_dir(epoch).child("MANIFEST.tmp"),
495 )?;
496 let mut f = File::create(manifest_file_tmp_path.clone())?;
497 let mut metab = vec![0u8; MAGIC_BYTES];
498 BigEndian::write_u32(&mut metab, MANIFEST_FILE_MAGIC);
499 f.rewind()?;
500 f.write_all(&metab)?;
501 drop(f);
502 fs::rename(manifest_file_tmp_path, manifest_file_path.clone())?;
503 let mut f = OpenOptions::new()
504 .append(true)
505 .open(manifest_file_path.clone())?;
506 f.seek(SeekFrom::Start(MAGIC_BYTES as u64))?;
507 Ok((f, manifest_file_path))
508 }
509
510 fn bucket_func(_object: &LiveObject) -> u32 {
511 1u32
514 }
515
516 fn epoch_dir(&self, epoch: u64) -> Path {
517 Path::from(format!("epoch_{}", epoch))
518 }
519
520 async fn setup_epoch_dir(&self, epoch: u64) -> Result<()> {
523 let epoch_dir = self.epoch_dir(epoch);
524 delete_recursively(
526 &epoch_dir,
527 &self.remote_object_store,
528 NonZeroUsize::new(self.concurrency).unwrap(),
529 )
530 .await?;
531 let local_epoch_dir_path = self.local_staging_dir.join(format!("epoch_{}", epoch));
533 if local_epoch_dir_path.exists() {
534 fs::remove_dir_all(&local_epoch_dir_path)?;
535 }
536 fs::create_dir_all(&local_epoch_dir_path)?;
537 Ok(())
538 }
539
540 async fn sync_file_to_remote(
542 local_path: PathBuf,
543 path: Path,
544 from: Arc<DynObjectStore>,
545 to: Arc<DynObjectStore>,
546 ) -> Result<()> {
547 debug!("Syncing snapshot file to remote: {:?}", path);
548 copy_file(&path, &path, &from, &to).await?;
549 fs::remove_file(path_to_filesystem(local_path, &path)?)?;
550 Ok(())
551 }
552}