iota_graphql_rpc/
data.rs

// Copyright (c) Mysten Labs, Inc.
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
4
5pub(crate) mod package_resolver;
6pub(crate) mod pg;
7
8use std::sync::Arc;
9
10use async_graphql::dataloader::DataLoader as AGDataLoader;
11use async_trait::async_trait;
12use diesel::{
13    QueryResult,
14    query_builder::{BoxedSelectStatement, FromClause, QueryFragment, QueryId},
15    query_dsl::{LoadQuery, methods::LimitDsl},
16};
17
18use crate::error::Error;
19
20/// Database Backend in use -- abstracting a specific implementation.
21pub(crate) type Db = pg::PgExecutor;
22
23/// Helper types to access associated types on `Db`.
24pub(crate) type Conn<'c> = <Db as QueryExecutor>::DbConnection<'c>;
25pub(crate) type DieselConn = <Db as QueryExecutor>::Connection;
26pub(crate) type DieselBackend = <Db as QueryExecutor>::Backend;
27
28/// Helper types for accessing a shared `DataLoader` instance.
29#[derive(Clone)]
30pub(crate) struct DataLoader(pub Arc<AGDataLoader<Db>>);
31
32/// A generic boxed query (compatible with the return type of `into_boxed` on
33/// diesel's table DSL).
34///
35/// - ST is the SqlType of the rows selected.
36/// - QS is the QuerySource (the table(s) being selected from).
37/// - GB is the GroupBy clause.
38///
39/// These type parameters should usually be inferred by context.
40pub(crate) type Query<ST, QS, GB> =
41    BoxedSelectStatement<'static, ST, FromClause<QS>, DieselBackend, GB>;
42
43/// Interface for accessing relational data written by the Indexer, agnostic of
44/// the database back-end being used.
45#[async_trait]
46pub(crate) trait QueryExecutor {
47    type Backend: diesel::backend::Backend;
48    type Connection: diesel::Connection;
49
50    type DbConnection<'c>: DbConnection<Connection = Self::Connection, Backend = Self::Backend>
51    where
52        Self: 'c;
53
54    /// Execute `txn` with read committed isolation. `txn` is supplied a
55    /// database connection to issue queries over.
56    async fn execute<T, U, E>(&self, txn: T) -> Result<U, Error>
57    where
58        T: FnOnce(&mut Self::DbConnection<'_>) -> Result<U, E>,
59        E: From<diesel::result::Error> + std::error::Error,
60        T: Send + 'static,
61        U: Send + 'static,
62        E: Send + 'static;
63
64    /// Execute `txn` with repeatable reads and no phantom reads -- multiple
65    /// calls to the same query should produce the same results. `txn` is
66    /// supplied a database connection to issue queries over.
67    async fn execute_repeatable<T, U, E>(&self, txn: T) -> Result<U, Error>
68    where
69        T: FnOnce(&mut Self::DbConnection<'_>) -> Result<U, E>,
70        E: From<diesel::result::Error> + std::error::Error,
71        T: Send + 'static,
72        U: Send + 'static,
73        E: Send + 'static;
74}
75
76pub(crate) trait DbConnection {
77    type Backend: diesel::backend::Backend;
78    type Connection: diesel::Connection<Backend = Self::Backend>;
79
80    /// Run a query that fetches a single value. `query` is a thunk that returns
81    /// a query when called.
82    fn result<Q, U>(&mut self, query: impl Fn() -> Q) -> QueryResult<U>
83    where
84        Q: diesel::query_builder::Query,
85        Q: LoadQuery<'static, Self::Connection, U>,
86        Q: QueryId + QueryFragment<Self::Backend>;
87
88    /// Run a query that fetches multiple values. `query` is a thunk that
89    /// returns a query when called.
90    fn results<Q, U>(&mut self, query: impl Fn() -> Q) -> QueryResult<Vec<U>>
91    where
92        Q: diesel::query_builder::Query,
93        Q: LoadQuery<'static, Self::Connection, U>,
94        Q: QueryId + QueryFragment<Self::Backend>;
95
96    /// Helper to limit a query that fetches multiple values to return only its
97    /// first value. `query` is a thunk that returns a query when called.
98    fn first<Q: LimitDsl, U>(&mut self, query: impl Fn() -> Q) -> QueryResult<U>
99    where
100        <Q as LimitDsl>::Output: diesel::query_builder::Query,
101        <Q as LimitDsl>::Output: LoadQuery<'static, Self::Connection, U>,
102        <Q as LimitDsl>::Output: QueryId + QueryFragment<Self::Backend>,
103    {
104        self.result(move || query().limit(1i64))
105    }
106}
107
108impl DataLoader {
109    pub(crate) fn new(db: Db) -> Self {
110        Self(Arc::new(AGDataLoader::new(db, tokio::spawn)))
111    }
112}