iota_data_ingestion_core/
util.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{str::FromStr, time::Duration};
6
7use backoff::{ExponentialBackoff, backoff::Backoff};
8use object_store::{
9    ClientOptions, ObjectStore, RetryConfig, aws::AmazonS3ConfigKey, gcp::GoogleConfigKey,
10};
11use url::Url;
12
13use crate::IngestionResult;
14
15/// Creates a remote store client *without* any retry mechanism.
16///
17/// This function constructs a remote store client configured to *not* retry
18/// failed requests. All requests will fail immediately if the underlying
19/// operation encounters an error.  This is a convenience wrapper around
20/// `create_remote_store_client_with_ops` that sets the retry configuration
21/// to disable retries.
22///
23/// # Arguments
24///
25/// * `url`: The URL of the remote store. The scheme of the URL determines the
26///   storage provider:
27///     * `http://` or `https://`: HTTP-based store.
28///     * `gs://`: Google Cloud Storage.
29///     * `s3://` or other AWS S3-compatible URL: Amazon S3.
30/// * `remote_store_options`: A vector of key-value pairs representing
31///   provider-specific options.
32///     * For GCS: See [`object_store::gcp::GoogleConfigKey`] for valid keys.
33///     * For S3: See [`object_store::aws::AmazonS3ConfigKey`] for valid keys.
34///     * For HTTP: No options are currently supported. This parameter should be
35///       empty.
36/// * `request_timeout_secs`: The timeout duration (in seconds) for individual
37///   requests. This timeout is used to set a slightly longer retry timeout
38///   (request_timeout_secs + 1) internally, even though retries are disabled.
39///   This is done to ensure that the overall operation doesn't hang
40///   indefinitely.
41///
42/// # Examples
43///
44/// Creating an S3 client without retries:
45///
46/// ```rust,no_run
47/// # use iota_data_ingestion_core::create_remote_store_client;
48/// use object_store::aws::AmazonS3ConfigKey;
49///
50/// let url = "s3://my-bucket";
51/// let options = vec![(
52///     AmazonS3ConfigKey::Region.as_ref().to_owned(),
53///     "us-east-1".to_string(),
54/// )];
55/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap();
56/// ```
57///
58/// Creating a GCS client without retries:
59///
60/// ```rust,no_run
61/// # use iota_data_ingestion_core::create_remote_store_client;
62/// use object_store::gcp::GoogleConfigKey;
63///
64/// let url = "gs://my-bucket";
65/// let options = vec![(
66///     GoogleConfigKey::ServiceAccount.as_ref().to_owned(),
67///     "my-service-account".to_string(),
68/// )];
69/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap();
70/// ```
71///
72/// Creating an HTTP client without retries (no options supported):
73///
74/// ```rust,no_run
75/// # use iota_data_ingestion_core::create_remote_store_client;
76///
77/// let url = "http://example.bucket.com";
78/// let options = vec![]; // No options for HTTP
79/// let client = create_remote_store_client(url.to_string(), options, 30).unwrap();
80/// ```
81pub fn create_remote_store_client(
82    url: String,
83    remote_store_options: Vec<(String, String)>,
84    request_timeout_secs: u64,
85) -> IngestionResult<Box<dyn ObjectStore>> {
86    let retry_config = RetryConfig {
87        max_retries: 0,
88        retry_timeout: Duration::from_secs(request_timeout_secs + 1),
89        ..Default::default()
90    };
91
92    create_remote_store_client_with_ops(
93        url,
94        remote_store_options,
95        request_timeout_secs,
96        retry_config,
97    )
98}
99
100/// Creates a remote store client with configurable retry behavior and options.
101///
102/// This function constructs a remote store client for various cloud storage
103/// providers (HTTP, Google Cloud Storage, Amazon S3) based on the provided URL
104/// and options. It allows configuring retry behavior through the `retry_config`
105/// argument.
106///
107/// # Arguments
108///
109/// * `url`: The URL of the remote store.  The scheme of the URL determines the
110///   storage provider:
111///     * `http://` or `https://`:  HTTP-based store.
112///     * `gs://`: Google Cloud Storage.
113///     * `s3://` or other AWS S3-compatible URL: Amazon S3.
114/// * `remote_store_options`: A vector of key-value pairs representing
115///   provider-specific options.
116///     * For GCS:  See [`object_store::gcp::GoogleConfigKey`] for valid keys.
117///     * For S3: See [`object_store::aws::AmazonS3ConfigKey`] for valid keys.
118///     * For HTTP: No options are currently supported. This parameter should be
119///       empty.
120/// * `request_timeout_secs`: The timeout duration (in seconds) for individual
121///   requests.
122/// * `retry_config`: A [`RetryConfig`] struct defining the retry strategy. This
123///   allows fine-grained control over the number of retries, backoff behavior,
124///   and retry timeouts.  See the documentation for
125///   [`object_store::RetryConfig`] for details.
126///
127/// # Examples
128///
129/// Creating an S3 client with specific options and a retry configuration:
130///
131/// ```text
132/// # use iota_data_ingestion_core::create_remote_store_client_with_ops;
133/// use object_store::{RetryConfig, aws::AmazonS3ConfigKey};
134///
135/// let url = "s3://my-bucket";
136/// let options = vec![(
137///     AmazonS3ConfigKey::Region.as_ref().to_owned(),
138///     "us-east-1".to_string(),
139/// )];
140/// let retry_config = RetryConfig::default(); // Use default retry settings
141/// let client =
142///     create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap();
143/// ```
144///
145/// Creating a GCS client:
146///
147/// ```text
148/// # use iota_data_ingestion_core::create_remote_store_client_with_ops;
149/// use object_store::{RetryConfig, gcp::GoogleConfigKey};
150///
151/// let url = "gs://my-bucket";
152/// let options = vec![(
153///     GoogleConfigKey::ServiceAccount.as_ref().to_owned(),
154///     "my-service-account".to_string(),
155/// )];
156/// let retry_config = RetryConfig::default();
157/// let client =
158///     create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap();
159/// ```
160///
161/// Creating an HTTP client (no options supported):
162///
163/// ```text
164/// # use iota_data_ingestion_core::create_remote_store_client_with_ops;
165/// use object_store::RetryConfig;
166///
167/// let url = "http://example.bucket.com";
168/// let options = vec![]; // No options for HTTP
169/// let retry_config = RetryConfig::default();
170/// let client =
171///     create_remote_store_client_with_ops(url.to_string(), options, 30, retry_config).unwrap();
172/// ```
173pub fn create_remote_store_client_with_ops(
174    url: String,
175    remote_store_options: Vec<(String, String)>,
176    request_timeout_secs: u64,
177    retry_config: RetryConfig,
178) -> IngestionResult<Box<dyn ObjectStore>> {
179    let client_options = ClientOptions::new()
180        .with_timeout(Duration::from_secs(request_timeout_secs))
181        .with_allow_http(true);
182    if remote_store_options.is_empty() {
183        let http_store = object_store::http::HttpBuilder::new()
184            .with_url(url)
185            .with_client_options(client_options)
186            .with_retry(retry_config)
187            .build()?;
188        Ok(Box::new(http_store))
189    } else if Url::parse(&url)?.scheme() == "gs" {
190        let url = Url::parse(&url)?;
191        let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new()
192            .with_url(url.as_str())
193            .with_retry(retry_config)
194            .with_client_options(client_options);
195        for (key, value) in remote_store_options {
196            builder = builder.with_config(GoogleConfigKey::from_str(&key)?, value);
197        }
198        Ok(Box::new(builder.build()?))
199    } else {
200        let url = Url::parse(&url)?;
201        let mut builder = object_store::aws::AmazonS3Builder::new()
202            .with_url(url.as_str())
203            .with_retry(retry_config)
204            .with_client_options(client_options);
205        for (key, value) in remote_store_options {
206            builder = builder.with_config(AmazonS3ConfigKey::from_str(&key)?, value);
207        }
208        Ok(Box::new(builder.build()?))
209    }
210}
211
212/// Creates a new [`ExponentialBackoff`] instance based on the configured
213/// template.
214///
215/// Returns a fresh backoff instance that has been reset to its initial
216/// state, ensuring consistent retry behavior for each new operation.
217pub(crate) fn reset_backoff(backoff: &ExponentialBackoff) -> ExponentialBackoff {
218    let mut backoff = backoff.clone();
219    backoff.reset();
220    backoff
221}