iota_config/object_storage_config.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::{env, fs, path::PathBuf, sync::Arc};

use anyhow::{Context, Result, anyhow};
use clap::*;
use object_store::{ClientOptions, DynObjectStore, aws::AmazonS3Builder};
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
use serde::{Deserialize, Serialize};
use tracing::info;

/// Object-store type.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Deserialize, Serialize, ValueEnum)]
pub enum ObjectStoreType {
    /// Local file system
    File,
    /// AWS S3
    S3,
    /// Google Cloud Storage
    GCS,
    /// Azure Blob Storage
    Azure,
}

#[derive(Default, Debug, Clone, Deserialize, Serialize, Args)]
#[serde(rename_all = "kebab-case")]
pub struct ObjectStoreConfig {
    /// Which object storage to use. If not specified, `make` returns an
    /// error.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(value_enum)]
    pub object_store: Option<ObjectStoreType>,
    /// Path of the local directory. Only relevant if `--object-store` is
    /// `File`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub directory: Option<PathBuf>,
    /// Name of the bucket to use for the object store. Must also set
    /// `--object-store` to a cloud object storage to have any effect.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub bucket: Option<String>,
    /// When using Amazon S3 as the object store, set this to an access key
    /// that has permission to read from and write to the specified S3 bucket.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_access_key_id: Option<String>,
    /// When using Amazon S3 as the object store, set this to the secret access
    /// key that goes with the specified access key ID.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_secret_access_key: Option<String>,
    /// When using Amazon S3 as the object store, set this to the bucket
    /// endpoint.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_endpoint: Option<String>,
    /// When using Amazon S3 as the object store, set this to the region
    /// that goes with the specified bucket.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_region: Option<String>,
    /// When using Amazon S3 as the object store, set this to the AWS profile
    /// to use.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub aws_profile: Option<String>,
    /// Enable virtual hosted-style requests.
    #[serde(default)]
    #[arg(long, default_value_t = true)]
    pub aws_virtual_hosted_style_request: bool,
    /// Allow unencrypted HTTP connections to AWS.
    #[serde(default)]
    #[arg(long, default_value_t = true)]
    pub aws_allow_http: bool,
    /// When using Google Cloud Storage as the object store, set this to the
    /// path to the JSON file that contains the Google credentials.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub google_service_account: Option<String>,
    /// When using Google Cloud Storage as the object store and writing to a
    /// bucket with Requester Pays enabled, set this to the project_id
    /// you want to associate the write cost with.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub google_project_id: Option<String>,
    /// When using Microsoft Azure as the object store, set this to the
    /// Azure storage account name.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub azure_storage_account: Option<String>,
    /// When using Microsoft Azure as the object store, set this to one of the
    /// access keys in the storage account settings.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[arg(long)]
    pub azure_storage_access_key: Option<String>,
    /// Maximum number of concurrent requests allowed to the object store.
    #[serde(default = "default_object_store_connection_limit")]
    #[arg(long, default_value_t = 20)]
    pub object_store_connection_limit: usize,
    /// Do not sign requests to the object store (anonymous access).
    #[serde(default)]
    #[arg(long, default_value_t = false)]
    pub no_sign_request: bool,
}
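
// A sketch of what a serialized config can look like, assuming a YAML front
// end such as serde_yaml; field names are kebab-cased by serde, and the bucket
// name and region below are illustrative only:
//
//   object-store: S3
//   bucket: my-snapshot-bucket
//   aws-region: us-east-1
//   object-store-connection-limit: 20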

fn default_object_store_connection_limit() -> usize {
    20
}

/// Client options with the request and connect timeouts disabled, so that
/// transfers of arbitrarily large objects are not cut off.
fn no_timeout_options() -> ClientOptions {
    ClientOptions::new()
        .with_timeout_disabled()
        .with_connect_timeout_disabled()
        .with_pool_idle_timeout(std::time::Duration::from_secs(300))
}

impl ObjectStoreConfig {
    fn new_local_fs(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        info!(directory=?self.directory, object_store_type="File", "Object Store");
        if let Some(path) = &self.directory {
            fs::create_dir_all(path).context(anyhow!(
                "failed to create local directory: {}",
                path.display()
            ))?;
            let store = object_store::local::LocalFileSystem::new_with_prefix(path)
                .context(anyhow!("failed to create local object store"))?;
            Ok(Arc::new(store))
        } else {
            Err(anyhow!("no directory provided for local fs storage"))
        }
    }
    fn new_s3(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::limit::LimitStore;

        info!(bucket=?self.bucket, object_store_type="S3", "Object Store");

        let mut builder = AmazonS3Builder::new()
            .with_client_options(no_timeout_options())
            .with_imdsv1_fallback();

        if self.aws_virtual_hosted_style_request {
            builder = builder.with_virtual_hosted_style_request(true);
        }
        if self.aws_allow_http {
            builder = builder.with_allow_http(true);
        }
        if let Some(region) = &self.aws_region {
            builder = builder.with_region(region);
        }
        if let Some(bucket) = &self.bucket {
            builder = builder.with_bucket_name(bucket);
        }

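        // Explicitly configured credentials take precedence; otherwise fall
        // back to the ARCHIVE_READ, FORMAL_SNAPSHOT_WRITE, and
        // DB_SNAPSHOT_READ environment variables, tried in that order.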
        if let Some(key_id) = &self.aws_access_key_id {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(key_id) = env::var("ARCHIVE_READ_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(key_id) = env::var("FORMAL_SNAPSHOT_WRITE_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(key_id);
        } else if let Ok(key_id) = env::var("DB_SNAPSHOT_READ_AWS_ACCESS_KEY_ID") {
            builder = builder.with_access_key_id(key_id);
        }

        if let Some(secret) = &self.aws_secret_access_key {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("ARCHIVE_READ_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("FORMAL_SNAPSHOT_WRITE_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        } else if let Ok(secret) = env::var("DB_SNAPSHOT_READ_AWS_SECRET_ACCESS_KEY") {
            builder = builder.with_secret_access_key(secret);
        }

        if let Some(endpoint) = &self.aws_endpoint {
            builder = builder.with_endpoint(endpoint);
        }
        Ok(Arc::new(LimitStore::new(
            builder.build().context("invalid s3 config")?,
            self.object_store_connection_limit,
        )))
    }
    fn new_gcs(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::{gcp::GoogleCloudStorageBuilder, limit::LimitStore};

        info!(bucket=?self.bucket, object_store_type="GCS", "Object Store");

        let mut builder = GoogleCloudStorageBuilder::new();

        if let Some(bucket) = &self.bucket {
            builder = builder.with_bucket_name(bucket);
        }
        if let Some(account) = &self.google_service_account {
            builder = builder.with_service_account_path(account);
        }

        let mut client_options = no_timeout_options();
        if let Some(google_project_id) = &self.google_project_id {
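            // For Requester Pays buckets, attach the billing project to every
            // request: `x-goog-user-project` covers storage calls, and the
            // `userproject` header (named `iam_req_header` here) covers IAM
            // requests.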
            let x_project_header = HeaderName::from_static("x-goog-user-project");
            let iam_req_header = HeaderName::from_static("userproject");

            let mut headers = HeaderMap::new();
            headers.insert(x_project_header, HeaderValue::from_str(google_project_id)?);
            headers.insert(iam_req_header, HeaderValue::from_str(google_project_id)?);
            client_options = client_options.with_default_headers(headers);
        }
        builder = builder.with_client_options(client_options);

        Ok(Arc::new(LimitStore::new(
            builder.build().context("invalid gcs config")?,
            self.object_store_connection_limit,
        )))
    }
    fn new_azure(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        use object_store::{azure::MicrosoftAzureBuilder, limit::LimitStore};

        info!(bucket=?self.bucket, account=?self.azure_storage_account,
            object_store_type="Azure", "Object Store");

        let mut builder = MicrosoftAzureBuilder::new().with_client_options(no_timeout_options());

        if let Some(bucket) = &self.bucket {
            builder = builder.with_container_name(bucket);
        }
        if let Some(account) = &self.azure_storage_account {
            builder = builder.with_account(account);
        }
        if let Some(key) = &self.azure_storage_access_key {
            builder = builder.with_access_key(key);
        }

        Ok(Arc::new(LimitStore::new(
            builder.build().context("invalid azure config")?,
            self.object_store_connection_limit,
        )))
    }
    /// Build the configured object store. Dispatches on `object_store` and
    /// fails if no backend was selected.
    pub fn make(&self) -> Result<Arc<DynObjectStore>, anyhow::Error> {
        match &self.object_store {
            Some(ObjectStoreType::File) => self.new_local_fs(),
            Some(ObjectStoreType::S3) => self.new_s3(),
            Some(ObjectStoreType::GCS) => self.new_gcs(),
            Some(ObjectStoreType::Azure) => self.new_azure(),
            None => Err(anyhow!("at least one storage backend should be provided")),
        }
    }
}
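
// A minimal usage sketch, kept as tests. The directory name under `temp_dir()`
// is illustrative only, and the assertions exercise only the dispatch in
// `make()`, not any cloud backend.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn local_fs_store_builds_from_config() {
        let config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(env::temp_dir().join("object_storage_config_example")),
            ..Default::default()
        };
        // `make()` creates the directory if needed and returns an
        // `Arc<DynObjectStore>` wrapping a `LocalFileSystem`.
        assert!(config.make().is_ok());
    }

    #[test]
    fn missing_backend_is_rejected() {
        // With `object_store` unset there is nothing to dispatch to.
        assert!(ObjectStoreConfig::default().make().is_err());
    }
}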