iota_storage/object_store/http/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5mod gcs;
6mod local;
7mod s3;
8
9use std::sync::Arc;
10
11use anyhow::{Context, Result, anyhow};
12use chrono::{DateTime, Utc};
13use futures::{StreamExt, TryStreamExt};
14use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
15use object_store::{Error, GetResult, GetResultPayload, ObjectMeta, path::Path};
16use reqwest::{
17    Client, Method,
18    header::{CONTENT_LENGTH, ETAG, HeaderMap, LAST_MODIFIED},
19};
20
21use crate::object_store::{
22    ObjectStoreGetExt,
23    http::{gcs::GoogleCloudStorage, local::LocalStorage, s3::AmazonS3},
24};
25
// https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
//
// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ).
pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
    .remove(b'-')
    .remove(b'.')
    .remove(b'_')
    .remove(b'~');
/// Same as [`STRICT_ENCODE_SET`], but additionally leaves `/` unencoded so the
/// set can be applied to whole object paths without escaping the separators.
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
/// `User-Agent` value built at compile time from this crate's name and version
/// (e.g. `iota-storage/1.2.3`).
static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
37
/// Builds a download-oriented object-store client backed by plain HTTP.
pub trait HttpDownloaderBuilder {
    /// Creates the [`ObjectStoreGetExt`] backend selected by this
    /// configuration (local file system, S3, or GCS).
    ///
    /// # Errors
    /// Returns an error when no supported storage backend is configured or
    /// when the backend cannot be constructed.
    fn make_http(&self) -> Result<Arc<dyn ObjectStoreGetExt>>;
}
41
42impl HttpDownloaderBuilder for ObjectStoreConfig {
43    fn make_http(&self) -> Result<Arc<dyn ObjectStoreGetExt>> {
44        match self.object_store {
45            Some(ObjectStoreType::File) => {
46                Ok(LocalStorage::new(self.directory.as_ref().unwrap()).map(Arc::new)?)
47            }
48            Some(ObjectStoreType::S3) => {
49                let bucket_endpoint = if let Some(endpoint) = &self.aws_endpoint {
50                    if self.aws_virtual_hosted_style_request {
51                        endpoint.clone()
52                    } else {
53                        let bucket = self.bucket.as_ref().unwrap();
54                        format!("{endpoint}/{bucket}")
55                    }
56                } else {
57                    let bucket = self.bucket.as_ref().unwrap();
58                    let region = self.aws_region.as_ref().unwrap();
59                    if self.aws_virtual_hosted_style_request {
60                        format!("https://{bucket}.s3.{region}.amazonaws.com")
61                    } else {
62                        format!("https://s3.{region}.amazonaws.com/{bucket}")
63                    }
64                };
65                Ok(AmazonS3::new(&bucket_endpoint).map(Arc::new)?)
66            }
67            Some(ObjectStoreType::GCS) => {
68                Ok(GoogleCloudStorage::new(self.bucket.as_ref().unwrap()).map(Arc::new)?)
69            }
70            _ => Err(anyhow!("At least one storage backend should be provided")),
71        }
72    }
73}
74
75async fn get(
76    url: &str,
77    store: &'static str,
78    location: &Path,
79    client: &Client,
80) -> Result<GetResult> {
81    let request = client.request(Method::GET, url);
82    let response = request.send().await.context("failed to get")?;
83    let meta = header_meta(location, response.headers()).context("Failed to get header")?;
84    let stream = response
85        .bytes_stream()
86        .map_err(|source| Error::Generic {
87            store,
88            source: Box::new(source),
89        })
90        .boxed();
91    Ok(GetResult {
92        range: 0..meta.size,
93        payload: GetResultPayload::Stream(stream),
94        meta,
95        attributes: object_store::Attributes::new(),
96    })
97}
98
99fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta> {
100    let last_modified = headers
101        .get(LAST_MODIFIED)
102        .context("Missing last modified")?;
103
104    let content_length = headers
105        .get(CONTENT_LENGTH)
106        .context("Missing content length")?;
107
108    let last_modified = last_modified.to_str().context("bad header")?;
109    let last_modified = DateTime::parse_from_rfc2822(last_modified)
110        .context("invalid last modified")?
111        .with_timezone(&Utc);
112
113    let content_length = content_length.to_str().context("bad header")?;
114    let content_length = content_length.parse().context("invalid content length")?;
115
116    let e_tag = headers.get(ETAG).context("missing etag")?;
117    let e_tag = e_tag.to_str().context("bad header")?;
118
119    Ok(ObjectMeta {
120        location: location.clone(),
121        last_modified,
122        size: content_length,
123        e_tag: Some(e_tag.to_string()),
124        version: None,
125    })
126}
127
#[cfg(test)]
mod tests {
    use std::fs;

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use object_store::path::Path;
    use tempfile::TempDir;

    use crate::object_store::http::HttpDownloaderBuilder;

    /// Builds a small directory tree on disk and checks that the file-backed
    /// HTTP downloader fetches a nested file's bytes.
    #[tokio::test]
    pub async fn test_local_download() -> anyhow::Result<()> {
        let tmp = TempDir::new()?;
        let root = tmp.path();

        // Layout: root/child/file1 and root/child/grand_child/file2,
        // both holding the same payload.
        let child_dir = root.join("child");
        fs::create_dir(&child_dir)?;
        fs::write(child_dir.join("file1"), b"Lorem ipsum")?;
        let grandchild_dir = child_dir.join("grand_child");
        fs::create_dir(&grandchild_dir)?;
        fs::write(grandchild_dir.join("file2"), b"Lorem ipsum")?;

        let store = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(root.to_path_buf()),
            ..Default::default()
        }
        .make_http()?;

        let fetched = store.get_bytes(&Path::from("child/file1")).await?;
        assert_eq!(fetched.to_vec(), b"Lorem ipsum");
        Ok(())
    }
}
162}