// iota_config/object_storage_config.rs

use std::{env, fs, path::PathBuf, sync::Arc};

use anyhow::{Context, Result, anyhow};
use clap::*;
use object_store::{ClientOptions, DynObjectStore, aws::AmazonS3Builder};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use tracing::info;

#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, ValueEnum)]
pub enum ObjectStoreType {
    /// Local file system.
    File,
    /// Amazon S3 (or an S3-compatible service).
    S3,
    /// Google Cloud Storage.
    GCS,
    /// Azure Blob Storage.
    Azure,
}

/// Object store configuration, settable via CLI arguments or a serialized
/// config file.
#[derive(Default, Debug, Clone, Deserialize, Serialize, Args)]
#[serde(rename_all = "kebab-case")]
pub struct ObjectStoreConfig {
    /// Which object store backend to use.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(value_enum)]
    pub object_store: Option<ObjectStoreType>,
    /// Local directory, used when the backend is `File`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub directory: Option<PathBuf>,
    /// Bucket name (S3/GCS) or container name (Azure).
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub bucket: Option<String>,
    /// AWS access key id.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_access_key_id: Option<String>,
    /// AWS secret access key.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_secret_access_key: Option<String>,
    /// Custom endpoint, e.g. for S3-compatible services.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_endpoint: Option<String>,
    /// AWS region of the bucket.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_region: Option<String>,
    /// AWS profile to load credentials from.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_profile: Option<String>,
    /// Use virtual-hosted-style (bucket-in-hostname) S3 requests.
    #[serde(default)]
    #[arg(long, default_value_t = true)]
    pub aws_virtual_hosted_style_request: bool,
    /// Allow plain HTTP connections to the endpoint.
    #[serde(default)]
    #[arg(long, default_value_t = true)]
    pub aws_allow_http: bool,
    /// Path to a Google service account credentials file.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub google_service_account: Option<String>,
    /// GCP project id attached to each request for quota/billing attribution.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub google_project_id: Option<String>,
    /// Azure storage account name.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub azure_storage_account: Option<String>,
    /// Azure storage access key.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub azure_storage_access_key: Option<String>,
    /// Maximum number of concurrent connections to the object store.
    #[serde(default = "default_object_store_connection_limit")]
    #[arg(long, default_value_t = 20)]
    pub object_store_connection_limit: usize,
    /// Do not sign requests (anonymous access).
    #[serde(default)]
    #[arg(long, default_value_t = false)]
    pub no_sign_request: bool,
}

fn default_object_store_connection_limit() -> usize {
    20
}

/// Client options with request and connect timeouts disabled, so large
/// uploads and downloads are not cut short; idle pooled connections are kept
/// for five minutes.
fn no_timeout_options() -> ClientOptions {
    ClientOptions::new()
        .with_timeout_disabled()
        .with_connect_timeout_disabled()
        .with_pool_idle_timeout(std::time::Duration::from_secs(300))
}

impl ObjectStoreConfig {
    fn new_local_fs(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        info!(directory=?self.directory, object_store_type="File", "Object Store");
        if let Some(path) = &self.directory {
            fs::create_dir_all(path).context(anyhow!(
                "failed to create local directory: {}",
                path.display()
            ))?;
            let store = object_store::local::LocalFileSystem::new_with_prefix(path)
                .context(anyhow!("failed to create local object store"))?;
            Ok(Arc::new(store))
        } else {
            Err(anyhow!("no directory provided for local fs storage"))
        }
    }
    fn new_s3(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::limit::LimitStore;

        info!(bucket=?self.bucket, object_store_type="S3", "Object Store");

        let mut builder = AmazonS3Builder::new()
            .with_client_options(no_timeout_options())
            .with_imdsv1_fallback();

        if self.aws_virtual_hosted_style_request {
            builder = builder.with_virtual_hosted_style_request(true);
        }
        if self.aws_allow_http {
            builder = builder.with_allow_http(true);
        }
        if let Some(region) = &self.aws_region {
            builder = builder.with_region(region);
        }
        if let Some(bucket) = &self.bucket {
            builder = builder.with_bucket_name(bucket);
        }

        // Prefer explicitly configured credentials; otherwise fall back to the
        // task-specific environment variables.
        if let Some(key_id) = &self.aws_access_key_id {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(secret) = env::var("ARCHIVE_READ_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(secret);
        } else if let Ok(secret) = env::var("FORMAL_SNAPSHOT_WRITE_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(secret);
        } else if let Ok(secret) = env::var("DB_SNAPSHOT_READ_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(secret);
        }

        if let Some(secret) = &self.aws_secret_access_key {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("ARCHIVE_READ_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("FORMAL_SNAPSHOT_WRITE_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("DB_SNAPSHOT_READ_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        }

        if let Some(endpoint) = &self.aws_endpoint {
            builder = builder.with_endpoint(endpoint);
        }
        // Cap the number of concurrent requests to the store.
        Ok(Arc::new(LimitStore::new(
            builder.build().context("invalid s3 config")?,
            self.object_store_connection_limit,
        )))
    }
    fn new_gcs(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::{gcp::GoogleCloudStorageBuilder, limit::LimitStore};

        info!(bucket=?self.bucket, object_store_type="GCS", "Object Store");

        let mut builder = GoogleCloudStorageBuilder::new();

        if let Some(bucket) = &self.bucket {
            builder = builder.with_bucket_name(bucket);
        }
        if let Some(account) = &self.google_service_account {
            builder = builder.with_service_account_path(account);
        }

        let mut client_options = no_timeout_options();
        if let Some(google_project_id) = &self.google_project_id {
            // Attach the project id to every request so usage is attributed to
            // the configured GCP project.
            let x_project_header = HeaderName::from_static("x-goog-user-project");
            let iam_req_header = HeaderName::from_static("userproject");

            let mut headers = HeaderMap::new();
            headers.insert(x_project_header, HeaderValue::from_str(google_project_id)?);
            headers.insert(iam_req_header, HeaderValue::from_str(google_project_id)?);
            client_options = client_options.with_default_headers(headers);
        }
        builder = builder.with_client_options(client_options);

        Ok(Arc::new(LimitStore::new(
            builder.build().context("invalid gcs config")?,
            self.object_store_connection_limit,
        )))
    }
    fn new_azure(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::{azure::MicrosoftAzureBuilder, limit::LimitStore};

        info!(bucket=?self.bucket, account=?self.azure_storage_account,
            object_store_type="Azure", "Object Store");

        let mut builder = MicrosoftAzureBuilder::new().with_client_options(no_timeout_options());

        if let Some(bucket) = &self.bucket {
            builder = builder.with_container_name(bucket);
        }
        if let Some(account) = &self.azure_storage_account {
            builder = builder.with_account(account);
        }
        if let Some(key) = &self.azure_storage_access_key {
            builder = builder.with_access_key(key);
        }

        Ok(Arc::new(LimitStore::new(
            builder.build().context("invalid azure config")?,
            self.object_store_connection_limit,
        )))
    }
    pub fn make(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        match &self.object_store {
            Some(ObjectStoreType::File) => self.new_local_fs(),
            Some(ObjectStoreType::S3) => self.new_s3(),
            Some(ObjectStoreType::GCS) => self.new_gcs(),
            Some(ObjectStoreType::Azure) => self.new_azure(),
            _ => Err(anyhow!("at least one storage backend should be provided")),
        }
    }
}
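// Illustrative usage (a minimal sketch, not part of the original module): build a
// local-filesystem store through `ObjectStoreConfig::make`. The temp-directory
// name below is a hypothetical placeholder chosen for the example.
#[cfg(test)]
mod example_tests {
    use super::*;

    #[test]
    fn make_local_fs_store() {
        let config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(std::env::temp_dir().join("object-store-config-example")),
            ..Default::default()
        };
        // `make` dispatches on `object_store` and returns an `Arc<DynObjectStore>`.
        assert!(config.make().is_ok());
    }
}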