iota_graphql_rpc/server/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4use std::{
5    any::Any,
6    convert::Infallible,
7    net::{SocketAddr, TcpStream},
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use async_graphql::{
13    Context, Data, Schema, SchemaBuilder,
14    extensions::{ApolloTracing, ExtensionFactory, Tracing},
15    http::ALL_WEBSOCKET_PROTOCOLS,
16};
17use async_graphql_axum::{GraphQLProtocol, GraphQLRequest, GraphQLResponse, GraphQLWebSocket};
18use axum::{
19    Extension, Router,
20    body::Body,
21    extract::{
22        ConnectInfo, FromRef, FromRequestParts, Query as AxumQuery, State, ws::WebSocketUpgrade,
23    },
24    http::{HeaderMap, StatusCode},
25    middleware::{self},
26    response::{IntoResponse, Response as AxumResponse},
27    routing::{MethodRouter, Route, get, post},
28};
29use axum_extra::{TypedHeader, headers::ContentLength};
30use chrono::Utc;
31use http::{HeaderValue, Method, Request};
32use iota_graphql_rpc_headers::LIMITS_HEADER;
33use iota_indexer::{
34    apis::{OptimisticWriteApi, WriteApi},
35    db::{get_pool_connection, setup_postgres::check_db_migration_consistency},
36    metrics::IndexerMetrics,
37    optimistic_indexing::OptimisticTransactionExecutor,
38    read::IndexerReader,
39    store::PgIndexerStore,
40};
41use iota_json_rpc_api::CLIENT_SDK_TYPE_HEADER;
42use iota_metrics::spawn_monitored_task;
43use iota_network_stack::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler};
44use iota_package_resolver::{PackageStoreWithLruCache, Resolver};
45use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
46use tokio::{join, net::TcpListener, sync::OnceCell};
47use tokio_util::sync::CancellationToken;
48use tower::{Layer, Service};
49use tower_http::cors::{AllowOrigin, CorsLayer};
50use tracing::{info, warn};
51use uuid::Uuid;
52
53use crate::{
54    config::{
55        ConnectionConfig, MAX_CONCURRENT_REQUESTS, RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
56        ServerConfig, ServiceConfig, Version,
57    },
58    context_data::db_data_provider::PgManager,
59    data::{
60        DataLoader, Db,
61        package_resolver::{DbPackageStore, PackageResolver},
62    },
63    error::Error,
64    extensions::{
65        directive_checker::DirectiveChecker,
66        feature_gate::FeatureGate,
67        logger::Logger,
68        query_limits_checker::{PayloadSize, QueryLimitsChecker, ShowUsage},
69        timeout::Timeout,
70    },
71    metrics::Metrics,
72    mutation::Mutation,
73    server::{
74        exchange_rates_task::TriggerExchangeRatesTask,
75        system_package_task::SystemPackageTask,
76        version::{check_version_middleware, set_version_middleware},
77        watermark_task::{Watermark, WatermarkLock, WatermarkTask},
78    },
79    types::{
80        chain_identifier::ChainIdentifierCache,
81        datatype::IMoveDatatype,
82        move_object::IMoveObject,
83        object::IObject,
84        owner::IOwner,
85        query::{IotaGraphQLSchema, Query},
86        subscription::{GraphQLStream, Subscription},
87    },
88};
89
/// Fallback for the maximum tolerated gap between the current time and the
/// latest checkpoint timestamp, used when a health request does not supply
/// its own limit.
const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(5 * 60);
/// Complexity ceiling handed to the built-in [`async_graphql`] complexity
/// limiter.
const MAX_COMPLEXITY: usize = 6_000;
95
96pub(crate) struct Server {
97    router: Router,
98    address: SocketAddr,
99    watermark_task: WatermarkTask,
100    system_package_task: SystemPackageTask,
101    trigger_exchange_rates_task: TriggerExchangeRatesTask,
102    state: AppState,
103}
104
105impl Server {
106    /// Start the GraphQL service and any background tasks it is dependent on.
107    /// When a cancellation signal is received, the method waits for all
108    /// tasks to complete before returning.
109    pub async fn run(mut self) -> Result<(), Error> {
110        get_or_init_server_start_time().await;
111
112        // A handle that spawns a background task to periodically update the
113        // `Watermark`, which consists of the checkpoint upper bound and current
114        // epoch.
115        let watermark_task = {
116            info!("Starting watermark update task");
117            spawn_monitored_task!(async move {
118                self.watermark_task.run().await;
119            })
120        };
121
122        // A handle that spawns a background task to evict system packages on epoch
123        // changes.
124        let system_package_task = {
125            info!("Starting system package task");
126            spawn_monitored_task!(async move {
127                self.system_package_task.run().await;
128            })
129        };
130
131        let trigger_exchange_rates_task = {
132            info!("Starting trigger exchange rates task");
133            spawn_monitored_task!(async move {
134                self.trigger_exchange_rates_task.run().await;
135            })
136        };
137
138        let server_task = {
139            info!("Starting graphql service");
140            let cancellation_token = self.state.cancellation_token.clone();
141            let address = self.address;
142            let router = self.router;
143            spawn_monitored_task!(async move {
144                axum::serve(
145                    TcpListener::bind(address)
146                        .await
147                        .map_err(|e| Error::Internal(format!("listener bind failed: {e}")))?,
148                    router.into_make_service_with_connect_info::<SocketAddr>(),
149                )
150                .with_graceful_shutdown(async move {
151                    cancellation_token.cancelled().await;
152                    info!("Shutdown signal received, terminating graphql service");
153                })
154                .await
155                .map_err(|e| Error::Internal(format!("Server run failed: {e}")))
156            })
157        };
158
159        // Wait for all tasks to complete. This ensures that the service doesn't fully
160        // shut down until all tasks and the server have completed their
161        // shutdown processes.
162        let _ = join!(
163            watermark_task,
164            system_package_task,
165            trigger_exchange_rates_task,
166            server_task
167        );
168
169        Ok(())
170    }
171}
172
173pub(crate) struct ServerBuilder {
174    state: AppState,
175    schema: SchemaBuilder<Query, Mutation, Subscription>,
176    router: Option<Router>,
177    db_reader: Option<Db>,
178    resolver: Option<PackageResolver>,
179}
180
181#[derive(Clone)]
182pub(crate) struct AppState {
183    connection: ConnectionConfig,
184    service: ServiceConfig,
185    metrics: Metrics,
186    cancellation_token: CancellationToken,
187    pub version: Version,
188}
189
190impl AppState {
191    pub(crate) fn new(
192        connection: ConnectionConfig,
193        service: ServiceConfig,
194        metrics: Metrics,
195        cancellation_token: CancellationToken,
196        version: Version,
197    ) -> Self {
198        Self {
199            connection,
200            service,
201            metrics,
202            cancellation_token,
203            version,
204        }
205    }
206}
207
208impl FromRef<AppState> for ConnectionConfig {
209    fn from_ref(app_state: &AppState) -> ConnectionConfig {
210        app_state.connection.clone()
211    }
212}
213
214impl FromRef<AppState> for Metrics {
215    fn from_ref(app_state: &AppState) -> Metrics {
216        app_state.metrics.clone()
217    }
218}
219
220impl ServerBuilder {
221    pub fn new(state: AppState) -> Self {
222        Self {
223            state,
224            schema: schema_builder(),
225            router: None,
226            db_reader: None,
227            resolver: None,
228        }
229    }
230
231    pub fn address(&self) -> String {
232        format!(
233            "{}:{}",
234            self.state.connection.host, self.state.connection.port
235        )
236    }
237
238    pub fn context_data(mut self, context_data: impl Any + Send + Sync) -> Self {
239        self.schema = self.schema.data(context_data);
240        self
241    }
242
243    pub fn extension(mut self, extension: impl ExtensionFactory) -> Self {
244        self.schema = self.schema.extension(extension);
245        self
246    }
247
248    fn build_schema(self) -> Schema<Query, Mutation, Subscription> {
249        self.schema.finish()
250    }
251
252    /// Prepares the components of the server to be run. Finalizes the graphql
253    /// schema, and expects the `Db` and `Router` to have been initialized.
254    fn build_components(
255        self,
256    ) -> (
257        String,
258        Schema<Query, Mutation, Subscription>,
259        Db,
260        PackageResolver,
261        Router,
262    ) {
263        let address = self.address();
264        let ServerBuilder {
265            state: _,
266            schema,
267            db_reader,
268            resolver,
269            router,
270        } = self;
271        (
272            address,
273            schema.finish(),
274            db_reader.expect("db reader not initialized"),
275            resolver.expect("package resolver not initialized"),
276            router.expect("router not initialized"),
277        )
278    }
279
280    fn init_router(&mut self) {
281        if self.router.is_none() {
282            let router: Router = Router::new()
283                .route("/", post(graphql_handler))
284                .route("/subscriptions", get(subscription_handler))
285                .route("/{version}", post(graphql_handler))
286                .route("/{version}/subscriptions", get(subscription_handler))
287                .route("/graphql", post(graphql_handler))
288                .route("/graphql/subscriptions", get(subscription_handler))
289                .route("/graphql/{version}", post(graphql_handler))
290                .route(
291                    "/graphql/{version}/subscriptions",
292                    get(subscription_handler),
293                )
294                .route("/health", get(health_check))
295                .route("/graphql/health", get(health_check))
296                .route("/graphql/{version}/health", get(health_check))
297                .with_state(self.state.clone())
298                .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
299                    metrics: self.state.metrics.clone(),
300                }));
301            self.router = Some(router);
302        }
303    }
304
305    pub fn route(mut self, path: &str, method_handler: MethodRouter) -> Self {
306        self.init_router();
307        self.router = self.router.map(|router| router.route(path, method_handler));
308        self
309    }
310
311    pub fn layer<L>(mut self, layer: L) -> Self
312    where
313        L: Layer<Route> + Clone + Send + Sync + 'static,
314        L::Service: Service<Request<Body>> + Clone + Send + Sync + 'static,
315        <L::Service as Service<Request<Body>>>::Response: IntoResponse + 'static,
316        <L::Service as Service<Request<Body>>>::Error: Into<Infallible> + 'static,
317        <L::Service as Service<Request<Body>>>::Future: Send + 'static,
318    {
319        self.init_router();
320        self.router = self.router.map(|router| router.layer(layer));
321        self
322    }
323
324    fn cors() -> Result<CorsLayer, Error> {
325        let acl = match std::env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
326            Ok(value) => {
327                let allow_hosts = value
328                    .split(',')
329                    .map(HeaderValue::from_str)
330                    .collect::<Result<Vec<_>, _>>()
331                    .map_err(|_| {
332                        Error::Internal(
333                            "Cannot resolve access control origin env variable".to_string(),
334                        )
335                    })?;
336                AllowOrigin::list(allow_hosts)
337            }
338            _ => AllowOrigin::any(),
339        };
340        info!("Access control allow origin set to: {acl:?}");
341
342        let cors = CorsLayer::new()
343            // Allow `POST` & `GET` when accessing the resource
344            .allow_methods([Method::POST, Method::GET])
345            // Allow requests from any origin
346            .allow_origin(acl)
347            .allow_headers([hyper::header::CONTENT_TYPE, LIMITS_HEADER.clone()]);
348        Ok(cors)
349    }
350
351    /// Consumes the `ServerBuilder` to create a `Server` that can be run.
352    pub fn build(self) -> Result<Server, Error> {
353        let state = self.state.clone();
354        let (address, schema, db_reader, resolver, router) = self.build_components();
355
356        // Initialize the watermark background task struct.
357        let watermark_task = WatermarkTask::new(
358            db_reader.clone(),
359            state.metrics.clone(),
360            std::time::Duration::from_millis(state.service.background_tasks.watermark_update_ms),
361            state.cancellation_token.clone(),
362        );
363
364        let system_package_task = SystemPackageTask::new(
365            resolver,
366            watermark_task.epoch_receiver(),
367            state.cancellation_token.clone(),
368        );
369
370        let trigger_exchange_rates_task = TriggerExchangeRatesTask::new(
371            db_reader,
372            watermark_task.epoch_receiver(),
373            state.cancellation_token.clone(),
374        );
375
376        let router = router
377            .route_layer(middleware::from_fn_with_state(
378                state.version,
379                set_version_middleware,
380            ))
381            .route_layer(middleware::from_fn_with_state(
382                state.version,
383                check_version_middleware,
384            ))
385            .layer(axum::extract::Extension(schema))
386            .layer(axum::extract::Extension(watermark_task.lock()))
387            .layer(Self::cors()?);
388
389        Ok(Server {
390            router,
391            address: address
392                .parse()
393                .map_err(|_| Error::Internal(format!("Failed to parse address {address}")))?,
394            watermark_task,
395            system_package_task,
396            trigger_exchange_rates_task,
397            state,
398        })
399    }
400
401    /// Instantiate a `ServerBuilder` from a `ServerConfig`, typically called
402    /// when building the graphql service for production usage.
403    pub async fn from_config(
404        config: &ServerConfig,
405        version: &Version,
406        cancellation_token: CancellationToken,
407    ) -> Result<Self, Error> {
408        // PROMETHEUS
409        let prom_addr: SocketAddr = format!(
410            "{}:{}",
411            config.connection.prom_host, config.connection.prom_port
412        )
413        .parse()
414        .map_err(|_| {
415            Error::Internal(format!(
416                "Failed to parse url {}, port {} into socket address",
417                config.connection.prom_host, config.connection.prom_port
418            ))
419        })?;
420
421        let registry_service = iota_metrics::start_prometheus_server(prom_addr);
422        info!("Starting Prometheus HTTP endpoint at {}", prom_addr);
423        let registry = registry_service.default_registry();
424        registry
425            .register(iota_metrics::uptime_metric(
426                "graphql",
427                version.full,
428                "unknown",
429            ))
430            .unwrap();
431
432        // METRICS
433        let metrics = Metrics::new(&registry);
434        let indexer_metrics = IndexerMetrics::new(&registry);
435        let state = AppState::new(
436            config.connection.clone(),
437            config.service.clone(),
438            metrics.clone(),
439            cancellation_token,
440            *version,
441        );
442        let mut builder = ServerBuilder::new(state);
443
444        let iota_names_config = config.service.iota_names.clone();
445        let zklogin_config = config.service.zklogin.clone();
446        let reader = PgManager::reader_with_config(
447            config.connection.db_url.clone(),
448            config.connection.db_pool_size,
449            // Bound each statement in a request with the overall request timeout, to bound DB
450            // utilisation (in the worst case we will use 2x the request timeout time in DB wall
451            // time).
452            config.service.limits.request_timeout_ms.into(),
453        )
454        .map_err(|e| Error::ServerInit(format!("Failed to create pg connection pool: {e}")))?;
455
456        if !config.connection.skip_migration_consistency_check {
457            // Compatibility check
458            info!("Starting compatibility check");
459            let mut connection = get_pool_connection(&reader.get_pool())?;
460            check_db_migration_consistency(&mut connection)?;
461            info!("Compatibility check passed");
462        }
463
464        // DB
465        let db = Db::new(
466            reader.clone(),
467            config.service.limits.clone(),
468            metrics.clone(),
469        );
470        let loader = DataLoader::new(db.clone());
471        let pg_conn_pool = PgManager::new(reader.clone());
472        let package_store = DbPackageStore::new(loader.clone());
473        let resolver = Arc::new(Resolver::new_with_limits(
474            PackageStoreWithLruCache::new(package_store),
475            config.service.limits.package_resolver_limits(),
476        ));
477
478        builder.db_reader = Some(db.clone());
479        builder.resolver = Some(resolver.clone());
480
481        let Some(fullnode_url) = config.tx_exec_full_node.node_rpc_url.as_ref() else {
482            return Err(Error::ServerInit(
483                "No fullnode url found in config".to_string(),
484            ));
485        };
486
487        let graphql_streams =
488            GraphQLStream::new(&config.connection.db_url, reader.clone(), &registry).await?;
489        let write_api = build_write_api(fullnode_url, reader, indexer_metrics)?;
490
491        builder = builder
492            .context_data(config.service.clone())
493            .context_data(loader)
494            .context_data(db)
495            .context_data(pg_conn_pool)
496            .context_data(resolver)
497            .context_data(write_api)
498            .context_data(iota_names_config)
499            .context_data(zklogin_config)
500            .context_data(metrics.clone())
501            .context_data(config.clone())
502            .context_data(graphql_streams)
503            .context_data(ChainIdentifierCache::default());
504
505        if config.internal_features.feature_gate {
506            builder = builder.extension(FeatureGate);
507        }
508
509        if config.internal_features.logger {
510            builder = builder.extension(Logger::default());
511        }
512
513        if config.internal_features.query_limits_checker {
514            builder = builder.extension(QueryLimitsChecker);
515        }
516
517        if config.internal_features.directive_checker {
518            builder = builder.extension(DirectiveChecker);
519        }
520
521        if config.internal_features.query_timeout {
522            builder = builder.extension(Timeout);
523        }
524
525        if config.internal_features.tracing {
526            builder = builder.extension(Tracing);
527        }
528
529        if config.internal_features.apollo_tracing {
530            builder = builder.extension(ApolloTracing);
531        }
532
533        // TODO: uncomment once impl
534        // if config.internal_features.open_telemetry { }
535
536        Ok(builder)
537    }
538}
539
540fn schema_builder() -> SchemaBuilder<Query, Mutation, Subscription> {
541    async_graphql::Schema::build(Query, Mutation, Subscription)
542        .limit_complexity(MAX_COMPLEXITY)
543        .register_output_type::<IMoveObject>()
544        .register_output_type::<IObject>()
545        .register_output_type::<IOwner>()
546        .register_output_type::<IMoveDatatype>()
547}
548
549/// Return the string representation of the schema used by this server.
550pub fn export_schema() -> String {
551    schema_builder().finish().sdl()
552}
553
554/// Entry point for graphql requests.
555///
556/// Each request is stamped with a unique ID, a `ShowUsage` flag if set in the
557/// request headers, and the watermark as set by the background task.
558async fn graphql_handler(
559    ConnectInfo(addr): ConnectInfo<SocketAddr>,
560    TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
561    schema: Extension<IotaGraphQLSchema>,
562    Extension(watermark_lock): Extension<WatermarkLock>,
563    headers: HeaderMap,
564    req: GraphQLRequest,
565) -> (axum::http::Extensions, GraphQLResponse) {
566    let mut req = req.into_inner();
567    req.data.insert(PayloadSize(content_length));
568    req.data.insert(Uuid::new_v4());
569    if headers.contains_key(ShowUsage::name()) {
570        req.data.insert(ShowUsage)
571    }
572
573    // Capture the IP address of the client
574    // Note: if a load balancer is used it must be configured to forward the client
575    // IP address
576    req.data.insert(addr);
577    req.data.insert(Watermark::new(watermark_lock).await);
578
579    let result = schema.execute(req).await;
580
581    // If there are errors, insert them as an extension so that the Metrics callback
582    // handler can pull it out later.
583    let mut extensions = axum::http::Extensions::new();
584    if result.is_err() {
585        extensions.insert(GraphqlErrors(std::sync::Arc::new(result.errors.clone())));
586    };
587    (extensions, result.into())
588}
589
590pub(crate) fn get_write_api<'ctx>(
591    ctx: &'ctx Context<'_>,
592) -> Result<&'ctx OptimisticWriteApi, Error> {
593    ctx.data_opt()
594        .ok_or_else(|| Error::Internal("Unable to get node execution interface".to_string()))
595}
596
597fn build_write_api(
598    fullnode_url: &str,
599    reader: IndexerReader,
600    metrics: IndexerMetrics,
601) -> Result<OptimisticWriteApi, Error> {
602    let json_rpc_client = build_json_rpc_client(fullnode_url)?;
603    let indexer_store = PgIndexerStore::new(reader.get_pool(), metrics.clone());
604    let optimistic_tx_executor =
605        OptimisticTransactionExecutor::new(fullnode_url, reader.clone(), indexer_store, metrics);
606    Ok(OptimisticWriteApi::new(
607        WriteApi::new(json_rpc_client, reader),
608        optimistic_tx_executor,
609    ))
610}
611
612fn build_json_rpc_client(rpc_client_url: &str) -> Result<HttpClient, Error> {
613    let mut headers = HeaderMap::new();
614    headers.insert(CLIENT_SDK_TYPE_HEADER, HeaderValue::from_static("graphql"));
615
616    let builder = HttpClientBuilder::default()
617        .max_request_size(2 << 30)
618        .set_headers(headers.clone())
619        .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
620        .max_concurrent_requests(MAX_CONCURRENT_REQUESTS);
621
622    builder.build(rpc_client_url).map_err(|e| {
623        warn!("Failed to get new Http client with error: {e:?}");
624        Error::Internal(format!(
625            "Failed to initialize fullnode RPC client with error: {e:?}"
626        ))
627    })
628}
629/// Entry point for graphql streaming requests.
630///
631/// Each request is stamped with a unique ID, a `ShowUsage` flag if set in the
632/// request headers and tracks the connection information produced by the
633/// client.
634async fn subscription_handler(
635    ConnectInfo(addr): ConnectInfo<SocketAddr>,
636    Extension(schema): Extension<IotaGraphQLSchema>,
637    req: http::Request<Body>,
638) -> AxumResponse {
639    let headers_contains_show_usage = req.headers().contains_key(ShowUsage::name());
640    let (mut parts, _body) = req.into_parts();
641
642    // extract GraphQL protocol
643    let protocol = match GraphQLProtocol::from_request_parts(&mut parts, &()).await {
644        Ok(protocol) => protocol,
645        Err(err) => return err.into_response(),
646    };
647
648    // extract WebSocket upgrade from request
649    let upgrade = match WebSocketUpgrade::from_request_parts(&mut parts, &()).await {
650        Ok(upgrade) => upgrade,
651        Err(err) => return err.into_response(),
652    };
653
654    let resp = upgrade
655        .protocols(ALL_WEBSOCKET_PROTOCOLS)
656        .on_upgrade(move |stream| async move {
657            // create connection data with per-connection values
658            let mut connection_data = Data::default();
659            // axum discards body on GET requests, being always 0
660            connection_data.insert(PayloadSize(0));
661            connection_data.insert(Uuid::new_v4());
662            if headers_contains_show_usage {
663                connection_data.insert(ShowUsage)
664            }
665            connection_data.insert(addr);
666
667            let connection =
668                GraphQLWebSocket::new(stream, schema, protocol).with_data(connection_data);
669            connection.serve().await;
670        });
671    resp
672}
673
674#[derive(Clone)]
675struct MetricsMakeCallbackHandler {
676    metrics: Metrics,
677}
678
679impl MakeCallbackHandler for MetricsMakeCallbackHandler {
680    type Handler = MetricsCallbackHandler;
681
682    fn make_handler(&self, _request: &http::request::Parts) -> Self::Handler {
683        let start = Instant::now();
684        let metrics = self.metrics.clone();
685
686        metrics.request_metrics.inflight_requests.inc();
687        metrics.inc_num_queries();
688
689        MetricsCallbackHandler { metrics, start }
690    }
691}
692
693struct MetricsCallbackHandler {
694    metrics: Metrics,
695    start: Instant,
696}
697
698impl ResponseHandler for MetricsCallbackHandler {
699    fn on_response(&mut self, response: &http::response::Parts) {
700        if let Some(errors) = response.extensions.get::<GraphqlErrors>() {
701            self.metrics.inc_errors(&errors.0);
702        }
703    }
704
705    fn on_error<E>(&mut self, _error: &E) {
706        // Do nothing if the whole service errored
707        //
708        // in Axum this isn't possible since all services are required to have
709        // an error type of Infallible
710    }
711}
712
713impl Drop for MetricsCallbackHandler {
714    fn drop(&mut self) {
715        self.metrics.query_latency(self.start.elapsed());
716        self.metrics.request_metrics.inflight_requests.dec();
717    }
718}
719
720#[derive(Debug, Clone)]
721struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);
722
723/// Connect via a TCPStream to the DB to check if it is alive
724async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
725    let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
726        return StatusCode::INTERNAL_SERVER_ERROR;
727    };
728
729    let Some(host) = url.host_str() else {
730        return StatusCode::INTERNAL_SERVER_ERROR;
731    };
732
733    let tcp_url = if let Some(port) = url.port() {
734        format!("{host}:{port}")
735    } else {
736        host.to_string()
737    };
738
739    if TcpStream::connect(tcp_url).is_err() {
740        StatusCode::INTERNAL_SERVER_ERROR
741    } else {
742        StatusCode::OK
743    }
744}
745
746#[derive(serde::Deserialize)]
747struct HealthParam {
748    max_checkpoint_lag_ms: Option<u64>,
749}
750
751/// Endpoint for querying the health of the service.
752/// It returns 500 for any internal error, including not connecting to the DB,
753/// and 504 if the checkpoint timestamp is too far behind the current timestamp
754/// as per the max checkpoint timestamp lag query parameter, or the default
755/// value if not provided.
756async fn health_check(
757    State(connection): State<ConnectionConfig>,
758    Extension(watermark_lock): Extension<WatermarkLock>,
759    AxumQuery(query_params): AxumQuery<HealthParam>,
760) -> StatusCode {
761    let db_health_check = db_health_check(axum::extract::State(connection)).await;
762    if db_health_check != StatusCode::OK {
763        return db_health_check;
764    }
765
766    let max_checkpoint_lag_ms = query_params
767        .max_checkpoint_lag_ms
768        .map(Duration::from_millis)
769        .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);
770
771    let checkpoint_timestamp =
772        Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms);
773
774    let now_millis = Utc::now().timestamp_millis();
775
776    // Check for negative timestamp or conversion failure
777    let now: Duration = match u64::try_from(now_millis) {
778        Ok(val) => Duration::from_millis(val),
779        Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
780    };
781
782    if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
783        return StatusCode::GATEWAY_TIMEOUT;
784    }
785
786    db_health_check
787}
788
789// One server per proc, so this is okay
790async fn get_or_init_server_start_time() -> &'static Instant {
791    static ONCE: OnceCell<Instant> = OnceCell::const_new();
792    ONCE.get_or_init(|| async move { Instant::now() }).await
793}
794
795pub mod tests {
796    use std::{sync::Arc, time::Duration};
797
798    use async_graphql::{
799        Request, Response, Variables,
800        extensions::{Extension, ExtensionContext, NextExecute},
801    };
802    use iota_types::transaction::TransactionData;
803    use serde_json::json;
804    use uuid::Uuid;
805
806    use super::*;
807    use crate::{
808        config::{ConnectionConfig, Limits, ServiceConfig, Version},
809        context_data::db_data_provider::PgManager,
810        extensions::{query_limits_checker::QueryLimitsChecker, timeout::Timeout},
811        test_infra::cluster::Cluster,
812    };
813
814    /// Prepares a schema for tests dealing with extensions. Returns a
815    /// `ServerBuilder` that can be further extended with `context_data` and
816    /// `extension` for testing.
817    fn prep_schema(
818        connection_config: Option<ConnectionConfig>,
819        service_config: Option<ServiceConfig>,
820    ) -> ServerBuilder {
821        let connection_config = connection_config.unwrap_or_default();
822        let service_config = service_config.unwrap_or_default();
823
824        let reader = PgManager::reader_with_config(
825            connection_config.db_url.clone(),
826            connection_config.db_pool_size,
827            service_config.limits.request_timeout_ms.into(),
828        )
829        .expect("failed to create pg connection pool");
830
831        let version = Version::for_testing();
832        let metrics = metrics();
833        let db = Db::new(
834            reader.clone(),
835            service_config.limits.clone(),
836            metrics.clone(),
837        );
838        let loader = DataLoader::new(db.clone());
839        let pg_conn_pool = PgManager::new(reader);
840        let cancellation_token = CancellationToken::new();
841        let watermark = Watermark {
842            checkpoint: 1,
843            checkpoint_timestamp_ms: 1,
844            epoch: 0,
845        };
846        let state = AppState::new(
847            connection_config.clone(),
848            service_config.clone(),
849            metrics.clone(),
850            cancellation_token.clone(),
851            version,
852        );
853        ServerBuilder::new(state)
854            .context_data(db)
855            .context_data(loader)
856            .context_data(pg_conn_pool)
857            .context_data(service_config)
858            .context_data(query_id())
859            .context_data(ip_address())
860            .context_data(watermark)
861            .context_data(ChainIdentifierCache::default())
862            .context_data(metrics)
863    }
864
865    fn metrics() -> Metrics {
866        let binding_address: SocketAddr = "0.0.0.0:9185".parse().unwrap();
867        let registry = iota_metrics::start_prometheus_server(binding_address).default_registry();
868        Metrics::new(&registry)
869    }
870
871    fn ip_address() -> SocketAddr {
872        let binding_address: SocketAddr = "0.0.0.0:51515".parse().unwrap();
873        binding_address
874    }
875
876    fn query_id() -> Uuid {
877        Uuid::new_v4()
878    }
879
880    pub async fn test_timeout_impl(cluster: &Cluster) {
881        struct TimedExecuteExt {
882            pub min_req_delay: Duration,
883        }
884
885        impl ExtensionFactory for TimedExecuteExt {
886            fn create(&self) -> Arc<dyn Extension> {
887                Arc::new(TimedExecuteExt {
888                    min_req_delay: self.min_req_delay,
889                })
890            }
891        }
892
893        #[async_trait::async_trait]
894        impl Extension for TimedExecuteExt {
895            async fn execute(
896                &self,
897                ctx: &ExtensionContext<'_>,
898                operation_name: Option<&str>,
899                next: NextExecute<'_>,
900            ) -> Response {
901                tokio::time::sleep(self.min_req_delay).await;
902                next.run(ctx, operation_name).await
903            }
904        }
905
906        async fn test_timeout(
907            delay: Duration,
908            timeout: Duration,
909            query: &str,
910            write_api: OptimisticWriteApi,
911        ) -> Response {
912            let mut cfg = ServiceConfig::default();
913            cfg.limits.request_timeout_ms = timeout.as_millis() as u32;
914            cfg.limits.mutation_timeout_ms = timeout.as_millis() as u32;
915
916            let schema = prep_schema(None, Some(cfg))
917                .context_data(write_api)
918                .extension(Timeout)
919                .extension(TimedExecuteExt {
920                    min_req_delay: delay,
921                })
922                .build_schema();
923
924            schema.execute(query).await
925        }
926
927        let wallet = &cluster.validator_fullnode_handle.wallet;
928        let store = &cluster.indexer_store;
929        let fn_rpc_url = &cluster.validator_fullnode_handle.fullnode_handle.rpc_url;
930        let indexer_metrics = store.get_metrics();
931        let indexer_reader = iota_indexer::read::IndexerReader::new(store.blocking_cp());
932        let json_rpc_client = build_json_rpc_client(fn_rpc_url).unwrap();
933
934        let optimistic_tx_executor =
935            iota_indexer::optimistic_indexing::OptimisticTransactionExecutor::new(
936                fn_rpc_url,
937                indexer_reader.clone(),
938                store.clone(),
939                indexer_metrics,
940            );
941        let write_api = OptimisticWriteApi::new(
942            WriteApi::new(json_rpc_client, indexer_reader),
943            optimistic_tx_executor,
944        );
945
946        let query = "{ chainIdentifier }";
947        let timeout = Duration::from_millis(1000);
948        let delay = Duration::from_millis(100);
949
950        test_timeout(delay, timeout, query, write_api.clone())
951            .await
952            .into_result()
953            .expect("should complete successfully");
954
955        // Should timeout
956        let errs: Vec<_> = test_timeout(delay, delay, query, write_api.clone())
957            .await
958            .into_result()
959            .unwrap_err()
960            .into_iter()
961            .map(|e| e.message)
962            .collect();
963        let exp = format!("Query request timed out. Limit: {}s", delay.as_secs_f32());
964        assert_eq!(errs, vec![exp]);
965
966        // Should timeout for mutation
967        // Create a transaction and sign it, and use the tx_bytes + signatures for the
968        // GraphQL executeTransactionBlock mutation call.
969        let addresses = wallet.get_addresses();
970        let gas = wallet
971            .get_one_gas_object_owned_by_address(addresses[0])
972            .await
973            .unwrap();
974        let tx_data = TransactionData::new_transfer_iota(
975            addresses[1],
976            addresses[0],
977            Some(1000),
978            gas.unwrap(),
979            1_000_000,
980            wallet.get_reference_gas_price().await.unwrap(),
981        );
982
983        let tx = wallet.sign_transaction(&tx_data);
984        let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();
985
986        let signature_base64 = &signatures[0];
987        let query = format!(
988            r#"
989            mutation {{
990              executeTransactionBlock(txBytes: "{}", signatures: "{}") {{
991                effects {{
992                  status
993                }}
994              }}
995            }}"#,
996            tx_bytes.encoded(),
997            signature_base64.encoded()
998        );
999        let errs: Vec<_> = test_timeout(delay, delay, &query, write_api.clone())
1000            .await
1001            .into_result()
1002            .unwrap_err()
1003            .into_iter()
1004            .map(|e| e.message)
1005            .collect();
1006        let exp = format!(
1007            "Mutation request timed out. Limit: {}s",
1008            delay.as_secs_f32()
1009        );
1010        assert_eq!(errs, vec![exp]);
1011    }
1012
1013    pub async fn test_query_depth_limit_impl() {
1014        async fn exec_query_depth_limit(depth: u32, query: &str) -> Response {
1015            let service_config = ServiceConfig {
1016                limits: Limits {
1017                    max_query_depth: depth,
1018                    ..Default::default()
1019                },
1020                ..Default::default()
1021            };
1022
1023            let schema = prep_schema(None, Some(service_config))
1024                .context_data(PayloadSize(100))
1025                .extension(QueryLimitsChecker)
1026                .build_schema();
1027            schema.execute(query).await
1028        }
1029
1030        exec_query_depth_limit(1, "{ chainIdentifier }")
1031            .await
1032            .into_result()
1033            .expect("should complete successfully");
1034
1035        exec_query_depth_limit(
1036            5,
1037            "{ chainIdentifier protocolConfig { configs { value key }} }",
1038        )
1039        .await
1040        .into_result()
1041        .expect("should complete successfully");
1042
1043        // Should fail
1044        let errs: Vec<_> = exec_query_depth_limit(0, "{ chainIdentifier }")
1045            .await
1046            .into_result()
1047            .unwrap_err()
1048            .into_iter()
1049            .map(|e| e.message)
1050            .collect();
1051
1052        assert_eq!(errs, vec!["Query nesting is over 0".to_string()]);
1053        let errs: Vec<_> = exec_query_depth_limit(
1054            2,
1055            "{ chainIdentifier protocolConfig { configs { value key }} }",
1056        )
1057        .await
1058        .into_result()
1059        .unwrap_err()
1060        .into_iter()
1061        .map(|e| e.message)
1062        .collect();
1063        assert_eq!(errs, vec!["Query nesting is over 2".to_string()]);
1064    }
1065
1066    pub async fn test_query_node_limit_impl() {
1067        async fn exec_query_node_limit(nodes: u32, query: &str) -> Response {
1068            let service_config = ServiceConfig {
1069                limits: Limits {
1070                    max_query_nodes: nodes,
1071                    ..Default::default()
1072                },
1073                ..Default::default()
1074            };
1075
1076            let schema = prep_schema(None, Some(service_config))
1077                .context_data(PayloadSize(100))
1078                .extension(QueryLimitsChecker)
1079                .build_schema();
1080            schema.execute(query).await
1081        }
1082
1083        exec_query_node_limit(1, "{ chainIdentifier }")
1084            .await
1085            .into_result()
1086            .expect("should complete successfully");
1087
1088        exec_query_node_limit(
1089            5,
1090            "{ chainIdentifier protocolConfig { configs { value key }} }",
1091        )
1092        .await
1093        .into_result()
1094        .expect("should complete successfully");
1095
1096        // Should fail
1097        let err: Vec<_> = exec_query_node_limit(0, "{ chainIdentifier }")
1098            .await
1099            .into_result()
1100            .unwrap_err()
1101            .into_iter()
1102            .map(|e| e.message)
1103            .collect();
1104        assert_eq!(err, vec!["Query has over 0 nodes".to_string()]);
1105
1106        let err: Vec<_> = exec_query_node_limit(
1107            4,
1108            "{ chainIdentifier protocolConfig { configs { value key }} }",
1109        )
1110        .await
1111        .into_result()
1112        .unwrap_err()
1113        .into_iter()
1114        .map(|e| e.message)
1115        .collect();
1116        assert_eq!(err, vec!["Query has over 4 nodes".to_string()]);
1117    }
1118
1119    pub async fn test_query_default_page_limit_impl(connection_config: ConnectionConfig) {
1120        let service_config = ServiceConfig {
1121            limits: Limits {
1122                default_page_size: 1,
1123                ..Default::default()
1124            },
1125            ..Default::default()
1126        };
1127        let schema = prep_schema(Some(connection_config), Some(service_config)).build_schema();
1128
1129        let resp = schema
1130            .execute("{ checkpoints { nodes { sequenceNumber } } }")
1131            .await;
1132        let data = resp.data.clone().into_json().unwrap();
1133        let checkpoints = data
1134            .get("checkpoints")
1135            .unwrap()
1136            .get("nodes")
1137            .unwrap()
1138            .as_array()
1139            .unwrap();
1140        assert_eq!(
1141            checkpoints.len(),
1142            1,
1143            "Checkpoints should have exactly one element"
1144        );
1145
1146        let resp = schema
1147            .execute("{ checkpoints(first: 2) { nodes { sequenceNumber } } }")
1148            .await;
1149        let data = resp.data.clone().into_json().unwrap();
1150        let checkpoints = data
1151            .get("checkpoints")
1152            .unwrap()
1153            .get("nodes")
1154            .unwrap()
1155            .as_array()
1156            .unwrap();
1157        assert_eq!(
1158            checkpoints.len(),
1159            2,
1160            "Checkpoints should return two elements"
1161        );
1162    }
1163
1164    pub async fn test_query_max_page_limit_impl() {
1165        let schema = prep_schema(None, None).build_schema();
1166
1167        schema
1168            .execute("{ objects(first: 1) { nodes { version } } }")
1169            .await
1170            .into_result()
1171            .expect("should complete successfully");
1172
1173        // Should fail
1174        let err: Vec<_> = schema
1175            .execute("{ objects(first: 51) { nodes { version } } }")
1176            .await
1177            .into_result()
1178            .unwrap_err()
1179            .into_iter()
1180            .map(|e| e.message)
1181            .collect();
1182        assert_eq!(
1183            err,
1184            vec!["Connection's page size of 51 exceeds max of 50".to_string()]
1185        );
1186    }
1187
1188    pub async fn test_query_complexity_metrics_impl() {
1189        let server_builder = prep_schema(None, None).context_data(PayloadSize(100));
1190        let metrics = server_builder.state.metrics.clone();
1191        let schema = server_builder
1192            .extension(QueryLimitsChecker) // QueryLimitsChecker is where we actually set the metrics
1193            .build_schema();
1194
1195        schema
1196            .execute("{ chainIdentifier }")
1197            .await
1198            .into_result()
1199            .expect("should complete successfully");
1200
1201        let req_metrics = metrics.request_metrics;
1202        assert_eq!(req_metrics.input_nodes.get_sample_count(), 1);
1203        assert_eq!(req_metrics.output_nodes.get_sample_count(), 1);
1204        assert_eq!(req_metrics.query_depth.get_sample_count(), 1);
1205        assert_eq!(req_metrics.input_nodes.get_sample_sum(), 1.);
1206        assert_eq!(req_metrics.output_nodes.get_sample_sum(), 1.);
1207        assert_eq!(req_metrics.query_depth.get_sample_sum(), 1.);
1208
1209        schema
1210            .execute("{ chainIdentifier protocolConfig { configs { value key }} }")
1211            .await
1212            .into_result()
1213            .expect("should complete successfully");
1214
1215        assert_eq!(req_metrics.input_nodes.get_sample_count(), 2);
1216        assert_eq!(req_metrics.output_nodes.get_sample_count(), 2);
1217        assert_eq!(req_metrics.query_depth.get_sample_count(), 2);
1218        assert_eq!(req_metrics.input_nodes.get_sample_sum(), 2. + 4.);
1219        assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
1220        assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
1221    }
1222
1223    pub async fn test_health_check_impl() {
1224        let server_builder = prep_schema(None, None);
1225        let url = format!(
1226            "http://{}:{}/health",
1227            server_builder.state.connection.host, server_builder.state.connection.port
1228        );
1229        server_builder.build_schema();
1230
1231        let resp = reqwest::get(&url).await.unwrap();
1232        assert_eq!(resp.status(), reqwest::StatusCode::OK);
1233
1234        let url_with_param = format!("{url}?max_checkpoint_lag_ms=1");
1235        let resp = reqwest::get(&url_with_param).await.unwrap();
1236        assert_eq!(resp.status(), reqwest::StatusCode::GATEWAY_TIMEOUT);
1237    }
1238
1239    /// Execute a GraphQL request with `limits` in place, expecting an error to
1240    /// be returned. Returns a text representation of all the errors triggered.
1241    async fn execute_for_error(limits: Limits, request: Request) -> String {
1242        let service_config = ServiceConfig {
1243            limits,
1244            ..Default::default()
1245        };
1246
1247        let schema = prep_schema(None, Some(service_config))
1248            .context_data(PayloadSize(
1249                // Payload size is usually set per request, and it is the size of the raw HTTP
1250                // request, which includes the query, variables, and surrounding JSON. Simulate for
1251                // testing purposes by serializing the request back into JSON and baking its length
1252                // as context data into the schema.
1253                serde_json::to_string(&request).unwrap().len() as u64,
1254            ))
1255            .extension(QueryLimitsChecker)
1256            .build_schema();
1257
1258        let errs: Vec<_> = schema
1259            .execute(request)
1260            .await
1261            .into_result()
1262            .unwrap_err()
1263            .into_iter()
1264            .map(|e| e.message)
1265            .collect();
1266
1267        errs.join("\n")
1268    }
1269
1270    pub async fn test_payload_read_exceeded_impl() {
1271        assert_eq!(
1272            execute_for_error(
1273                Limits {
1274                    max_tx_payload_size: 400,
1275                    max_query_payload_size: 10,
1276                    ..Default::default()
1277                },
1278                r#"
1279                    mutation {
1280                        executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1281                            effects {
1282                                status
1283                            }
1284                        }
1285                    }
1286                "#
1287                .into()
1288            )
1289            .await,
1290            "Query part too large: 354 bytes. Requests are limited to 400 bytes or fewer on \
1291             transaction payloads (all inputs to executeTransactionBlock or \
1292             dryRunTransactionBlock) and the rest of the request (the query part) must be 10 \
1293             bytes or fewer."
1294        );
1295    }
1296
1297    pub async fn test_payload_mutation_exceeded_impl() {
1298        assert_eq!(
1299            execute_for_error(
1300                Limits {
1301                    max_tx_payload_size: 10,
1302                    max_query_payload_size: 400,
1303                    ..Default::default()
1304                },
1305                r#"
1306                    mutation {
1307                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1308                            effects {
1309                                status
1310                            }
1311                        }
1312                    }
1313                "#
1314                .into()
1315            )
1316            .await,
1317            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1318             transaction payloads (all inputs to executeTransactionBlock or \
1319             dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1320             bytes or fewer."
1321        );
1322    }
1323
1324    pub async fn test_payload_dry_run_exceeded_impl() {
1325        assert_eq!(
1326            execute_for_error(
1327                Limits {
1328                    max_tx_payload_size: 10,
1329                    max_query_payload_size: 400,
1330                    ..Default::default()
1331                },
1332                r#"
1333                    query {
1334                        dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1335                            error
1336                            transaction {
1337                                digest
1338                            }
1339                        }
1340                    }
1341                "#
1342                .into(),
1343            )
1344            .await,
1345            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1346             transaction payloads (all inputs to executeTransactionBlock or \
1347             dryRunTransactionBlock) and the rest of the request (the query part) must be 400 bytes \
1348             or fewer."
1349        );
1350    }
1351
1352    pub async fn test_payload_total_exceeded_impl() {
1353        assert_eq!(
1354            execute_for_error(
1355                Limits {
1356                    max_tx_payload_size: 10,
1357                    max_query_payload_size: 10,
1358                    ..Default::default()
1359                },
1360                r#"
1361                    query {
1362                        dryRunTransactionBlock(txByte: "AAABBB") {
1363                            error
1364                            transaction {
1365                                digest
1366                            }
1367                        }
1368                    }
1369                "#
1370                .into(),
1371            )
1372            .await,
1373            "Overall request too large: 380 bytes. Requests are limited to 10 bytes or fewer on \
1374             transaction payloads (all inputs to executeTransactionBlock or dryRunTransactionBlock) \
1375             and the rest of the request (the query part) must be 10 bytes or fewer."
1376        );
1377    }
1378
1379    pub async fn test_payload_using_vars_mutation_exceeded_impl() {
1380        assert_eq!(
1381            execute_for_error(
1382                Limits {
1383                    max_tx_payload_size: 10,
1384                    max_query_payload_size: 500,
1385                    ..Default::default()
1386                },
1387                Request::new(
1388                    r#"
1389                    mutation ($tx: String!, $sigs: [String!]!) {
1390                        executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1391                            effects {
1392                                status
1393                            }
1394                        }
1395                    }
1396                    "#
1397                )
1398                .variables(Variables::from_json(json!({
1399                    "tx": "AAABBBCCC",
1400                    "sigs": ["BBB"]
1401                })))
1402            )
1403            .await,
1404            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1405             transaction payloads (all inputs to executeTransactionBlock or \
1406             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1407             bytes or fewer."
1408        );
1409    }
1410
1411    pub async fn test_payload_using_vars_read_exceeded_impl() {
1412        assert_eq!(
1413            execute_for_error(
1414                Limits {
1415                    max_tx_payload_size: 500,
1416                    max_query_payload_size: 10,
1417                    ..Default::default()
1418                },
1419                Request::new(
1420                    r#"
1421                    mutation ($tx: String!, $sigs: [String!]!) {
1422                        executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1423                            effects {
1424                                status
1425                            }
1426                        }
1427                    }
1428                    "#
1429                )
1430                .variables(Variables::from_json(json!({
1431                    "tx": "AAA",
1432                    "sigs": ["BBB"]
1433                })))
1434            )
1435            .await,
1436            "Query part too large: 409 bytes. Requests are limited to 500 bytes or fewer on \
1437             transaction payloads (all inputs to executeTransactionBlock or \
1438             dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1439             or fewer."
1440        );
1441    }
1442
1443    pub async fn test_payload_using_vars_dry_run_exceeded_impl() {
1444        assert_eq!(
1445            execute_for_error(
1446                Limits {
1447                    max_tx_payload_size: 10,
1448                    max_query_payload_size: 400,
1449                    ..Default::default()
1450                },
1451                Request::new(
1452                    r#"
1453                    query ($tx: String!) {
1454                        dryRunTransactionBlock(txBytes: $tx) {
1455                            error
1456                            transaction {
1457                                digest
1458                            }
1459                        }
1460                    }
1461                    "#
1462                )
1463                .variables(Variables::from_json(json!({
1464                    "tx": "AAABBBCCC"
1465                }))),
1466            )
1467            .await,
1468            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1469             transaction payloads (all inputs to executeTransactionBlock or \
1470             dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1471             bytes or fewer."
1472        );
1473    }
1474
1475    pub async fn test_payload_using_vars_dry_run_read_exceeded_impl() {
1476        assert_eq!(
1477            execute_for_error(
1478                Limits {
1479                    max_tx_payload_size: 400,
1480                    max_query_payload_size: 10,
1481                    ..Default::default()
1482                },
1483                Request::new(
1484                    r#"
1485                    query ($tx: String!) {
1486                        dryRunTransactionBlock(txBytes: $tx) {
1487                            error
1488                            transaction {
1489                                digest
1490                            }
1491                        }
1492                    }
1493                    "#
1494                )
1495                .variables(Variables::from_json(json!({
1496                    "tx": "AAABBBCCC"
1497                }))),
1498            )
1499            .await,
1500            "Query part too large: 398 bytes. Requests are limited to 400 bytes or fewer on \
1501             transaction payloads (all inputs to executeTransactionBlock or \
1502             dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1503             or fewer."
1504        );
1505    }
1506
1507    pub async fn test_payload_multiple_execution_exceeded_impl() {
1508        // First check that the limit is large enough to hold one transaction's
1509        // parameters (by checking that we hit the read limit).
1510        let err = execute_for_error(
1511            Limits {
1512                max_tx_payload_size: 30,
1513                max_query_payload_size: 320,
1514                ..Default::default()
1515            },
1516            r#"
1517                mutation {
1518                    executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1519                        effects {
1520                            status
1521                        }
1522                    }
1523                }
1524            "#
1525            .into(),
1526        )
1527        .await;
1528        assert!(err.starts_with("Query part too large"), "{err}");
1529
1530        assert_eq!(
1531            execute_for_error(
1532                Limits {
1533                    max_tx_payload_size: 30,
1534                    max_query_payload_size: 800,
1535                    ..Default::default()
1536                },
1537                r#"
1538                    mutation {
1539                        e0: executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1540                            effects {
1541                                status
1542                            }
1543                        }
1544                        e1: executeTransactionBlock(txBytes: "EEEFFFGGG", signatures: ["HHH"]) {
1545                            effects {
1546                                status
1547                            }
1548                        }
1549                    }
1550                "#
1551                .into()
1552            )
1553            .await,
1554            "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1555             transaction payloads (all inputs to executeTransactionBlock or \
1556             dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1557             bytes or fewer."
1558        );
1559    }
1560
1561    pub async fn test_payload_multiple_dry_run_exceeded_impl() {
1562        // First check that tx limit is large enough to hold one transaction's
1563        // parameters (by checking that we hit the read limit).
1564        let err = execute_for_error(
1565            Limits {
1566                max_tx_payload_size: 20,
1567                max_query_payload_size: 330,
1568                ..Default::default()
1569            },
1570            r#"
1571                query {
1572                    dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1573                       error
1574                       transaction {
1575                           digest
1576                       }
1577                    }
1578                }
1579            "#
1580            .into(),
1581        )
1582        .await;
1583        assert!(err.starts_with("Query part too large"), "{err}");
1584
1585        assert_eq!(
1586            execute_for_error(
1587                Limits {
1588                    max_tx_payload_size: 20,
1589                    max_query_payload_size: 800,
1590                    ..Default::default()
1591                },
1592                r#"
1593                    query {
1594                        d0: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1595                           error
1596                           transaction {
1597                               digest
1598                           }
1599                        }
1600                        d1: dryRunTransactionBlock(txBytes: "DDDEEEFFF") {
1601                           error
1602                           transaction {
1603                               digest
1604                           }
1605                        }
1606                    }
1607                "#
1608                .into()
1609            )
1610            .await,
1611            "Transaction payload too large. Requests are limited to 20 bytes or fewer on \
1612             transaction payloads (all inputs to executeTransactionBlock or \
1613             dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1614             bytes or fewer."
1615        );
1616    }
1617
1618    pub async fn test_payload_execution_multiple_sigs_exceeded_impl() {
1619        // First check that the limit is large enough to hold a transaction with a
1620        // single signature (by checking that we hite the read limit).
1621        let err = execute_for_error(
1622            Limits {
1623                max_tx_payload_size: 30,
1624                max_query_payload_size: 320,
1625                ..Default::default()
1626            },
1627            r#"
1628                mutation {
1629                    executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1630                        effects {
1631                            status
1632                        }
1633                    }
1634                }
1635            "#
1636            .into(),
1637        )
1638        .await;
1639
1640        assert!(err.starts_with("Query part too large"), "{err}");
1641
1642        assert_eq!(
1643            execute_for_error(
1644                Limits {
1645                    max_tx_payload_size: 30,
1646                    max_query_payload_size: 500,
1647                    ..Default::default()
1648                },
1649                r#"
1650                    mutation {
1651                        executeTransactionBlock(
1652                            txBytes: "AAA",
1653                            signatures: ["BBB", "CCC", "DDD", "EEE", "FFF"]
1654                        ) {
1655                            effects {
1656                                status
1657                            }
1658                        }
1659                    }
1660                "#
1661                .into(),
1662            )
1663            .await,
1664            "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1665             transaction payloads (all inputs to executeTransactionBlock or \
1666             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1667             bytes or fewer.",
1668        )
1669    }
1670
1671    pub async fn test_payload_sig_var_execution_exceeded_impl() {
1672        // Variables can show up in the sub-structure of a GraphQL value as well, and we
1673        // need to count those as well.
1674        assert_eq!(
1675            execute_for_error(
1676                Limits {
1677                    max_tx_payload_size: 10,
1678                    max_query_payload_size: 500,
1679                    ..Default::default()
1680                },
1681                Request::new(
1682                    r#"
1683                    mutation ($tx: String!, $sig: String!) {
1684                        executeTransactionBlock(txBytes: $tx, signatures: [$sig]) {
1685                            effects {
1686                                status
1687                            }
1688                        }
1689                    }
1690                    "#
1691                )
1692                .variables(Variables::from_json(json!({
1693                    "tx": "AAA",
1694                    "sig": "BBB"
1695                })))
1696            )
1697            .await,
1698            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1699             transaction payloads (all inputs to executeTransactionBlock or \
1700             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1701             bytes or fewer."
1702        );
1703    }
1704
1705    /// Check if the error indicates that the request passed the overall size
1706    /// check and the transaction payload check.
1707    fn passed_tx_checks(err: &str) -> bool {
1708        !err.starts_with("Overall request too large")
1709            && !err.starts_with("Transaction payload too large")
1710    }
1711
1712    pub async fn test_payload_reusing_vars_execution_impl() {
1713        // Test that when variables are re-used as execution params, the size of the
1714        // variable is only counted once.
1715
1716        // First, check that `error_passed_tx_checks` is working, by submitting a
1717        // request that will fail the initial payload check.
1718        assert!(!passed_tx_checks(
1719            &execute_for_error(
1720                Limits {
1721                    max_tx_payload_size: 1,
1722                    max_query_payload_size: 1,
1723                    ..Default::default()
1724                },
1725                r#"
1726                    mutation {
1727                        executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1728                            effects {
1729                                status
1730                            }
1731                        }
1732                    }
1733                "#
1734                .into()
1735            )
1736            .await
1737        ));
1738
1739        let limits = Limits {
1740            max_tx_payload_size: 20,
1741            max_query_payload_size: 1000,
1742            ..Default::default()
1743        };
1744
1745        // Then check that a request that uses the variable once passes the transaction
1746        // limit check.
1747        assert!(passed_tx_checks(
1748            &execute_for_error(
1749                limits.clone(),
1750                Request::new(
1751                    r#"
1752                    mutation ($sig: String!) {
1753                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig]) {
1754                            effects {
1755                                status
1756                            }
1757                        }
1758                    }
1759                    "#,
1760                )
1761                .variables(Variables::from_json(json!({
1762                    "sig": "BBB"
1763                })))
1764            )
1765            .await
1766        ));
1767
1768        // Then check that a request that introduces an extra signature, but without
1769        // re-using the variable fails the transaction limit.
1770        let execution_result = execute_for_error(
1771            limits.clone(),
1772            Request::new(
1773                r#"
1774                    mutation ($sig: String!) {
1775                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, "BBB"]) {
1776                            effects {
1777                                status
1778                            }
1779                        }
1780                    }
1781                    "#,
1782            )
1783            .variables(Variables::from_json(json!({
1784                "sig": "BBB"
1785            }))),
1786        )
1787        .await;
1788        assert!(!passed_tx_checks(&execution_result), "{execution_result}");
1789
1790        // And then when that use is replaced by re-using the variable, we should be
1791        // under the transaction payload limit again.
1792        let execution_result = execute_for_error(
1793            limits,
1794            Request::new(
1795                r#"
1796                    mutation ($sig: String!) {
1797                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, $sig]) {
1798                            effects {
1799                                status
1800                            }
1801                        }
1802                    }
1803                    "#,
1804            )
1805            .variables(Variables::from_json(json!({
1806                "sig": "BBB"
1807            }))),
1808        )
1809        .await;
1810        assert!(passed_tx_checks(&execution_result), "{execution_result}");
1811    }
1812
1813    pub async fn test_payload_reusing_vars_dry_run_impl() {
1814        // Like `test_payload_reusing_vars_execution` but the variable is used in a
1815        // dry-run.
1816
1817        let limits = Limits {
1818            max_tx_payload_size: 20,
1819            max_query_payload_size: 1000,
1820            ..Default::default()
1821        };
1822
1823        // A single dry-run is under the limit.
1824        assert!(passed_tx_checks(
1825            &execute_for_error(
1826                limits.clone(),
1827                Request::new(
1828                    r#"
1829                    query ($tx: String!) {
1830                        dryRunTransactionBlock(txBytes: $tx) {
1831                            error
1832                            transaction {
1833                                digest
1834                            }
1835                        }
1836                    }
1837                    "#,
1838                )
1839                .variables(Variables::from_json(json!({
1840                    "tx": "AAABBBCCC"
1841                })))
1842            )
1843            .await
1844        ));
1845
1846        // Duplicating the dry-run causes us to hit the limit.
1847        assert!(!passed_tx_checks(
1848            &execute_for_error(
1849                limits.clone(),
1850                Request::new(
1851                    r#"
1852                    query ($tx: String!) {
1853                        d0: dryRunTransactionBlock(txBytes: $tx) {
1854                            error
1855                            transaction {
1856                                digest
1857                            }
1858                        }
1859
1860                        d1: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1861                            error
1862                            transaction {
1863                                digest
1864                            }
1865                        }
1866                    }
1867                    "#,
1868                )
1869                .variables(Variables::from_json(json!({
1870                    "tx": "AAABBBCCC"
1871                })))
1872            )
1873            .await
1874        ));
1875
1876        // And by re-using the variable, we are under the transaction limit again.
1877        assert!(passed_tx_checks(
1878            &execute_for_error(
1879                limits,
1880                Request::new(
1881                    r#"
1882                    query ($tx: String!) {
1883                        d0: dryRunTransactionBlock(txBytes: $tx) {
1884                            error
1885                            transaction {
1886                                digest
1887                            }
1888                        }
1889
1890                        d1: dryRunTransactionBlock(txBytes: $tx) {
1891                            error
1892                            transaction {
1893                                digest
1894                            }
1895                        }
1896                    }
1897                    "#,
1898                )
1899                .variables(Variables::from_json(json!({
1900                    "tx": "AAABBBCCC"
1901                })))
1902            )
1903            .await
1904        ));
1905    }
1906
1907    pub async fn test_payload_named_fragment_execution_exceeded_impl() {
1908        assert_eq!(
1909            execute_for_error(
1910                Limits {
1911                    max_tx_payload_size: 10,
1912                    max_query_payload_size: 500,
1913                    ..Default::default()
1914                },
1915                r#"
1916                    mutation {
1917                        ...Tx
1918                    }
1919
1920                    fragment Tx on Mutation {
1921                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1922                            effects {
1923                                status
1924                            }
1925                        }
1926                    }
1927                "#
1928                .into()
1929            )
1930            .await,
1931            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1932             transaction payloads (all inputs to executeTransactionBlock or \
1933             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1934             bytes or fewer."
1935        );
1936    }
1937
1938    pub async fn test_payload_inline_fragment_execution_exceeded_impl() {
1939        assert_eq!(
1940            execute_for_error(
1941                Limits {
1942                    max_tx_payload_size: 10,
1943                    max_query_payload_size: 500,
1944                    ..Default::default()
1945                },
1946                r#"
1947                    mutation {
1948                        ... on Mutation {
1949                            executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1950                                effects {
1951                                    status
1952                                }
1953                            }
1954                        }
1955                    }
1956                "#
1957                .into()
1958            )
1959            .await,
1960            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1961             transaction payloads (all inputs to executeTransactionBlock or \
1962             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1963             bytes or fewer."
1964        );
1965    }
1966
1967    pub async fn test_payload_named_fragment_dry_run_exceeded_impl() {
1968        assert_eq!(
1969            execute_for_error(
1970                Limits {
1971                    max_tx_payload_size: 10,
1972                    max_query_payload_size: 500,
1973                    ..Default::default()
1974                },
1975                r#"
1976                    query {
1977                        ...DryRun
1978                    }
1979
1980                    fragment DryRun on Query {
1981                        dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1982                            error
1983                            transaction {
1984                                digest
1985                            }
1986                        }
1987                    }
1988                "#
1989                .into(),
1990            )
1991            .await,
1992            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1993             transaction payloads (all inputs to executeTransactionBlock or \
1994             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1995             bytes or fewer."
1996        );
1997    }
1998
1999    pub async fn test_payload_inline_fragment_dry_run_exceeded_impl() {
2000        assert_eq!(
2001            execute_for_error(
2002                Limits {
2003                    max_tx_payload_size: 10,
2004                    max_query_payload_size: 500,
2005                    ..Default::default()
2006                },
2007                r#"
2008                    query {
2009                        ... on Query {
2010                            dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2011                                error
2012                                transaction {
2013                                    digest
2014                                }
2015                            }
2016                        }
2017                    }
2018                "#
2019                .into(),
2020            )
2021            .await,
2022            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2023             transaction payloads (all inputs to executeTransactionBlock or \
2024             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
2025             bytes or fewer."
2026        );
2027    }
2028}