1use std::{num::NonZeroUsize, path::PathBuf, sync::Arc, time::Duration};
6
7use anyhow::Result;
8use bytes::Bytes;
9use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
10use iota_core::{
11 authority::authority_store_tables::AuthorityPerpetualTables,
12 checkpoints::CheckpointStore,
13 db_checkpoint_handler::{STATE_SNAPSHOT_COMPLETED_MARKER, SUCCESS_MARKER},
14};
15use iota_storage::{
16 FileCompression,
17 object_store::util::{
18 find_all_dirs_with_epoch_prefix, find_missing_epochs_dirs, path_to_filesystem, put,
19 run_manifest_update_loop,
20 },
21};
22use iota_types::messages_checkpoint::CheckpointCommitment::ECMHLiveObjectSetDigest;
23use object_store::DynObjectStore;
24use prometheus::{
25 IntCounter, IntGauge, Registry, register_int_counter_with_registry,
26 register_int_gauge_with_registry,
27};
28use tracing::{debug, error, info};
29
30use crate::writer::StateSnapshotWriterV1;
31
/// Prometheus metrics exported by the state snapshot uploader.
pub struct StateSnapshotUploaderMetrics {
    // First epoch for which no state snapshot exists in the remote store
    // (0 when nothing is missing — see `run_upload_loop`).
    pub first_missing_state_snapshot_epoch: IntGauge,
    // Running count of snapshot upload failures; intended for alerting.
    pub state_snapshot_upload_err: IntCounter,
}
36
37impl StateSnapshotUploaderMetrics {
38 pub fn new(registry: &Registry) -> Arc<Self> {
39 let this = Self {
40 first_missing_state_snapshot_epoch: register_int_gauge_with_registry!(
41 "first_missing_state_snapshot_epoch",
42 "First epoch for which we have no state snapshot in remote store",
43 registry
44 )
45 .unwrap(),
46 state_snapshot_upload_err: register_int_counter_with_registry!(
47 "state_snapshot_upload_err",
48 "Track upload errors we can alert on",
49 registry
50 )
51 .unwrap(),
52 };
53 Arc::new(this)
54 }
55}
56
/// Periodically creates state snapshots from local per-epoch db checkpoint
/// directories and uploads them to a remote object store.
pub struct StateSnapshotUploader {
    // Directory path of db checkpoints on the local filesystem.
    db_checkpoint_path: PathBuf,
    // File-backed object store rooted at `db_checkpoint_path`.
    db_checkpoint_store: Arc<DynObjectStore>,
    // Source of end-of-epoch state commitments used to verify snapshots.
    checkpoint_store: Arc<CheckpointStore>,
    // Local staging directory where snapshot files are assembled before upload.
    staging_path: PathBuf,
    // File-backed object store rooted at `staging_path`.
    staging_store: Arc<DynObjectStore>,
    // Remote object store that receives the finished snapshots.
    snapshot_store: Arc<DynObjectStore>,
    // Delay between successive runs of the upload loop.
    interval: Duration,
    // Metrics registered via `StateSnapshotUploaderMetrics::new`.
    metrics: Arc<StateSnapshotUploaderMetrics>,
}
78
79impl StateSnapshotUploader {
80 pub fn new(
81 db_checkpoint_path: &std::path::Path,
82 staging_path: &std::path::Path,
83 snapshot_store_config: ObjectStoreConfig,
84 interval_s: u64,
85 registry: &Registry,
86 checkpoint_store: Arc<CheckpointStore>,
87 ) -> Result<Arc<Self>> {
88 let db_checkpoint_store_config = ObjectStoreConfig {
89 object_store: Some(ObjectStoreType::File),
90 directory: Some(db_checkpoint_path.to_path_buf()),
91 ..Default::default()
92 };
93 let staging_store_config = ObjectStoreConfig {
94 object_store: Some(ObjectStoreType::File),
95 directory: Some(staging_path.to_path_buf()),
96 ..Default::default()
97 };
98 Ok(Arc::new(StateSnapshotUploader {
99 db_checkpoint_path: db_checkpoint_path.to_path_buf(),
100 db_checkpoint_store: db_checkpoint_store_config.make()?,
101 checkpoint_store,
102 staging_path: staging_path.to_path_buf(),
103 staging_store: staging_store_config.make()?,
104 snapshot_store: snapshot_store_config.make()?,
105 interval: Duration::from_secs(interval_s),
106 metrics: StateSnapshotUploaderMetrics::new(registry),
107 }))
108 }
109
110 pub fn start(self: Arc<Self>) -> tokio::sync::broadcast::Sender<()> {
112 let (kill_sender, _kill_receiver) = tokio::sync::broadcast::channel::<()>(1);
113 tokio::task::spawn(Self::run_upload_loop(self.clone(), kill_sender.subscribe()));
114 tokio::task::spawn(run_manifest_update_loop(
115 self.snapshot_store.clone(),
116 kill_sender.subscribe(),
117 ));
118 kill_sender
119 }
120
121 async fn upload_state_snapshot_to_object_store(&self, missing_epochs: Vec<u64>) -> Result<()> {
123 let last_missing_epoch = missing_epochs.last().cloned().unwrap_or(0);
124 let local_checkpoints_by_epoch =
126 find_all_dirs_with_epoch_prefix(&self.db_checkpoint_store, None).await?;
127 let mut dirs: Vec<_> = local_checkpoints_by_epoch.iter().collect();
128 dirs.sort_by_key(|(epoch_num, _path)| *epoch_num);
129 for (epoch, db_path) in dirs {
130 if missing_epochs.contains(epoch) || *epoch >= last_missing_epoch {
133 info!("Starting state snapshot creation for epoch: {}", *epoch);
134 let state_snapshot_writer = StateSnapshotWriterV1::new_from_store(
135 &self.staging_path,
136 &self.staging_store,
137 &self.snapshot_store,
138 FileCompression::Zstd,
139 NonZeroUsize::new(20).unwrap(),
140 )
141 .await?;
142 let db = Arc::new(AuthorityPerpetualTables::open(
143 &path_to_filesystem(self.db_checkpoint_path.clone(), &db_path.child("store"))?,
144 None,
145 ));
146 let commitments = self
147 .checkpoint_store
148 .get_epoch_state_commitments(*epoch)
149 .expect("Expected last checkpoint of epoch to have end of epoch data")
150 .expect("Expected end of epoch data to be present");
151 let ECMHLiveObjectSetDigest(state_hash_commitment) = commitments
152 .last()
153 .expect("Expected at least one commitment")
154 .clone();
155 state_snapshot_writer
156 .write(*epoch, db, state_hash_commitment)
157 .await?;
158 info!("State snapshot creation successful for epoch: {}", *epoch);
159 let bytes = Bytes::from_static(b"success");
161 let success_marker = db_path.child(SUCCESS_MARKER);
162 put(&self.snapshot_store, &success_marker, bytes.clone()).await?;
163 let state_snapshot_completed_marker =
164 db_path.child(STATE_SNAPSHOT_COMPLETED_MARKER);
165 put(
166 &self.db_checkpoint_store.clone(),
167 &state_snapshot_completed_marker,
168 bytes.clone(),
169 )
170 .await?;
171 info!("State snapshot completed for epoch: {epoch}");
172 } else {
173 let bytes = Bytes::from_static(b"success");
174 let state_snapshot_completed_marker =
175 db_path.child(STATE_SNAPSHOT_COMPLETED_MARKER);
176 put(
177 &self.db_checkpoint_store.clone(),
178 &state_snapshot_completed_marker,
179 bytes.clone(),
180 )
181 .await?;
182 info!("State snapshot skipped for epoch: {epoch}");
183 }
184 }
185 Ok(())
186 }
187
188 async fn run_upload_loop(
191 self: Arc<Self>,
192 mut recv: tokio::sync::broadcast::Receiver<()>,
193 ) -> Result<()> {
194 let mut interval = tokio::time::interval(self.interval);
195 info!("State snapshot uploader loop started");
196 loop {
197 tokio::select! {
198 _now = interval.tick() => {
199 let missing_epochs = self.get_missing_epochs().await;
200 if let Ok(epochs) = missing_epochs {
201 let first_missing_epoch = epochs.first().cloned().unwrap_or(0);
202 self.metrics.first_missing_state_snapshot_epoch.set(first_missing_epoch as i64);
203 if let Err(err) = self.upload_state_snapshot_to_object_store(epochs).await {
204 self.metrics.state_snapshot_upload_err.inc();
205 error!("Failed to upload state snapshot to remote store with err: {:?}", err);
206 } else {
207 debug!("Successfully completed snapshot upload loop");
208 }
209 } else {
210 error!("Failed to find missing state snapshot in remote store");
211 }
212 },
213 _ = recv.recv() => break,
214 }
215 }
216 Ok(())
217 }
218
219 async fn get_missing_epochs(&self) -> Result<Vec<u64>> {
221 let missing_epochs = find_missing_epochs_dirs(&self.snapshot_store, SUCCESS_MARKER).await?;
222 Ok(missing_epochs.to_vec())
223 }
224}