iota_grpc_client/api/
common.rs

1// Copyright (c) 2026 IOTA Stiftung
2// SPDX-License-Identifier: Apache-2.0
3
4//! Common utilities shared across API modules.
5
6use iota_grpc_types::v1::{
7    bcs::BcsData,
8    ledger_service::{ObjectResult, TransactionResult, object_result, transaction_result},
9    transaction::{ExecutedTransaction, Transaction as ProtoTransaction},
10    transaction_execution_service::{
11        ExecuteTransactionResult, SimulateTransactionResult, SimulatedTransaction,
12        execute_transaction_result, simulate_transaction_result,
13    },
14    types::ObjectId as ProtoObjectId,
15};
16pub use iota_grpc_types::{
17    field::{FieldMask, FieldMaskUtil},
18    google::rpc::Status as RpcStatus,
19    proto::TryFromProtoError,
20};
21use iota_sdk_types::{Digest, ObjectId};
22use serde::Serialize;
23
24use super::MetadataEnvelope;
25
26/// Errors that can occur during gRPC client API operations.
27#[derive(Debug, thiserror::Error)]
28pub enum Error {
29    /// Error converting proto types to SDK types.
30    #[error("proto conversion error: {0}")]
31    ProtoConversion(#[from] TryFromProtoError),
32
33    /// Per-item error returned by the server (preserves code, message,
34    /// details).
35    #[error("server error (code {code}): {msg}", code = .0.code, msg = .0.message)]
36    Server(RpcStatus),
37
38    /// Client-side protocol error (e.g. checkpoint stream reassembly).
39    #[error("protocol error: {0}")]
40    Protocol(String),
41
42    /// Error converting signatures.
43    #[error("signature conversion error: {0}")]
44    Signature(String),
45
46    /// The caller passed an empty request (e.g. no object IDs or digests).
47    #[error("empty request: at least one item must be provided")]
48    EmptyRequest,
49
50    /// The server stream ended unexpectedly while `has_next` was still true.
51    #[error("stream ended unexpectedly: server indicated more results with has_next=true")]
52    UnexpectedEndOfStream,
53
54    /// gRPC transport or protocol error.
55    #[error("grpc error: {0}")]
56    Grpc(#[from] tonic::Status),
57}
58
59impl From<Error> for tonic::Status {
60    fn from(err: Error) -> Self {
61        match err {
62            Error::ProtoConversion(e) => {
63                tonic::Status::internal(format!("proto conversion error: {e}"))
64            }
65            Error::Server(status) => status.to_tonic_status(),
66            Error::Protocol(msg) => tonic::Status::internal(format!("protocol error: {msg}")),
67            Error::Signature(msg) => {
68                tonic::Status::internal(format!("signature conversion error: {msg}"))
69            }
70            Error::EmptyRequest => {
71                tonic::Status::invalid_argument("empty request: at least one item must be provided")
72            }
73            Error::UnexpectedEndOfStream => {
74                tonic::Status::internal("stream ended unexpectedly: has_next was true")
75            }
76            Error::Grpc(status) => status,
77        }
78    }
79}
80
81/// Result type alias for API operations.
82pub type Result<T> = std::result::Result<T, Error>;
83
84// =============================================================================
85// Field Masks
86// =============================================================================
87
88/// Build a field mask with a custom value or default.
89///
90/// This is a convenience helper that handles the common pattern of using
91/// a user-provided field mask or falling back to a default.
92pub fn field_mask_with_default(custom: Option<&str>, default: &str) -> FieldMask {
93    FieldMask::from_str(custom.unwrap_or(default))
94}
95
96/// Safely convert a `usize` to `u32`, saturating at `u32::MAX` instead of
97/// silently truncating on 64-bit platforms.
98pub fn saturating_usize_to_u32(value: usize) -> u32 {
99    u32::try_from(value).unwrap_or(u32::MAX)
100}
101
102/// A trait for proto result types that follow the pattern of having
103/// `Some(Result::Value)`, `Some(Result::Error)`, or `None`.
104///
105/// This allows generic handling of gRPC response results that can be either
106/// a success value, a server error, or missing.
107pub trait ProtoResult {
108    /// The success value type.
109    type Value;
110
111    /// Extract the result, converting to our error types.
112    fn into_result(self) -> Result<Self::Value>;
113}
114
115impl ProtoResult for ObjectResult {
116    type Value = iota_grpc_types::v1::object::Object;
117
118    fn into_result(self) -> Result<Self::Value> {
119        match self.result {
120            Some(object_result::Result::Object(obj)) => Ok(obj),
121            Some(object_result::Result::Error(e)) => Err(Error::Server(e)),
122            None => Err(TryFromProtoError::missing("result").into()),
123            Some(_) => Err(Error::Protocol("Unknown object result type".into())),
124        }
125    }
126}
127
128impl ProtoResult for TransactionResult {
129    type Value = ExecutedTransaction;
130
131    fn into_result(self) -> Result<Self::Value> {
132        match self.result {
133            Some(transaction_result::Result::ExecutedTransaction(tx)) => Ok(tx),
134            Some(transaction_result::Result::Error(e)) => Err(Error::Server(e)),
135            None => Err(TryFromProtoError::missing("result").into()),
136            Some(_) => Err(Error::Protocol("Unknown transaction result type".into())),
137        }
138    }
139}
140
141impl ProtoResult for ExecuteTransactionResult {
142    type Value = ExecutedTransaction;
143
144    fn into_result(self) -> Result<Self::Value> {
145        match self.result {
146            Some(execute_transaction_result::Result::ExecutedTransaction(tx)) => Ok(tx),
147            Some(execute_transaction_result::Result::Error(e)) => Err(Error::Server(e)),
148            None => Err(TryFromProtoError::missing("result").into()),
149            Some(_) => Err(Error::Protocol(
150                "Unknown execute transaction result type".into(),
151            )),
152        }
153    }
154}
155
156impl ProtoResult for SimulateTransactionResult {
157    type Value = SimulatedTransaction;
158
159    fn into_result(self) -> Result<Self::Value> {
160        match self.result {
161            Some(simulate_transaction_result::Result::SimulatedTransaction(tx)) => Ok(tx),
162            Some(simulate_transaction_result::Result::Error(e)) => Err(Error::Server(e)),
163            None => Err(TryFromProtoError::missing("result").into()),
164            Some(_) => Err(Error::Protocol(
165                "Unknown simulate transaction result type".into(),
166            )),
167        }
168    }
169}
170
171/// Collect all items from a paginated gRPC stream into a single `Vec`.
172///
173/// This handles the common pattern of iterating over a `tonic::Streaming<T>`,
174/// extracting items from each message via the `extract` closure, and checking
175/// that the stream was not truncated (i.e. `has_next` is `false` on the last
176/// message).
177///
178/// The `extract` closure receives each stream message and must return
179/// `(has_next, items)`.  Because some streams require fallible per-item
180/// conversion (e.g. via [`ProtoResult`]), the closure itself returns
181/// `Result<…>`.
182pub async fn collect_stream<T, I, F>(
183    mut stream: tonic::Streaming<T>,
184    metadata: tonic::metadata::MetadataMap,
185    extract: F,
186) -> Result<MetadataEnvelope<Vec<I>>>
187where
188    F: Fn(T) -> Result<(bool, Vec<I>)>,
189{
190    let mut results = Vec::new();
191    let mut has_next = false;
192
193    while let Some(response) = stream.message().await? {
194        let (next, items) = extract(response)?;
195        has_next = next;
196        results.extend(items);
197    }
198
199    if has_next {
200        return Err(Error::UnexpectedEndOfStream);
201    }
202
203    Ok(MetadataEnvelope::new(results, metadata))
204}
205
206/// A single page of results from a paginated list endpoint.
207///
208/// Returned when awaiting a list query builder directly (single-page mode).
209/// Contains the items from this page plus an optional continuation token.
210#[derive(Debug, Clone)]
211pub struct Page<T> {
212    /// The items returned in this page.
213    pub items: Vec<T>,
214    /// Token to retrieve the next page. `None` when this is the last page.
215    pub next_page_token: Option<::prost::bytes::Bytes>,
216}
217
218/// Generate a paginated query builder for a list endpoint.
219///
220/// The generated struct implements [`IntoFuture`](std::future::IntoFuture) for
221/// single-page retrieval and provides a [`collect`] method for auto-pagination.
222///
223/// # Parameters
224///
225/// - `$query_name` — name of the generated builder struct
226/// - `$service_client_type` — the tonic service client type
227/// - `$item_type` — the item type in the response vec
228/// - `$rpc_method` — the RPC method name on the service client
229/// - `$items_field` — the field name on the response containing the items vec
230///
231/// # Example
232///
233/// ```ignore
234/// define_list_query! {
235///     pub struct ListOwnedObjectsQuery {
236///         service_client: StateServiceClient<InterceptedChannel>,
237///         request: ListOwnedObjectsRequest,
238///         item: Object,
239///         rpc_method: list_owned_objects,
240///         items_field: objects,
241///     }
242/// }
243/// ```
244macro_rules! define_list_query {
245    (
246        $(#[$meta:meta])*
247        pub struct $query_name:ident {
248            service_client: $service_client_type:ty,
249            request: $request_type:ty,
250            item: $item_type:ty,
251            rpc_method: $rpc_method:ident,
252            items_field: $items_field:ident,
253        }
254    ) => {
255        $(#[$meta])*
256        pub struct $query_name {
257            service_client: $service_client_type,
258            base_request: $request_type,
259            max_message_size: Option<usize>,
260            page_size: Option<u32>,
261            page_token: Option<::prost::bytes::Bytes>,
262        }
263
264        impl $query_name {
265            pub(crate) fn new(
266                service_client: $service_client_type,
267                base_request: $request_type,
268                max_message_size: Option<usize>,
269                page_size: Option<u32>,
270                page_token: Option<::prost::bytes::Bytes>,
271            ) -> Self {
272                Self {
273                    service_client,
274                    base_request,
275                    max_message_size,
276                    page_size,
277                    page_token,
278                }
279            }
280
281            /// Auto-paginate through all pages, collecting up to `limit` items.
282            ///
283            /// If `limit` is `None`, collects all items across all pages.
284            pub async fn collect(
285                self,
286                limit: Option<u32>,
287            ) -> $crate::api::Result<$crate::api::MetadataEnvelope<Vec<$item_type>>> {
288                let mut all_items = Vec::new();
289                let mut next_page_token = self.page_token;
290                let mut result_metadata = None;
291                let mut service_client = self.service_client;
292
293                loop {
294                    let mut request = self.base_request.clone();
295
296                    // Cap page_size to the remaining items needed when a
297                    // limit is set, so we don't over-fetch from the server.
298                    let effective_page_size = match (self.page_size, limit) {
299                        (Some(ps), Some(l)) => {
300                            let remaining = (l as usize).saturating_sub(all_items.len());
301                            Some(ps.min(remaining as u32))
302                        }
303                        (Some(ps), None) => Some(ps),
304                        (None, Some(l)) => {
305                            let remaining = (l as usize).saturating_sub(all_items.len());
306                            Some(remaining as u32)
307                        }
308                        (None, None) => None,
309                    };
310                    if let Some(ps) = effective_page_size {
311                        request = request.with_page_size(ps);
312                    }
313                    if let Some(token) = next_page_token.take() {
314                        request = request.with_page_token(token);
315                    }
316                    if let Some(max_size) = self.max_message_size {
317                        request = request.with_max_message_size_bytes(
318                            $crate::api::saturating_usize_to_u32(max_size),
319                        );
320                    }
321
322                    let response = service_client.$rpc_method(request).await?;
323                    let (body, metadata) =
324                        $crate::api::MetadataEnvelope::from(response).into_parts();
325                    if result_metadata.is_none() {
326                        result_metadata = Some(metadata);
327                    }
328
329                    all_items.extend(body.$items_field);
330
331                    match body.next_page_token {
332                        Some(token) => next_page_token = Some(token),
333                        None => break,
334                    }
335
336                    if limit.is_some_and(|l| all_items.len() >= l as usize) {
337                        break;
338                    }
339                }
340
341                Ok($crate::api::MetadataEnvelope::new(
342                    all_items,
343                    result_metadata.unwrap_or_default(),
344                ))
345            }
346        }
347
348        impl ::std::future::IntoFuture for $query_name {
349            type Output = $crate::api::Result<
350                $crate::api::MetadataEnvelope<$crate::api::Page<$item_type>>,
351            >;
352            type IntoFuture = ::std::pin::Pin<
353                Box<dyn ::std::future::Future<Output = Self::Output> + Send>,
354            >;
355
356            fn into_future(self) -> Self::IntoFuture {
357                Box::pin(async move {
358                    let mut service_client = self.service_client;
359                    let mut request = self.base_request;
360
361                    if let Some(ps) = self.page_size {
362                        request = request.with_page_size(ps);
363                    }
364                    if let Some(token) = self.page_token {
365                        request = request.with_page_token(token);
366                    }
367                    if let Some(max_size) = self.max_message_size {
368                        request = request.with_max_message_size_bytes(
369                            $crate::api::saturating_usize_to_u32(max_size),
370                        );
371                    }
372
373                    let response = service_client.$rpc_method(request).await?;
374                    let (body, metadata) =
375                        $crate::api::MetadataEnvelope::from(response).into_parts();
376
377                    Ok($crate::api::MetadataEnvelope::new(
378                        $crate::api::Page {
379                            items: body.$items_field,
380                            next_page_token: body.next_page_token,
381                        },
382                        metadata,
383                    ))
384                })
385            }
386        }
387    };
388}
389
390pub(crate) use define_list_query;
391
392/// Convert an `ObjectId` to the gRPC proto `ObjectId` type.
393pub fn proto_object_id(id: ObjectId) -> ProtoObjectId {
394    ProtoObjectId::default().with_object_id(id.inner().to_vec())
395}
396
397/// Build a proto Transaction from serializable transaction data and digest.
398pub fn build_proto_transaction<T: Serialize>(data: &T, digest: Digest) -> Result<ProtoTransaction> {
399    let bcs = BcsData::serialize(data)
400        .map_err(|e| Error::from(TryFromProtoError::invalid("transaction", e)))?;
401
402    let proto_transaction = ProtoTransaction::default()
403        .with_digest(digest)
404        .with_bcs(bcs);
405
406    Ok(proto_transaction)
407}