iota_snapshot/
uploader.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};
6
7use anyhow::Result;
8use bytes::Bytes;
9use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
10use iota_core::{
11    authority::authority_store_tables::AuthorityPerpetualTables,
12    checkpoints::CheckpointStore,
13    db_checkpoint_handler::{STATE_SNAPSHOT_COMPLETED_MARKER, SUCCESS_MARKER},
14};
15use iota_storage::{
16    FileCompression,
17    object_store::util::{
18        find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem, put,
19        run_manifest_update_loop,
20    },
21};
22use iota_types::messages_checkpoint::CheckpointCommitment::ECMHLiveObjectSetDigest;
23use object_store::DynObjectStore;
24use prometheus::{
25    IntCounter, IntGauge, Registry, register_int_counter_with_registry,
26    register_int_gauge_with_registry,
27};
28use tracing::{debug, error, info};
29
30use crate::writer::StateSnapshotWriterV1;
31
/// Prometheus metrics exported by the state snapshot uploader.
pub struct StateSnapshotUploaderMetrics {
    /// Lowest epoch number for which no snapshot exists in the remote store
    /// (0 when none are missing).
    pub first_missing_state_snapshot_epoch: IntGauge,
    /// Running count of failed snapshot uploads; intended for alerting.
    pub state_snapshot_upload_err: IntCounter,
}
36
37impl StateSnapshotUploaderMetrics {
38    pub fn new(registry: &Registry) -> Arc<Self> {
39        let this = Self {
40            first_missing_state_snapshot_epoch: register_int_gauge_with_registry!(
41                "first_missing_state_snapshot_epoch",
42                "First epoch for which we have no state snapshot in remote store",
43                registry
44            )
45            .unwrap(),
46            state_snapshot_upload_err: register_int_counter_with_registry!(
47                "state_snapshot_upload_err",
48                "Track upload errors we can alert on",
49                registry
50            )
51            .unwrap(),
52        };
53        Arc::new(this)
54    }
55}
56
/// StateSnapshotUploader is responsible for uploading state snapshots to remote
/// store. It periodically scans the local db checkpoint directory, writes a
/// snapshot for each epoch the remote store is missing, and marks processed
/// checkpoints so they can be garbage collected locally.
pub struct StateSnapshotUploader {
    /// Directory path on local disk where db checkpoints are stored
    db_checkpoint_path: PathBuf,
    /// Store on local disk where db checkpoints are written to
    db_checkpoint_store: Arc<DynObjectStore>,
    /// Checkpoint store; needed to fetch epoch state commitments for
    /// verification
    checkpoint_store: Arc<CheckpointStore>,
    /// Directory path on local disk where state snapshots are staged for upload
    staging_path: PathBuf,
    /// Store on local disk where state snapshots are staged for upload
    staging_store: Arc<DynObjectStore>,
    /// Remote store i.e. S3, GCS, etc where state snapshots are uploaded to
    snapshot_store: Arc<DynObjectStore>,
    /// Time interval to check for presence of new db checkpoint (default: 60
    /// secs)
    interval: Duration,
    // Metrics registered by StateSnapshotUploaderMetrics::new.
    metrics: Arc<StateSnapshotUploaderMetrics>,
}
78
79impl StateSnapshotUploader {
80    pub fn new(
81        db_checkpoint_path: &std::path::Path,
82        staging_path: &std::path::Path,
83        snapshot_store_config: ObjectStoreConfig,
84        interval_s: u64,
85        registry: &Registry,
86        checkpoint_store: Arc<CheckpointStore>,
87    ) -> Result<Arc<Self>> {
88        let db_checkpoint_store_config = ObjectStoreConfig {
89            object_store: Some(ObjectStoreType::File),
90            directory: Some(db_checkpoint_path.to_path_buf()),
91            ..Default::default()
92        };
93        let staging_store_config = ObjectStoreConfig {
94            object_store: Some(ObjectStoreType::File),
95            directory: Some(staging_path.to_path_buf()),
96            ..Default::default()
97        };
98        Ok(Arc::new(StateSnapshotUploader {
99            db_checkpoint_path: db_checkpoint_path.to_path_buf(),
100            db_checkpoint_store: db_checkpoint_store_config.make()?,
101            checkpoint_store,
102            staging_path: staging_path.to_path_buf(),
103            staging_store: staging_store_config.make()?,
104            snapshot_store: snapshot_store_config.make()?,
105            interval: Duration::from_secs(interval_s),
106            metrics: StateSnapshotUploaderMetrics::new(registry),
107        }))
108    }
109
110    /// Starts the state snapshot uploader loop and manifest update loop.
111    pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
112        let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
113        tokio::task::spawn(Self::run_upload_loop(self.clone(), kill_sender.subscribe()));
114        tokio::task::spawn(run_manifest_update_loop(
115            self.snapshot_store.clone(),
116            kill_sender.subscribe(),
117        ));
118        kill_sender
119    }
120
121    /// Uploads state snapshots to remote store if they are missing.
122    async fn upload_state_snapshot_to_object_store(&self, missing_epochs: Vec<u64>) -> Result<()> {
123        let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
124        // Finds all local checkpoints db by epoch
125        let local_checkpoints_by_epoch =
126            find_all_dirs_with_epoch_prefix(&self.db_checkpoint_store, None).await?;
127        let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
128        dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
129        for (epoch, db_path) in dirs {
130            // Writes state snapshot to remote store if it is missing
131            // or if the local has more advanced epochs than the remote
132            if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
133                info!("Starting state snapshot creation for epoch: {}", *epoch);
134                let state_snapshot_writer = StateSnapshotWriterV1::new_from_store(
135                    &self.staging_path,
136                    &self.staging_store,
137                    &self.snapshot_store,
138                    FileCompression::Zstd,
139                    NonZeroUsize::new(20).unwrap(),
140                )
141                .await?;
142                let db = Arc::new(AuthorityPerpetualTables::open(
143                    &path_to_filesystem(self.db_checkpoint_path.clone(), &db_path.child("store"))?,
144                    None,
145                ));
146                let commitments = self
147                    .checkpoint_store
148                    .get_epoch_state_commitments(*epoch)
149                    .expect("Expected last checkpoint of epoch to have end of epoch data")
150                    .expect("Expected end of epoch data to be present");
151                let ECMHLiveObjectSetDigest(state_hash_commitment) = commitments
152                    .last()
153                    .expect("Expected at least one commitment")
154                    .clone();
155                state_snapshot_writer
156                    .write(*epoch, db, state_hash_commitment)
157                    .await?;
158                info!("State snapshot creation successful for epoch: {}", *epoch);
159                // Drops marker in the output directory that upload completed successfully
160                let bytes = Bytes::from_static(b"success");
161                let success_marker = db_path.child(SUCCESS_MARKER);
162                put(&self.snapshot_store, &success_marker, bytes.clone()).await?;
163                let state_snapshot_completed_marker =
164                    db_path.child(STATE_SNAPSHOT_COMPLETED_MARKER);
165                put(
166                    &self.db_checkpoint_store.clone(),
167                    &state_snapshot_completed_marker,
168                    bytes.clone(),
169                )
170                .await?;
171                info!("State snapshot completed for epoch: {epoch}");
172            } else {
173                let bytes = Bytes::from_static(b"success");
174                let state_snapshot_completed_marker =
175                    db_path.child(STATE_SNAPSHOT_COMPLETED_MARKER);
176                put(
177                    &self.db_checkpoint_store.clone(),
178                    &state_snapshot_completed_marker,
179                    bytes.clone(),
180                )
181                .await?;
182                info!("State snapshot skipped for epoch: {epoch}");
183            }
184        }
185        Ok(())
186    }
187
188    /// Main loop that checks for missing remote state snapshots and uploads
189    /// them from the local store.
190    async fn run_upload_loop(
191        self: Arc<Self>,
192        mut recv: tokio::sync::broadcast::Receiver<()>,
193    ) -> Result<()> {
194        let mut interval = tokio::time::interval(self.interval);
195        info!("State snapshot uploader loop started");
196        loop {
197            tokio::select! {
198                _now = interval.tick() => {
199                    let missing_epochs = self.get_missing_epochs().await;
200                    if let Ok(epochs) = missing_epochs {
201                        let first_missing_epoch = epochs.first().cloned().unwrap_or(0);
202                        self.metrics.first_missing_state_snapshot_epoch.set(first_missing_epoch as i64);
203                        if let Err(err) = self.upload_state_snapshot_to_object_store(epochs).await {
204                            self.metrics.state_snapshot_upload_err.inc();
205                            error!("Failed to upload state snapshot to remote store with err: {:?}", err);
206                        } else {
207                            debug!("Successfully completed snapshot upload loop");
208                        }
209                    } else {
210                        error!("Failed to find missing state snapshot in remote store");
211                    }
212                },
213                _ = recv.recv() => break,
214            }
215        }
216        Ok(())
217    }
218
219    /// Finds missing epochs in the remote store.
220    async fn get_missing_epochs(&self) -> Result<Vec<u64>> {
221        let missing_epochs = find_missing_epochs_dirs(&self.snapshot_store, SUCCESS_MARKER).await?;
222        Ok(missing_epochs.to_vec())
223    }
224}