iota_storage/object_store/http/
mod.rs1mod gcs;
6mod local;
7mod s3;
8
9use std::sync::Arc;
10
11use anyhow::{Context, Result, anyhow};
12use chrono::{DateTime, Utc};
13use futures::{StreamExt, TryStreamExt};
14use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
15use object_store::{Error, GetResult, GetResultPayload, ObjectMeta, path::Path};
16use reqwest::{
17 Client, Method,
18 header::{CONTENT_LENGTH, ETAG, HeaderMap, LAST_MODIFIED},
19};
20
21use crate::object_store::{
22 ObjectStoreGetExt,
23 http::{gcs::GoogleCloudStorage, local::LocalStorage, s3::AmazonS3},
24};
25
26pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet = percent_encoding::NON_ALPHANUMERIC
31 .remove(b'-')
32 .remove(b'.')
33 .remove(b'_')
34 .remove(b'~');
35const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');
36static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
37
38pub trait HttpDownloaderBuilder {
39 fn make_http(&self) -> Result<Arc<dyn ObjectStoreGetExt>>;
40}
41
42impl HttpDownloaderBuilder for ObjectStoreConfig {
43 fn make_http(&self) -> Result<Arc<dyn ObjectStoreGetExt>> {
44 match self.object_store {
45 Some(ObjectStoreType::File) => {
46 Ok(LocalStorage::new(self.directory.as_ref().unwrap()).map(Arc::new)?)
47 }
48 Some(ObjectStoreType::S3) => {
49 let bucket_endpoint = if let Some(endpoint) = &self.aws_endpoint {
50 if self.aws_virtual_hosted_style_request {
51 endpoint.clone()
52 } else {
53 let bucket = self.bucket.as_ref().unwrap();
54 format!("{endpoint}/{bucket}")
55 }
56 } else {
57 let bucket = self.bucket.as_ref().unwrap();
58 let region = self.aws_region.as_ref().unwrap();
59 if self.aws_virtual_hosted_style_request {
60 format!("https://{bucket}.s3.{region}.amazonaws.com")
61 } else {
62 format!("https://s3.{region}.amazonaws.com/{bucket}")
63 }
64 };
65 Ok(AmazonS3::new(&bucket_endpoint).map(Arc::new)?)
66 }
67 Some(ObjectStoreType::GCS) => {
68 Ok(GoogleCloudStorage::new(self.bucket.as_ref().unwrap()).map(Arc::new)?)
69 }
70 _ => Err(anyhow!("At least one storage backend should be provided")),
71 }
72 }
73}
74
75async fn get(
76 url: &str,
77 store: &'static str,
78 location: &Path,
79 client: &Client,
80) -> Result<GetResult> {
81 let request = client.request(Method::GET, url);
82 let response = request.send().await.context("failed to get")?;
83 let meta = header_meta(location, response.headers()).context("Failed to get header")?;
84 let stream = response
85 .bytes_stream()
86 .map_err(|source| Error::Generic {
87 store,
88 source: Box::new(source),
89 })
90 .boxed();
91 Ok(GetResult {
92 range: 0..meta.size,
93 payload: GetResultPayload::Stream(stream),
94 meta,
95 attributes: object_store::Attributes::new(),
96 })
97}
98
99fn header_meta(location: &Path, headers: &HeaderMap) -> Result<ObjectMeta> {
100 let last_modified = headers
101 .get(LAST_MODIFIED)
102 .context("Missing last modified")?;
103
104 let content_length = headers
105 .get(CONTENT_LENGTH)
106 .context("Missing content length")?;
107
108 let last_modified = last_modified.to_str().context("bad header")?;
109 let last_modified = DateTime::parse_from_rfc2822(last_modified)
110 .context("invalid last modified")?
111 .with_timezone(&Utc);
112
113 let content_length = content_length.to_str().context("bad header")?;
114 let content_length = content_length.parse().context("invalid content length")?;
115
116 let e_tag = headers.get(ETAG).context("missing etag")?;
117 let e_tag = e_tag.to_str().context("bad header")?;
118
119 Ok(ObjectMeta {
120 location: location.clone(),
121 last_modified,
122 size: content_length,
123 e_tag: Some(e_tag.to_string()),
124 version: None,
125 })
126}
127
#[cfg(test)]
mod tests {
    use std::fs;

    use iota_config::object_storage_config::{ObjectStoreConfig, ObjectStoreType};
    use object_store::path::Path;
    use tempfile::TempDir;

    use crate::object_store::http::HttpDownloaderBuilder;

    /// Payload written to every fixture file and expected back from the store.
    const PAYLOAD: &[u8] = b"Lorem ipsum";

    /// Builds a `File`-backed downloader over a temporary directory tree and
    /// checks that a nested file is returned byte-for-byte.
    #[tokio::test]
    pub async fn test_local_download() -> anyhow::Result<()> {
        // Layout: <root>/child/file1 and <root>/child/grand_child/file2.
        let root = TempDir::new()?;
        let child_dir = root.path().join("child");
        fs::create_dir(&child_dir)?;
        fs::write(child_dir.join("file1"), PAYLOAD)?;

        let grand_child_dir = child_dir.join("grand_child");
        fs::create_dir(&grand_child_dir)?;
        fs::write(grand_child_dir.join("file2"), PAYLOAD)?;

        let config = ObjectStoreConfig {
            object_store: Some(ObjectStoreType::File),
            directory: Some(root.path().to_path_buf()),
            ..Default::default()
        };
        let store = config.make_http()?;

        let bytes = store.get_bytes(&Path::from("child/file1")).await?;
        assert_eq!(bytes.to_vec(), PAYLOAD);
        Ok(())
    }
}