iota_storage/object_store/
mod.rs1use std::sync::Arc;
6
7use anyhow::{Result, anyhow};
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::BoxStream;
11use object_store::{DynObjectStore, ObjectMeta, path::Path};
12
13pub mod http;
14pub mod util;
15
16#[async_trait]
17pub trait ObjectStoreGetExt: std::fmt::Display + Send + Sync + 'static {
18 async fn get_bytes(&self, src: &Path) -> Result<Bytes>;
20}
21
22macro_rules! as_ref_get_ext_impl {
23 ($type:ty) => {
24 #[async_trait]
25 impl ObjectStoreGetExt for $type {
26 async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
27 self.as_ref().get_bytes(src).await
28 }
29 }
30 };
31}
32
33as_ref_get_ext_impl!(Arc<dyn ObjectStoreGetExt>);
34as_ref_get_ext_impl!(Box<dyn ObjectStoreGetExt>);
35
36#[async_trait]
37impl ObjectStoreGetExt for Arc<DynObjectStore> {
38 async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
39 self.get(src)
40 .await
41 .map_err(|e| anyhow!("Failed to get file {} with error: {:?}", src, e))?
42 .bytes()
43 .await
44 .map_err(|e| {
45 anyhow!(
46 "Failed to collect GET result for file {} into bytes with error: {:?}",
47 src,
48 e
49 )
50 })
51 }
52}
53
54#[async_trait]
55pub trait ObjectStoreListExt: Send + Sync + 'static {
56 async fn list_objects(
58 &self,
59 src: Option<&Path>,
60 ) -> BoxStream<'_, object_store::Result<ObjectMeta>>;
61}
62
63macro_rules! as_ref_list_ext_impl {
64 ($type:ty) => {
65 #[async_trait]
66 impl ObjectStoreListExt for $type {
67 async fn list_objects(
68 &self,
69 src: Option<&Path>,
70 ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
71 self.as_ref().list_objects(src).await
72 }
73 }
74 };
75}
76
77as_ref_list_ext_impl!(Arc<dyn ObjectStoreListExt>);
78as_ref_list_ext_impl!(Box<dyn ObjectStoreListExt>);
79
80#[async_trait]
81impl ObjectStoreListExt for Arc<DynObjectStore> {
82 async fn list_objects(
83 &self,
84 src: Option<&Path>,
85 ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
86 self.list(src)
87 }
88}
89
90#[async_trait]
91pub trait ObjectStorePutExt: Send + Sync + 'static {
92 async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()>;
94}
95
96macro_rules! as_ref_put_ext_impl {
97 ($type:ty) => {
98 #[async_trait]
99 impl ObjectStorePutExt for $type {
100 async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
101 self.as_ref().put_bytes(src, bytes).await
102 }
103 }
104 };
105}
106
107as_ref_put_ext_impl!(Arc<dyn ObjectStorePutExt>);
108as_ref_put_ext_impl!(Box<dyn ObjectStorePutExt>);
109
110#[async_trait]
111impl ObjectStorePutExt for Arc<DynObjectStore> {
112 async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
113 self.put(src, bytes.into()).await?;
114 Ok(())
115 }
116}
117
118#[async_trait]
119pub trait ObjectStoreDeleteExt: Send + Sync + 'static {
120 async fn delete_object(&self, src: &Path) -> Result<()>;
122}
123
124macro_rules! as_ref_delete_ext_impl {
125 ($type:ty) => {
126 #[async_trait]
127 impl ObjectStoreDeleteExt for $type {
128 async fn delete_object(&self, src: &Path) -> Result<()> {
129 self.as_ref().delete_object(src).await
130 }
131 }
132 };
133}
134
135as_ref_delete_ext_impl!(Arc<dyn ObjectStoreDeleteExt>);
136as_ref_delete_ext_impl!(Box<dyn ObjectStoreDeleteExt>);
137
138#[async_trait]
139
140impl ObjectStoreDeleteExt for Arc<DynObjectStore> {
141 async fn delete_object(&self, src: &Path) -> Result<()> {
142 self.delete(src).await?;
143 Ok(())
144 }
145}