iota_storage/object_store/
mod.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4
5use std::sync::Arc;
6
7use anyhow::{Result, anyhow};
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::BoxStream;
11use object_store::{DynObjectStore, ObjectMeta, path::Path};
12
13pub mod http;
14pub mod util;
15
16#[async_trait]
17pub trait ObjectStoreGetExt: std::fmt::Display + Send + Sync + 'static {
18    /// Return the bytes at given path in object store
19    async fn get_bytes(&self, src: &Path) -> Result<Bytes>;
20}
21
22macro_rules! as_ref_get_ext_impl {
23    ($type:ty) => {
24        #[async_trait]
25        impl ObjectStoreGetExt for $type {
26            async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
27                self.as_ref().get_bytes(src).await
28            }
29        }
30    };
31}
32
33as_ref_get_ext_impl!(Arc<dyn ObjectStoreGetExt>);
34as_ref_get_ext_impl!(Box<dyn ObjectStoreGetExt>);
35
36#[async_trait]
37impl ObjectStoreGetExt for Arc<DynObjectStore> {
38    async fn get_bytes(&self, src: &Path) -> Result<Bytes> {
39        self.get(src)
40            .await
41            .map_err(|e| anyhow!("Failed to get file {} with error: {:?}", src, e))?
42            .bytes()
43            .await
44            .map_err(|e| {
45                anyhow!(
46                    "Failed to collect GET result for file {} into bytes with error: {:?}",
47                    src,
48                    e
49                )
50            })
51    }
52}
53
54#[async_trait]
55pub trait ObjectStoreListExt: Send + Sync + 'static {
56    /// List the objects at the given path in object store
57    async fn list_objects(
58        &self,
59        src: Option<&Path>,
60    ) -> BoxStream<'_, object_store::Result<ObjectMeta>>;
61}
62
63macro_rules! as_ref_list_ext_impl {
64    ($type:ty) => {
65        #[async_trait]
66        impl ObjectStoreListExt for $type {
67            async fn list_objects(
68                &self,
69                src: Option<&Path>,
70            ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
71                self.as_ref().list_objects(src).await
72            }
73        }
74    };
75}
76
77as_ref_list_ext_impl!(Arc<dyn ObjectStoreListExt>);
78as_ref_list_ext_impl!(Box<dyn ObjectStoreListExt>);
79
80#[async_trait]
81impl ObjectStoreListExt for Arc<DynObjectStore> {
82    async fn list_objects(
83        &self,
84        src: Option<&Path>,
85    ) -> BoxStream<'_, object_store::Result<ObjectMeta>> {
86        self.list(src)
87    }
88}
89
90#[async_trait]
91pub trait ObjectStorePutExt: Send + Sync + 'static {
92    /// Write the bytes at the given location in object store
93    async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()>;
94}
95
96macro_rules! as_ref_put_ext_impl {
97    ($type:ty) => {
98        #[async_trait]
99        impl ObjectStorePutExt for $type {
100            async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
101                self.as_ref().put_bytes(src, bytes).await
102            }
103        }
104    };
105}
106
107as_ref_put_ext_impl!(Arc<dyn ObjectStorePutExt>);
108as_ref_put_ext_impl!(Box<dyn ObjectStorePutExt>);
109
110#[async_trait]
111impl ObjectStorePutExt for Arc<DynObjectStore> {
112    async fn put_bytes(&self, src: &Path, bytes: Bytes) -> Result<()> {
113        self.put(src, bytes.into()).await?;
114        Ok(())
115    }
116}
117
118#[async_trait]
119pub trait ObjectStoreDeleteExt: Send + Sync + 'static {
120    /// Delete the object at the given location in object store
121    async fn delete_object(&self, src: &Path) -> Result<()>;
122}
123
124macro_rules! as_ref_delete_ext_impl {
125    ($type:ty) => {
126        #[async_trait]
127        impl ObjectStoreDeleteExt for $type {
128            async fn delete_object(&self, src: &Path) -> Result<()> {
129                self.as_ref().delete_object(src).await
130            }
131        }
132    };
133}
134
135as_ref_delete_ext_impl!(Arc<dyn ObjectStoreDeleteExt>);
136as_ref_delete_ext_impl!(Box<dyn ObjectStoreDeleteExt>);
137
138#[async_trait]
139
140impl ObjectStoreDeleteExt for Arc<DynObjectStore> {
141    async fn delete_object(&self, src: &Path) -> Result<()> {
142        self.delete(src).await?;
143        Ok(())
144    }
145}