iota_storage/object_store/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use anyhow::{Result, anyhow};
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::BoxStream;
11use object_store::{DynObjectStore, ObjectMeta, ObjectStore, path::Path};
12
13pub mod http;
14pub mod util;
15
16#[async_trait]
17pub trait ObjectStoreGetExt: std::fmt::Display + Send + Sync + 'static {
18    /// Return the bytes at given path in object store
19    async fn get_bytes(&self, src: &Path) -> Result<Bytes>;
20}
21
22macro_rules! as_ref_get_ext_impl {
23    ($type:ty) => {
24        #[async_trait]
25        impl ObjectStoreGetExt for $type {
26            async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
27                self.as_ref().get_bytes(src).await
28            }
29        }
30    };
31}
32
33as_ref_get_ext_impl!(Arc<dyn ObjectStoreGetExt>);
34as_ref_get_ext_impl!(Box<dyn ObjectStoreGetExt>);
35
36macro_rules! as_ref_get_impl {
37    ($type:ty) => {
38        #[async_trait]
39        impl ObjectStoreGetExt for $type {
40            async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
41                self.get(src)
42                    .await
43                    .map_err(|e| anyhow!("Failed to get file {} with error: {:?}", src, e))?
44                    .bytes()
45                    .await
46                    .map_err(|e| {
47                        anyhow!(
48                            "Failed to collect GET result for file {} into bytes with error: {:?}",
49                            src,
50                            e
51                        )
52                    })
53            }
54        }
55    };
56}
57
58as_ref_get_impl!(Arc<dyn ObjectStore>);
59as_ref_get_impl!(Box<dyn ObjectStore>);
60
61#[async_trait]
62pub trait ObjectStoreListExt: Send + Sync + 'static {
63    /// List the objects at the given path in object store
64    async fn list_objects(
65        &self,
66        src: Option<&Path>,
67    ) -> BoxStream<'_, object_store::Result<ObjectMeta>>;
68}
69
70macro_rules! as_ref_list_ext_impl {
71    ($type:ty) => {
72        #[async_trait]
73        impl ObjectStoreListExt for $type {
74            async fn list_objects(
75                &self,
76                src: Option<&Path>,
77            ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
78                self.as_ref().list_objects(src).await
79            }
80        }
81    };
82}
83
84as_ref_list_ext_impl!(Arc<dyn ObjectStoreListExt>);
85as_ref_list_ext_impl!(Box<dyn ObjectStoreListExt>);
86
87#[async_trait]
88impl ObjectStoreListExt for Arc<DynObjectStore> {
89    async fn list_objects(
90        &self,
91        src: Option<&Path>,
92    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
93        self.list(src)
94    }
95}
96
97#[async_trait]
98pub trait ObjectStorePutExt: Send + Sync + 'static {
99    /// Write the bytes at the given location in object store
100    async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()>;
101}
102
103macro_rules! as_ref_put_ext_impl {
104    ($type:ty) => {
105        #[async_trait]
106        impl ObjectStorePutExt for $type {
107            async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
108                self.as_ref().put_bytes(src, bytes).await
109            }
110        }
111    };
112}
113
114as_ref_put_ext_impl!(Arc<dyn ObjectStorePutExt>);
115as_ref_put_ext_impl!(Box<dyn ObjectStorePutExt>);
116
117#[async_trait]
118impl ObjectStorePutExt for Arc<DynObjectStore> {
119    async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
120        self.put(src, bytes.into()).await?;
121        Ok(())
122    }
123}
124
125#[async_trait]
126pub trait ObjectStoreDeleteExt: Send + Sync + 'static {
127    /// Delete the object at the given location in object store
128    async fn delete_object(&self, src: &Path) -> Result<()>;
129}
130
131macro_rules! as_ref_delete_ext_impl {
132    ($type:ty) => {
133        #[async_trait]
134        impl ObjectStoreDeleteExt for $type {
135            async fn delete_object(&self, src: &Path) -> Result<()> {
136                self.as_ref().delete_object(src).await
137            }
138        }
139    };
140}
141
142as_ref_delete_ext_impl!(Arc<dyn ObjectStoreDeleteExt>);
143as_ref_delete_ext_impl!(Box<dyn ObjectStoreDeleteExt>);
144
145#[async_trait]
146
147impl ObjectStoreDeleteExt for Arc<DynObjectStore> {
148    async fn delete_object(&self, src: &Path) -> Result<()> {
149        self.delete(src).await?;
150        Ok(())
151    }
152}