// iota_storage/object_store/mod.rs
use std::sync::Arc;
6
7use anyhow::{Result, anyhow};
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::BoxStream;
11use object_store::{DynObjectStore, ObjectMeta, ObjectStore, path::Path};
12
13pub mod http;
14pub mod util;
15
16#[async_trait]
17pub trait ObjectStoreGetExt: std::fmt::Display + Send + Sync + 'static {
18 async fn get_bytes(&self, src: &Path) -> Result<Bytes>;
20}
21
22macro_rules! as_ref_get_ext_impl {
23 ($type:ty) => {
24 #[async_trait]
25 impl ObjectStoreGetExt for $type {
26 async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
27 self.as_ref().get_bytes(src).await
28 }
29 }
30 };
31}
32
33as_ref_get_ext_impl!(Arc<dyn ObjectStoreGetExt>);
34as_ref_get_ext_impl!(Box<dyn ObjectStoreGetExt>);
35
36macro_rules! as_ref_get_impl {
37 ($type:ty) => {
38 #[async_trait]
39 impl ObjectStoreGetExt for $type {
40 async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
41 self.get(src)
42 .await
43 .map_err(|e| anyhow!("Failed to get file {} with error: {:?}", src, e))?
44 .bytes()
45 .await
46 .map_err(|e| {
47 anyhow!(
48 "Failed to collect GET result for file {} into bytes with error: {:?}",
49 src,
50 e
51 )
52 })
53 }
54 }
55 };
56}
57
58as_ref_get_impl!(Arc<dyn ObjectStore>);
59as_ref_get_impl!(Box<dyn ObjectStore>);
60
61#[async_trait]
62pub trait ObjectStoreListExt: Send + Sync + 'static {
63 async fn list_objects(
65 &self,
66 src: Option<&Path>,
67 ) -> BoxStream<'_, object_store::Result<ObjectMeta>>;
68}
69
70macro_rules! as_ref_list_ext_impl {
71 ($type:ty) => {
72 #[async_trait]
73 impl ObjectStoreListExt for $type {
74 async fn list_objects(
75 &self,
76 src: Option<&Path>,
77 ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
78 self.as_ref().list_objects(src).await
79 }
80 }
81 };
82}
83
84as_ref_list_ext_impl!(Arc<dyn ObjectStoreListExt>);
85as_ref_list_ext_impl!(Box<dyn ObjectStoreListExt>);
86
87#[async_trait]
88impl ObjectStoreListExt for Arc<DynObjectStore> {
89 async fn list_objects(
90 &self,
91 src: Option<&Path>,
92 ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
93 self.list(src)
94 }
95}
96
97#[async_trait]
98pub trait ObjectStorePutExt: Send + Sync + 'static {
99 async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()>;
101}
102
103macro_rules! as_ref_put_ext_impl {
104 ($type:ty) => {
105 #[async_trait]
106 impl ObjectStorePutExt for $type {
107 async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
108 self.as_ref().put_bytes(src, bytes).await
109 }
110 }
111 };
112}
113
114as_ref_put_ext_impl!(Arc<dyn ObjectStorePutExt>);
115as_ref_put_ext_impl!(Box<dyn ObjectStorePutExt>);
116
117#[async_trait]
118impl ObjectStorePutExt for Arc<DynObjectStore> {
119 async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
120 self.put(src, bytes.into()).await?;
121 Ok(())
122 }
123}
124
125#[async_trait]
126pub trait ObjectStoreDeleteExt: Send + Sync + 'static {
127 async fn delete_object(&self, src: &Path) -> Result<()>;
129}
130
131macro_rules! as_ref_delete_ext_impl {
132 ($type:ty) => {
133 #[async_trait]
134 impl ObjectStoreDeleteExt for $type {
135 async fn delete_object(&self, src: &Path) -> Result<()> {
136 self.as_ref().delete_object(src).await
137 }
138 }
139 };
140}
141
142as_ref_delete_ext_impl!(Arc<dyn ObjectStoreDeleteExt>);
143as_ref_delete_ext_impl!(Box<dyn ObjectStoreDeleteExt>);
144
145#[async_trait]
146
147impl ObjectStoreDeleteExt for Arc<DynObjectStore> {
148 async fn delete_object(&self, src: &Path) -> Result<()> {
149 self.delete(src).await?;
150 Ok(())
151 }
152}