iota_graphql_rpc/server/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4use std::{
5    any::Any,
6    convert::Infallible,
7    net::{SocketAddr, TcpStream},
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use async_graphql::{
13    Context, Data, Schema, SchemaBuilder,
14    extensions::{ApolloTracing, ExtensionFactory, Tracing},
15    http::ALL_WEBSOCKET_PROTOCOLS,
16};
17use async_graphql_axum::{GraphQLProtocol, GraphQLRequest, GraphQLResponse, GraphQLWebSocket};
18use axum::{
19    Extension, Router,
20    body::Body,
21    extract::{
22        ConnectInfo, FromRef, FromRequestParts, Query as AxumQuery, State, ws::WebSocketUpgrade,
23    },
24    http::{HeaderMap, StatusCode},
25    middleware::{self},
26    response::{IntoResponse, Response as AxumResponse},
27    routing::{MethodRouter, Route, get, post},
28};
29use axum_extra::{TypedHeader, headers::ContentLength};
30use chrono::Utc;
31use http::{HeaderValue, Method, Request};
32use iota_graphql_rpc_headers::LIMITS_HEADER;
33use iota_indexer::{
34    apis::{OptimisticWriteApi, WriteApi},
35    db::{get_pool_connection, setup_postgres::check_db_migration_consistency},
36    metrics::IndexerMetrics,
37    optimistic_indexing::OptimisticTransactionExecutor,
38    read::IndexerReader,
39    store::PgIndexerStore,
40};
41use iota_json_rpc_api::CLIENT_SDK_TYPE_HEADER;
42use iota_metrics::spawn_monitored_task;
43use iota_network_stack::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler};
44use iota_package_resolver::{PackageStoreWithLruCache, Resolver};
45use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
46use tokio::{join, net::TcpListener, sync::OnceCell};
47use tokio_util::sync::CancellationToken;
48use tower::{Layer, Service};
49use tower_http::cors::{AllowOrigin, CorsLayer};
50use tracing::{info, warn};
51use uuid::Uuid;
52
53use crate::{
54    config::{
55        ConnectionConfig, MAX_CONCURRENT_REQUESTS, RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD,
56        ServerConfig, ServiceConfig, Version,
57    },
58    context_data::db_data_provider::PgManager,
59    data::{
60        DataLoader, Db,
61        package_resolver::{DbPackageStore, PackageResolver},
62    },
63    error::Error,
64    extensions::{
65        directive_checker::DirectiveChecker,
66        feature_gate::FeatureGate,
67        logger::Logger,
68        query_limits_checker::{PayloadSize, QueryLimitsChecker, ShowUsage},
69        timeout::Timeout,
70    },
71    metrics::Metrics,
72    mutation::Mutation,
73    server::{
74        exchange_rates_task::TriggerExchangeRatesTask,
75        system_package_task::SystemPackageTask,
76        version::{check_version_middleware, set_version_middleware},
77        watermark_task::{Watermark, WatermarkLock, WatermarkTask},
78    },
79    types::{
80        chain_identifier::ChainIdentifierCache,
81        datatype::IMoveDatatype,
82        move_object::IMoveObject,
83        object::IObject,
84        owner::IOwner,
85        query::{IotaGraphQLSchema, Query},
86        subscription::{GraphQLStream, Subscription},
87    },
88};
89
90/// The default allowed maximum lag between the current timestamp and the
91/// checkpoint timestamp.
92const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300);
93
94pub(crate) struct Server {
95    router: Router,
96    address: SocketAddr,
97    watermark_task: WatermarkTask,
98    system_package_task: SystemPackageTask,
99    trigger_exchange_rates_task: TriggerExchangeRatesTask,
100    state: AppState,
101}
102
103impl Server {
104    /// Start the GraphQL service and any background tasks it is dependent on.
105    /// When a cancellation signal is received, the method waits for all
106    /// tasks to complete before returning.
107    pub async fn run(mut self) -> Result<(), Error> {
108        get_or_init_server_start_time().await;
109
110        // A handle that spawns a background task to periodically update the
111        // `Watermark`, which consists of the checkpoint upper bound and current
112        // epoch.
113        let watermark_task = {
114            info!("Starting watermark update task");
115            spawn_monitored_task!(async move {
116                self.watermark_task.run().await;
117            })
118        };
119
120        // A handle that spawns a background task to evict system packages on epoch
121        // changes.
122        let system_package_task = {
123            info!("Starting system package task");
124            spawn_monitored_task!(async move {
125                self.system_package_task.run().await;
126            })
127        };
128
129        let trigger_exchange_rates_task = {
130            info!("Starting trigger exchange rates task");
131            spawn_monitored_task!(async move {
132                self.trigger_exchange_rates_task.run().await;
133            })
134        };
135
136        let server_task = {
137            info!("Starting graphql service");
138            let cancellation_token = self.state.cancellation_token.clone();
139            let address = self.address;
140            let router = self.router;
141            spawn_monitored_task!(async move {
142                axum::serve(
143                    TcpListener::bind(address)
144                        .await
145                        .map_err(|e| Error::Internal(format!("listener bind failed: {e}")))?,
146                    router.into_make_service_with_connect_info::<SocketAddr>(),
147                )
148                .with_graceful_shutdown(async move {
149                    cancellation_token.cancelled().await;
150                    info!("Shutdown signal received, terminating graphql service");
151                })
152                .await
153                .map_err(|e| Error::Internal(format!("Server run failed: {e}")))
154            })
155        };
156
157        // Wait for all tasks to complete. This ensures that the service doesn't fully
158        // shut down until all tasks and the server have completed their
159        // shutdown processes.
160        let _ = join!(
161            watermark_task,
162            system_package_task,
163            trigger_exchange_rates_task,
164            server_task
165        );
166
167        Ok(())
168    }
169}
170
171pub(crate) struct ServerBuilder {
172    state: AppState,
173    schema: SchemaBuilder<Query, Mutation, Subscription>,
174    router: Option<Router>,
175    db_reader: Option<Db>,
176    resolver: Option<PackageResolver>,
177}
178
179#[derive(Clone)]
180pub(crate) struct AppState {
181    connection: ConnectionConfig,
182    service: ServiceConfig,
183    metrics: Metrics,
184    cancellation_token: CancellationToken,
185    pub version: Version,
186}
187
188impl AppState {
189    pub(crate) fn new(
190        connection: ConnectionConfig,
191        service: ServiceConfig,
192        metrics: Metrics,
193        cancellation_token: CancellationToken,
194        version: Version,
195    ) -> Self {
196        Self {
197            connection,
198            service,
199            metrics,
200            cancellation_token,
201            version,
202        }
203    }
204}
205
206impl FromRef<AppState> for ConnectionConfig {
207    fn from_ref(app_state: &AppState) -> ConnectionConfig {
208        app_state.connection.clone()
209    }
210}
211
212impl FromRef<AppState> for Metrics {
213    fn from_ref(app_state: &AppState) -> Metrics {
214        app_state.metrics.clone()
215    }
216}
217
218impl ServerBuilder {
219    pub fn new(state: AppState) -> Self {
220        Self {
221            state,
222            schema: schema_builder(),
223            router: None,
224            db_reader: None,
225            resolver: None,
226        }
227    }
228
229    pub fn address(&self) -> String {
230        format!(
231            "{}:{}",
232            self.state.connection.host, self.state.connection.port
233        )
234    }
235
236    pub fn context_data(mut self, context_data: impl Any + Send + Sync) -> Self {
237        self.schema = self.schema.data(context_data);
238        self
239    }
240
241    pub fn extension(mut self, extension: impl ExtensionFactory) -> Self {
242        self.schema = self.schema.extension(extension);
243        self
244    }
245
246    fn build_schema(self) -> Schema<Query, Mutation, Subscription> {
247        self.schema.finish()
248    }
249
250    /// Prepares the components of the server to be run. Finalizes the graphql
251    /// schema, and expects the `Db` and `Router` to have been initialized.
252    fn build_components(
253        self,
254    ) -> (
255        String,
256        Schema<Query, Mutation, Subscription>,
257        Db,
258        PackageResolver,
259        Router,
260    ) {
261        let address = self.address();
262        let ServerBuilder {
263            state: _,
264            schema,
265            db_reader,
266            resolver,
267            router,
268        } = self;
269        (
270            address,
271            schema.finish(),
272            db_reader.expect("db reader not initialized"),
273            resolver.expect("package resolver not initialized"),
274            router.expect("router not initialized"),
275        )
276    }
277
278    fn init_router(&mut self) {
279        if self.router.is_none() {
280            let router: Router = Router::new()
281                .route("/", post(graphql_handler))
282                .route("/subscriptions", get(subscription_handler))
283                .route("/{version}", post(graphql_handler))
284                .route("/{version}/subscriptions", get(subscription_handler))
285                .route("/graphql", post(graphql_handler))
286                .route("/graphql/subscriptions", get(subscription_handler))
287                .route("/graphql/{version}", post(graphql_handler))
288                .route(
289                    "/graphql/{version}/subscriptions",
290                    get(subscription_handler),
291                )
292                .route("/health", get(health_check))
293                .route("/graphql/health", get(health_check))
294                .route("/graphql/{version}/health", get(health_check))
295                .with_state(self.state.clone())
296                .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
297                    metrics: self.state.metrics.clone(),
298                }));
299            self.router = Some(router);
300        }
301    }
302
303    pub fn route(mut self, path: &str, method_handler: MethodRouter) -> Self {
304        self.init_router();
305        self.router = self.router.map(|router| router.route(path, method_handler));
306        self
307    }
308
309    pub fn layer<L>(mut self, layer: L) -> Self
310    where
311        L: Layer<Route> + Clone + Send + Sync + 'static,
312        L::Service: Service<Request<Body>> + Clone + Send + Sync + 'static,
313        <L::Service as Service<Request<Body>>>::Response: IntoResponse + 'static,
314        <L::Service as Service<Request<Body>>>::Error: Into<Infallible> + 'static,
315        <L::Service as Service<Request<Body>>>::Future: Send + 'static,
316    {
317        self.init_router();
318        self.router = self.router.map(|router| router.layer(layer));
319        self
320    }
321
322    fn cors() -> Result<CorsLayer, Error> {
323        let acl = match std::env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
324            Ok(value) => {
325                let allow_hosts = value
326                    .split(',')
327                    .map(HeaderValue::from_str)
328                    .collect::<Result<Vec<_>, _>>()
329                    .map_err(|_| {
330                        Error::Internal(
331                            "Cannot resolve access control origin env variable".to_string(),
332                        )
333                    })?;
334                AllowOrigin::list(allow_hosts)
335            }
336            _ => AllowOrigin::any(),
337        };
338        info!("Access control allow origin set to: {acl:?}");
339
340        let cors = CorsLayer::new()
341            // Allow `POST` & `GET` when accessing the resource
342            .allow_methods([Method::POST, Method::GET])
343            // Allow requests from any origin
344            .allow_origin(acl)
345            .allow_headers([hyper::header::CONTENT_TYPE, LIMITS_HEADER.clone()]);
346        Ok(cors)
347    }
348
349    /// Consumes the `ServerBuilder` to create a `Server` that can be run.
350    pub fn build(self) -> Result<Server, Error> {
351        let state = self.state.clone();
352        let (address, schema, db_reader, resolver, router) = self.build_components();
353
354        // Initialize the watermark background task struct.
355        let watermark_task = WatermarkTask::new(
356            db_reader.clone(),
357            state.metrics.clone(),
358            std::time::Duration::from_millis(state.service.background_tasks.watermark_update_ms),
359            state.cancellation_token.clone(),
360        );
361
362        let system_package_task = SystemPackageTask::new(
363            resolver,
364            watermark_task.epoch_receiver(),
365            state.cancellation_token.clone(),
366        );
367
368        let trigger_exchange_rates_task = TriggerExchangeRatesTask::new(
369            db_reader,
370            watermark_task.epoch_receiver(),
371            state.cancellation_token.clone(),
372        );
373
374        let router = router
375            .route_layer(middleware::from_fn_with_state(
376                state.version,
377                set_version_middleware,
378            ))
379            .route_layer(middleware::from_fn_with_state(
380                state.version,
381                check_version_middleware,
382            ))
383            .layer(axum::extract::Extension(schema))
384            .layer(axum::extract::Extension(watermark_task.lock()))
385            .layer(Self::cors()?);
386
387        Ok(Server {
388            router,
389            address: address
390                .parse()
391                .map_err(|_| Error::Internal(format!("Failed to parse address {address}")))?,
392            watermark_task,
393            system_package_task,
394            trigger_exchange_rates_task,
395            state,
396        })
397    }
398
399    /// Instantiate a `ServerBuilder` from a `ServerConfig`, typically called
400    /// when building the graphql service for production usage.
401    pub async fn from_config(
402        config: &ServerConfig,
403        version: &Version,
404        cancellation_token: CancellationToken,
405    ) -> Result<Self, Error> {
406        // PROMETHEUS
407        let prom_addr: SocketAddr = format!(
408            "{}:{}",
409            config.connection.prom_host, config.connection.prom_port
410        )
411        .parse()
412        .map_err(|_| {
413            Error::Internal(format!(
414                "Failed to parse url {}, port {} into socket address",
415                config.connection.prom_host, config.connection.prom_port
416            ))
417        })?;
418
419        let registry_service = iota_metrics::start_prometheus_server(prom_addr);
420        info!("Starting Prometheus HTTP endpoint at {}", prom_addr);
421        let registry = registry_service.default_registry();
422        registry
423            .register(iota_metrics::uptime_metric(
424                "graphql",
425                version.full,
426                "unknown",
427            ))
428            .unwrap();
429
430        // METRICS
431        let metrics = Metrics::new(&registry);
432        let indexer_metrics = IndexerMetrics::new(&registry);
433        let state = AppState::new(
434            config.connection.clone(),
435            config.service.clone(),
436            metrics.clone(),
437            cancellation_token,
438            *version,
439        );
440        let mut builder = ServerBuilder::new(state);
441
442        let iota_names_config = config.service.iota_names.clone();
443        let zklogin_config = config.service.zklogin.clone();
444        let reader = PgManager::reader_with_config(
445            config.connection.db_url.clone(),
446            config.connection.db_pool_size,
447            // Bound each statement in a request with the overall request timeout, to bound DB
448            // utilisation (in the worst case we will use 2x the request timeout time in DB wall
449            // time).
450            config.service.limits.request_timeout_ms.into(),
451        )
452        .map_err(|e| Error::ServerInit(format!("Failed to create pg connection pool: {e}")))?;
453
454        if !config.connection.skip_migration_consistency_check {
455            // Compatibility check
456            info!("Starting compatibility check");
457            let mut connection = get_pool_connection(&reader.get_pool())?;
458            check_db_migration_consistency(&mut connection)?;
459            info!("Compatibility check passed");
460        }
461
462        // DB
463        let db = Db::new(
464            reader.clone(),
465            config.service.limits.clone(),
466            metrics.clone(),
467        );
468        let loader = DataLoader::new(db.clone());
469        let pg_conn_pool = PgManager::new(reader.clone());
470        let package_store = DbPackageStore::new(loader.clone());
471        let resolver = Arc::new(Resolver::new_with_limits(
472            PackageStoreWithLruCache::new(package_store),
473            config.service.limits.package_resolver_limits(),
474        ));
475
476        builder.db_reader = Some(db.clone());
477        builder.resolver = Some(resolver.clone());
478
479        let Some(fullnode_url) = config.tx_exec_full_node.node_rpc_url.as_ref() else {
480            return Err(Error::ServerInit(
481                "No fullnode url found in config".to_string(),
482            ));
483        };
484
485        let graphql_streams =
486            GraphQLStream::new(&config.connection.db_url, reader.clone(), &registry).await?;
487        let write_api = build_write_api(fullnode_url, reader, indexer_metrics)?;
488
489        builder = builder
490            .context_data(config.service.clone())
491            .context_data(loader)
492            .context_data(db)
493            .context_data(pg_conn_pool)
494            .context_data(resolver)
495            .context_data(write_api)
496            .context_data(iota_names_config)
497            .context_data(zklogin_config)
498            .context_data(metrics.clone())
499            .context_data(config.clone())
500            .context_data(graphql_streams)
501            .context_data(ChainIdentifierCache::default());
502
503        if config.internal_features.feature_gate {
504            builder = builder.extension(FeatureGate);
505        }
506
507        if config.internal_features.logger {
508            builder = builder.extension(Logger::default());
509        }
510
511        if config.internal_features.query_limits_checker {
512            builder = builder.extension(QueryLimitsChecker);
513        }
514
515        if config.internal_features.directive_checker {
516            builder = builder.extension(DirectiveChecker);
517        }
518
519        if config.internal_features.query_timeout {
520            builder = builder.extension(Timeout);
521        }
522
523        if config.internal_features.tracing {
524            builder = builder.extension(Tracing);
525        }
526
527        if config.internal_features.apollo_tracing {
528            builder = builder.extension(ApolloTracing);
529        }
530
531        // TODO: uncomment once impl
532        // if config.internal_features.open_telemetry { }
533
534        Ok(builder)
535    }
536}
537
538fn schema_builder() -> SchemaBuilder<Query, Mutation, Subscription> {
539    async_graphql::Schema::build(Query, Mutation, Subscription)
540        .register_output_type::<IMoveObject>()
541        .register_output_type::<IObject>()
542        .register_output_type::<IOwner>()
543        .register_output_type::<IMoveDatatype>()
544}
545
546/// Return the string representation of the schema used by this server.
547pub fn export_schema() -> String {
548    schema_builder().finish().sdl()
549}
550
551/// Entry point for graphql requests.
552///
553/// Each request is stamped with a unique ID, a `ShowUsage` flag if set in the
554/// request headers, and the watermark as set by the background task.
555async fn graphql_handler(
556    ConnectInfo(addr): ConnectInfo<SocketAddr>,
557    TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
558    schema: Extension<IotaGraphQLSchema>,
559    Extension(watermark_lock): Extension<WatermarkLock>,
560    headers: HeaderMap,
561    req: GraphQLRequest,
562) -> (axum::http::Extensions, GraphQLResponse) {
563    let mut req = req.into_inner();
564    req.data.insert(PayloadSize(content_length));
565    req.data.insert(Uuid::new_v4());
566    if headers.contains_key(ShowUsage::name()) {
567        req.data.insert(ShowUsage)
568    }
569
570    // Capture the IP address of the client
571    // Note: if a load balancer is used it must be configured to forward the client
572    // IP address
573    req.data.insert(addr);
574    req.data.insert(Watermark::new(watermark_lock).await);
575
576    let result = schema.execute(req).await;
577
578    // If there are errors, insert them as an extension so that the Metrics callback
579    // handler can pull it out later.
580    let mut extensions = axum::http::Extensions::new();
581    if result.is_err() {
582        extensions.insert(GraphqlErrors(std::sync::Arc::new(result.errors.clone())));
583    };
584    (extensions, result.into())
585}
586
587pub(crate) fn get_write_api<'ctx>(
588    ctx: &'ctx Context<'_>,
589) -> Result<&'ctx OptimisticWriteApi, Error> {
590    ctx.data_opt()
591        .ok_or_else(|| Error::Internal("Unable to get node execution interface".to_string()))
592}
593
594fn build_write_api(
595    fullnode_url: &str,
596    reader: IndexerReader,
597    metrics: IndexerMetrics,
598) -> Result<OptimisticWriteApi, Error> {
599    let json_rpc_client = build_json_rpc_client(fullnode_url)?;
600    let indexer_store = PgIndexerStore::new(reader.get_pool(), metrics.clone());
601    let optimistic_tx_executor =
602        OptimisticTransactionExecutor::new(fullnode_url, reader.clone(), indexer_store, metrics);
603    Ok(OptimisticWriteApi::new(
604        WriteApi::new(json_rpc_client, reader),
605        optimistic_tx_executor,
606    ))
607}
608
609fn build_json_rpc_client(rpc_client_url: &str) -> Result<HttpClient, Error> {
610    let mut headers = HeaderMap::new();
611    headers.insert(CLIENT_SDK_TYPE_HEADER, HeaderValue::from_static("graphql"));
612
613    let builder = HttpClientBuilder::default()
614        .max_request_size(2 << 30)
615        .set_headers(headers.clone())
616        .request_timeout(RPC_TIMEOUT_ERR_SLEEP_RETRY_PERIOD)
617        .max_concurrent_requests(MAX_CONCURRENT_REQUESTS);
618
619    builder.build(rpc_client_url).map_err(|e| {
620        warn!("Failed to get new Http client with error: {e:?}");
621        Error::Internal(format!(
622            "Failed to initialize fullnode RPC client with error: {e:?}"
623        ))
624    })
625}
626/// Entry point for graphql streaming requests.
627///
628/// Each request is stamped with a unique ID, a `ShowUsage` flag if set in the
629/// request headers and tracks the connection information produced by the
630/// client.
631async fn subscription_handler(
632    ConnectInfo(addr): ConnectInfo<SocketAddr>,
633    Extension(schema): Extension<IotaGraphQLSchema>,
634    req: http::Request<Body>,
635) -> AxumResponse {
636    let headers_contains_show_usage = req.headers().contains_key(ShowUsage::name());
637    let (mut parts, _body) = req.into_parts();
638
639    // extract GraphQL protocol
640    let protocol = match GraphQLProtocol::from_request_parts(&mut parts, &()).await {
641        Ok(protocol) => protocol,
642        Err(err) => return err.into_response(),
643    };
644
645    // extract WebSocket upgrade from request
646    let upgrade = match WebSocketUpgrade::from_request_parts(&mut parts, &()).await {
647        Ok(upgrade) => upgrade,
648        Err(err) => return err.into_response(),
649    };
650
651    let resp = upgrade
652        .protocols(ALL_WEBSOCKET_PROTOCOLS)
653        .on_upgrade(move |stream| async move {
654            // create connection data with per-connection values
655            let mut connection_data = Data::default();
656            // axum discards body on GET requests, being always 0
657            connection_data.insert(PayloadSize(0));
658            connection_data.insert(Uuid::new_v4());
659            if headers_contains_show_usage {
660                connection_data.insert(ShowUsage)
661            }
662            connection_data.insert(addr);
663
664            let connection =
665                GraphQLWebSocket::new(stream, schema, protocol).with_data(connection_data);
666            connection.serve().await;
667        });
668    resp
669}
670
671#[derive(Clone)]
672struct MetricsMakeCallbackHandler {
673    metrics: Metrics,
674}
675
676impl MakeCallbackHandler for MetricsMakeCallbackHandler {
677    type Handler = MetricsCallbackHandler;
678
679    fn make_handler(&self, _request: &http::request::Parts) -> Self::Handler {
680        let start = Instant::now();
681        let metrics = self.metrics.clone();
682
683        metrics.request_metrics.inflight_requests.inc();
684        metrics.inc_num_queries();
685
686        MetricsCallbackHandler { metrics, start }
687    }
688}
689
690struct MetricsCallbackHandler {
691    metrics: Metrics,
692    start: Instant,
693}
694
695impl ResponseHandler for MetricsCallbackHandler {
696    fn on_response(&mut self, response: &http::response::Parts) {
697        if let Some(errors) = response.extensions.get::<GraphqlErrors>() {
698            self.metrics.inc_errors(&errors.0);
699        }
700    }
701
702    fn on_error<E>(&mut self, _error: &E) {
703        // Do nothing if the whole service errored
704        //
705        // in Axum this isn't possible since all services are required to have
706        // an error type of Infallible
707    }
708}
709
710impl Drop for MetricsCallbackHandler {
711    fn drop(&mut self) {
712        self.metrics.query_latency(self.start.elapsed());
713        self.metrics.request_metrics.inflight_requests.dec();
714    }
715}
716
717#[derive(Debug, Clone)]
718struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);
719
720/// Connect via a TCPStream to the DB to check if it is alive
721async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
722    let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
723        return StatusCode::INTERNAL_SERVER_ERROR;
724    };
725
726    let Some(host) = url.host_str() else {
727        return StatusCode::INTERNAL_SERVER_ERROR;
728    };
729
730    let tcp_url = if let Some(port) = url.port() {
731        format!("{host}:{port}")
732    } else {
733        host.to_string()
734    };
735
736    if TcpStream::connect(tcp_url).is_err() {
737        StatusCode::INTERNAL_SERVER_ERROR
738    } else {
739        StatusCode::OK
740    }
741}
742
743#[derive(serde::Deserialize)]
744struct HealthParam {
745    max_checkpoint_lag_ms: Option<u64>,
746}
747
748/// Endpoint for querying the health of the service.
749/// It returns 500 for any internal error, including not connecting to the DB,
750/// and 504 if the checkpoint timestamp is too far behind the current timestamp
751/// as per the max checkpoint timestamp lag query parameter, or the default
752/// value if not provided.
753async fn health_check(
754    State(connection): State<ConnectionConfig>,
755    Extension(watermark_lock): Extension<WatermarkLock>,
756    AxumQuery(query_params): AxumQuery<HealthParam>,
757) -> StatusCode {
758    let db_health_check = db_health_check(axum::extract::State(connection)).await;
759    if db_health_check != StatusCode::OK {
760        return db_health_check;
761    }
762
763    let max_checkpoint_lag_ms = query_params
764        .max_checkpoint_lag_ms
765        .map(Duration::from_millis)
766        .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);
767
768    let checkpoint_timestamp =
769        Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms);
770
771    let now_millis = Utc::now().timestamp_millis();
772
773    // Check for negative timestamp or conversion failure
774    let now: Duration = match u64::try_from(now_millis) {
775        Ok(val) => Duration::from_millis(val),
776        Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
777    };
778
779    if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
780        return StatusCode::GATEWAY_TIMEOUT;
781    }
782
783    db_health_check
784}
785
786// One server per proc, so this is okay
787async fn get_or_init_server_start_time() -> &'static Instant {
788    static ONCE: OnceCell<Instant> = OnceCell::const_new();
789    ONCE.get_or_init(|| async move { Instant::now() }).await
790}
791
792pub mod tests {
793    use std::{sync::Arc, time::Duration};
794
795    use async_graphql::{
796        Request, Response, Variables,
797        extensions::{Extension, ExtensionContext, NextExecute},
798    };
799    use iota_types::transaction::TransactionData;
800    use serde_json::json;
801    use uuid::Uuid;
802
803    use super::*;
804    use crate::{
805        config::{ConnectionConfig, Limits, ServiceConfig, Version},
806        context_data::db_data_provider::PgManager,
807        extensions::{query_limits_checker::QueryLimitsChecker, timeout::Timeout},
808        test_infra::cluster::Cluster,
809    };
810
811    /// Prepares a schema for tests dealing with extensions. Returns a
812    /// `ServerBuilder` that can be further extended with `context_data` and
813    /// `extension` for testing.
814    fn prep_schema(
815        connection_config: Option<ConnectionConfig>,
816        service_config: Option<ServiceConfig>,
817    ) -> ServerBuilder {
818        let connection_config = connection_config.unwrap_or_default();
819        let service_config = service_config.unwrap_or_default();
820
821        let reader = PgManager::reader_with_config(
822            connection_config.db_url.clone(),
823            connection_config.db_pool_size,
824            service_config.limits.request_timeout_ms.into(),
825        )
826        .expect("failed to create pg connection pool");
827
828        let version = Version::for_testing();
829        let metrics = metrics();
830        let db = Db::new(
831            reader.clone(),
832            service_config.limits.clone(),
833            metrics.clone(),
834        );
835        let loader = DataLoader::new(db.clone());
836        let pg_conn_pool = PgManager::new(reader);
837        let cancellation_token = CancellationToken::new();
838        let watermark = Watermark {
839            checkpoint: 1,
840            checkpoint_timestamp_ms: 1,
841            epoch: 0,
842        };
843        let state = AppState::new(
844            connection_config.clone(),
845            service_config.clone(),
846            metrics.clone(),
847            cancellation_token.clone(),
848            version,
849        );
850        ServerBuilder::new(state)
851            .context_data(db)
852            .context_data(loader)
853            .context_data(pg_conn_pool)
854            .context_data(service_config)
855            .context_data(query_id())
856            .context_data(ip_address())
857            .context_data(watermark)
858            .context_data(ChainIdentifierCache::default())
859            .context_data(metrics)
860    }
861
862    fn metrics() -> Metrics {
863        let binding_address: SocketAddr = "0.0.0.0:9185".parse().unwrap();
864        let registry = iota_metrics::start_prometheus_server(binding_address).default_registry();
865        Metrics::new(&registry)
866    }
867
868    fn ip_address() -> SocketAddr {
869        let binding_address: SocketAddr = "0.0.0.0:51515".parse().unwrap();
870        binding_address
871    }
872
873    fn query_id() -> Uuid {
874        Uuid::new_v4()
875    }
876
877    pub async fn test_timeout_impl(cluster: &Cluster) {
878        struct TimedExecuteExt {
879            pub min_req_delay: Duration,
880        }
881
882        impl ExtensionFactory for TimedExecuteExt {
883            fn create(&self) -> Arc<dyn Extension> {
884                Arc::new(TimedExecuteExt {
885                    min_req_delay: self.min_req_delay,
886                })
887            }
888        }
889
890        #[async_trait::async_trait]
891        impl Extension for TimedExecuteExt {
892            async fn execute(
893                &self,
894                ctx: &ExtensionContext<'_>,
895                operation_name: Option<&str>,
896                next: NextExecute<'_>,
897            ) -> Response {
898                tokio::time::sleep(self.min_req_delay).await;
899                next.run(ctx, operation_name).await
900            }
901        }
902
903        async fn test_timeout(
904            delay: Duration,
905            timeout: Duration,
906            query: &str,
907            write_api: OptimisticWriteApi,
908        ) -> Response {
909            let mut cfg = ServiceConfig::default();
910            cfg.limits.request_timeout_ms = timeout.as_millis() as u32;
911            cfg.limits.mutation_timeout_ms = timeout.as_millis() as u32;
912
913            let schema = prep_schema(None, Some(cfg))
914                .context_data(write_api)
915                .extension(Timeout)
916                .extension(TimedExecuteExt {
917                    min_req_delay: delay,
918                })
919                .build_schema();
920
921            schema.execute(query).await
922        }
923
924        let wallet = &cluster.validator_fullnode_handle.wallet;
925        let store = &cluster.indexer_store;
926        let fn_rpc_url = &cluster.validator_fullnode_handle.fullnode_handle.rpc_url;
927        let indexer_metrics = store.get_metrics();
928        let indexer_reader = iota_indexer::read::IndexerReader::new(store.blocking_cp());
929        let json_rpc_client = build_json_rpc_client(fn_rpc_url).unwrap();
930
931        let optimistic_tx_executor =
932            iota_indexer::optimistic_indexing::OptimisticTransactionExecutor::new(
933                fn_rpc_url,
934                indexer_reader.clone(),
935                store.clone(),
936                indexer_metrics,
937            );
938        let write_api = OptimisticWriteApi::new(
939            WriteApi::new(json_rpc_client, indexer_reader),
940            optimistic_tx_executor,
941        );
942
943        let query = "{ chainIdentifier }";
944        let timeout = Duration::from_millis(1000);
945        let delay = Duration::from_millis(100);
946
947        test_timeout(delay, timeout, query, write_api.clone())
948            .await
949            .into_result()
950            .expect("should complete successfully");
951
952        // Should timeout
953        let errs: Vec<_> = test_timeout(delay, delay, query, write_api.clone())
954            .await
955            .into_result()
956            .unwrap_err()
957            .into_iter()
958            .map(|e| e.message)
959            .collect();
960        let exp = format!("Query request timed out. Limit: {}s", delay.as_secs_f32());
961        assert_eq!(errs, vec![exp]);
962
963        // Should timeout for mutation
964        // Create a transaction and sign it, and use the tx_bytes + signatures for the
965        // GraphQL executeTransactionBlock mutation call.
966        let addresses = wallet.get_addresses();
967        let gas = wallet
968            .get_one_gas_object_owned_by_address(addresses[0])
969            .await
970            .unwrap();
971        let tx_data = TransactionData::new_transfer_iota(
972            addresses[1],
973            addresses[0],
974            Some(1000),
975            gas.unwrap(),
976            1_000_000,
977            wallet.get_reference_gas_price().await.unwrap(),
978        );
979
980        let tx = wallet.sign_transaction(&tx_data);
981        let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();
982
983        let signature_base64 = &signatures[0];
984        let query = format!(
985            r#"
986            mutation {{
987              executeTransactionBlock(txBytes: "{}", signatures: "{}") {{
988                effects {{
989                  status
990                }}
991              }}
992            }}"#,
993            tx_bytes.encoded(),
994            signature_base64.encoded()
995        );
996        let errs: Vec<_> = test_timeout(delay, delay, &query, write_api.clone())
997            .await
998            .into_result()
999            .unwrap_err()
1000            .into_iter()
1001            .map(|e| e.message)
1002            .collect();
1003        let exp = format!(
1004            "Mutation request timed out. Limit: {}s",
1005            delay.as_secs_f32()
1006        );
1007        assert_eq!(errs, vec![exp]);
1008    }
1009
1010    pub async fn test_query_depth_limit_impl() {
1011        async fn exec_query_depth_limit(depth: u32, query: &str) -> Response {
1012            let service_config = ServiceConfig {
1013                limits: Limits {
1014                    max_query_depth: depth,
1015                    ..Default::default()
1016                },
1017                ..Default::default()
1018            };
1019
1020            let schema = prep_schema(None, Some(service_config))
1021                .context_data(PayloadSize(100))
1022                .extension(QueryLimitsChecker)
1023                .build_schema();
1024            schema.execute(query).await
1025        }
1026
1027        exec_query_depth_limit(1, "{ chainIdentifier }")
1028            .await
1029            .into_result()
1030            .expect("should complete successfully");
1031
1032        exec_query_depth_limit(
1033            5,
1034            "{ chainIdentifier protocolConfig { configs { value key }} }",
1035        )
1036        .await
1037        .into_result()
1038        .expect("should complete successfully");
1039
1040        // Should fail
1041        let errs: Vec<_> = exec_query_depth_limit(0, "{ chainIdentifier }")
1042            .await
1043            .into_result()
1044            .unwrap_err()
1045            .into_iter()
1046            .map(|e| e.message)
1047            .collect();
1048
1049        assert_eq!(errs, vec!["Query nesting is over 0".to_string()]);
1050        let errs: Vec<_> = exec_query_depth_limit(
1051            2,
1052            "{ chainIdentifier protocolConfig { configs { value key }} }",
1053        )
1054        .await
1055        .into_result()
1056        .unwrap_err()
1057        .into_iter()
1058        .map(|e| e.message)
1059        .collect();
1060        assert_eq!(errs, vec!["Query nesting is over 2".to_string()]);
1061    }
1062
1063    pub async fn test_query_node_limit_impl() {
1064        async fn exec_query_node_limit(nodes: u32, query: &str) -> Response {
1065            let service_config = ServiceConfig {
1066                limits: Limits {
1067                    max_query_nodes: nodes,
1068                    ..Default::default()
1069                },
1070                ..Default::default()
1071            };
1072
1073            let schema = prep_schema(None, Some(service_config))
1074                .context_data(PayloadSize(100))
1075                .extension(QueryLimitsChecker)
1076                .build_schema();
1077            schema.execute(query).await
1078        }
1079
1080        exec_query_node_limit(1, "{ chainIdentifier }")
1081            .await
1082            .into_result()
1083            .expect("should complete successfully");
1084
1085        exec_query_node_limit(
1086            5,
1087            "{ chainIdentifier protocolConfig { configs { value key }} }",
1088        )
1089        .await
1090        .into_result()
1091        .expect("should complete successfully");
1092
1093        // Should fail
1094        let err: Vec<_> = exec_query_node_limit(0, "{ chainIdentifier }")
1095            .await
1096            .into_result()
1097            .unwrap_err()
1098            .into_iter()
1099            .map(|e| e.message)
1100            .collect();
1101        assert_eq!(err, vec!["Query has over 0 nodes".to_string()]);
1102
1103        let err: Vec<_> = exec_query_node_limit(
1104            4,
1105            "{ chainIdentifier protocolConfig { configs { value key }} }",
1106        )
1107        .await
1108        .into_result()
1109        .unwrap_err()
1110        .into_iter()
1111        .map(|e| e.message)
1112        .collect();
1113        assert_eq!(err, vec!["Query has over 4 nodes".to_string()]);
1114    }
1115
1116    pub async fn test_query_default_page_limit_impl(connection_config: ConnectionConfig) {
1117        let service_config = ServiceConfig {
1118            limits: Limits {
1119                default_page_size: 1,
1120                ..Default::default()
1121            },
1122            ..Default::default()
1123        };
1124        let schema = prep_schema(Some(connection_config), Some(service_config)).build_schema();
1125
1126        let resp = schema
1127            .execute("{ checkpoints { nodes { sequenceNumber } } }")
1128            .await;
1129        let data = resp.data.clone().into_json().unwrap();
1130        let checkpoints = data
1131            .get("checkpoints")
1132            .unwrap()
1133            .get("nodes")
1134            .unwrap()
1135            .as_array()
1136            .unwrap();
1137        assert_eq!(
1138            checkpoints.len(),
1139            1,
1140            "Checkpoints should have exactly one element"
1141        );
1142
1143        let resp = schema
1144            .execute("{ checkpoints(first: 2) { nodes { sequenceNumber } } }")
1145            .await;
1146        let data = resp.data.clone().into_json().unwrap();
1147        let checkpoints = data
1148            .get("checkpoints")
1149            .unwrap()
1150            .get("nodes")
1151            .unwrap()
1152            .as_array()
1153            .unwrap();
1154        assert_eq!(
1155            checkpoints.len(),
1156            2,
1157            "Checkpoints should return two elements"
1158        );
1159    }
1160
1161    pub async fn test_query_max_page_limit_impl() {
1162        let schema = prep_schema(None, None).build_schema();
1163
1164        schema
1165            .execute("{ objects(first: 1) { nodes { version } } }")
1166            .await
1167            .into_result()
1168            .expect("should complete successfully");
1169
1170        // Should fail
1171        let err: Vec<_> = schema
1172            .execute("{ objects(first: 51) { nodes { version } } }")
1173            .await
1174            .into_result()
1175            .unwrap_err()
1176            .into_iter()
1177            .map(|e| e.message)
1178            .collect();
1179        assert_eq!(
1180            err,
1181            vec!["Connection's page size of 51 exceeds max of 50".to_string()]
1182        );
1183    }
1184
1185    pub async fn test_query_complexity_metrics_impl() {
1186        let server_builder = prep_schema(None, None).context_data(PayloadSize(100));
1187        let metrics = server_builder.state.metrics.clone();
1188        let schema = server_builder
1189            .extension(QueryLimitsChecker) // QueryLimitsChecker is where we actually set the metrics
1190            .build_schema();
1191
1192        schema
1193            .execute("{ chainIdentifier }")
1194            .await
1195            .into_result()
1196            .expect("should complete successfully");
1197
1198        let req_metrics = metrics.request_metrics;
1199        assert_eq!(req_metrics.input_nodes.get_sample_count(), 1);
1200        assert_eq!(req_metrics.output_nodes.get_sample_count(), 1);
1201        assert_eq!(req_metrics.query_depth.get_sample_count(), 1);
1202        assert_eq!(req_metrics.input_nodes.get_sample_sum(), 1.);
1203        assert_eq!(req_metrics.output_nodes.get_sample_sum(), 1.);
1204        assert_eq!(req_metrics.query_depth.get_sample_sum(), 1.);
1205
1206        schema
1207            .execute("{ chainIdentifier protocolConfig { configs { value key }} }")
1208            .await
1209            .into_result()
1210            .expect("should complete successfully");
1211
1212        assert_eq!(req_metrics.input_nodes.get_sample_count(), 2);
1213        assert_eq!(req_metrics.output_nodes.get_sample_count(), 2);
1214        assert_eq!(req_metrics.query_depth.get_sample_count(), 2);
1215        assert_eq!(req_metrics.input_nodes.get_sample_sum(), 2. + 4.);
1216        assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
1217        assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
1218    }
1219
1220    pub async fn test_health_check_impl() {
1221        let server_builder = prep_schema(None, None);
1222        let url = format!(
1223            "http://{}:{}/health",
1224            server_builder.state.connection.host, server_builder.state.connection.port
1225        );
1226        server_builder.build_schema();
1227
1228        let resp = reqwest::get(&url).await.unwrap();
1229        assert_eq!(resp.status(), reqwest::StatusCode::OK);
1230
1231        let url_with_param = format!("{url}?max_checkpoint_lag_ms=1");
1232        let resp = reqwest::get(&url_with_param).await.unwrap();
1233        assert_eq!(resp.status(), reqwest::StatusCode::GATEWAY_TIMEOUT);
1234    }
1235
1236    /// Execute a GraphQL request with `limits` in place, expecting an error to
1237    /// be returned. Returns a text representation of all the errors triggered.
1238    async fn execute_for_error(limits: Limits, request: Request) -> String {
1239        let service_config = ServiceConfig {
1240            limits,
1241            ..Default::default()
1242        };
1243
1244        let schema = prep_schema(None, Some(service_config))
1245            .context_data(PayloadSize(
1246                // Payload size is usually set per request, and it is the size of the raw HTTP
1247                // request, which includes the query, variables, and surrounding JSON. Simulate for
1248                // testing purposes by serializing the request back into JSON and baking its length
1249                // as context data into the schema.
1250                serde_json::to_string(&request).unwrap().len() as u64,
1251            ))
1252            .extension(QueryLimitsChecker)
1253            .build_schema();
1254
1255        let errs: Vec<_> = schema
1256            .execute(request)
1257            .await
1258            .into_result()
1259            .unwrap_err()
1260            .into_iter()
1261            .map(|e| e.message)
1262            .collect();
1263
1264        errs.join("\n")
1265    }
1266
1267    pub async fn test_payload_read_exceeded_impl() {
1268        assert_eq!(
1269            execute_for_error(
1270                Limits {
1271                    max_tx_payload_size: 400,
1272                    max_query_payload_size: 10,
1273                    ..Default::default()
1274                },
1275                r#"
1276                    mutation {
1277                        executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1278                            effects {
1279                                status
1280                            }
1281                        }
1282                    }
1283                "#
1284                .into()
1285            )
1286            .await,
1287            "Query part too large: 354 bytes. Requests are limited to 400 bytes or fewer on \
1288             transaction payloads (all inputs to executeTransactionBlock or \
1289             dryRunTransactionBlock) and the rest of the request (the query part) must be 10 \
1290             bytes or fewer."
1291        );
1292    }
1293
1294    pub async fn test_payload_mutation_exceeded_impl() {
1295        assert_eq!(
1296            execute_for_error(
1297                Limits {
1298                    max_tx_payload_size: 10,
1299                    max_query_payload_size: 400,
1300                    ..Default::default()
1301                },
1302                r#"
1303                    mutation {
1304                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1305                            effects {
1306                                status
1307                            }
1308                        }
1309                    }
1310                "#
1311                .into()
1312            )
1313            .await,
1314            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1315             transaction payloads (all inputs to executeTransactionBlock or \
1316             dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1317             bytes or fewer."
1318        );
1319    }
1320
1321    pub async fn test_payload_dry_run_exceeded_impl() {
1322        assert_eq!(
1323            execute_for_error(
1324                Limits {
1325                    max_tx_payload_size: 10,
1326                    max_query_payload_size: 400,
1327                    ..Default::default()
1328                },
1329                r#"
1330                    query {
1331                        dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1332                            error
1333                            transaction {
1334                                digest
1335                            }
1336                        }
1337                    }
1338                "#
1339                .into(),
1340            )
1341            .await,
1342            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1343             transaction payloads (all inputs to executeTransactionBlock or \
1344             dryRunTransactionBlock) and the rest of the request (the query part) must be 400 bytes \
1345             or fewer."
1346        );
1347    }
1348
1349    pub async fn test_payload_total_exceeded_impl() {
1350        assert_eq!(
1351            execute_for_error(
1352                Limits {
1353                    max_tx_payload_size: 10,
1354                    max_query_payload_size: 10,
1355                    ..Default::default()
1356                },
1357                r#"
1358                    query {
1359                        dryRunTransactionBlock(txByte: "AAABBB") {
1360                            error
1361                            transaction {
1362                                digest
1363                            }
1364                        }
1365                    }
1366                "#
1367                .into(),
1368            )
1369            .await,
1370            "Overall request too large: 380 bytes. Requests are limited to 10 bytes or fewer on \
1371             transaction payloads (all inputs to executeTransactionBlock or dryRunTransactionBlock) \
1372             and the rest of the request (the query part) must be 10 bytes or fewer."
1373        );
1374    }
1375
1376    pub async fn test_payload_using_vars_mutation_exceeded_impl() {
1377        assert_eq!(
1378            execute_for_error(
1379                Limits {
1380                    max_tx_payload_size: 10,
1381                    max_query_payload_size: 500,
1382                    ..Default::default()
1383                },
1384                Request::new(
1385                    r#"
1386                    mutation ($tx: String!, $sigs: [String!]!) {
1387                        executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1388                            effects {
1389                                status
1390                            }
1391                        }
1392                    }
1393                    "#
1394                )
1395                .variables(Variables::from_json(json!({
1396                    "tx": "AAABBBCCC",
1397                    "sigs": ["BBB"]
1398                })))
1399            )
1400            .await,
1401            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1402             transaction payloads (all inputs to executeTransactionBlock or \
1403             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1404             bytes or fewer."
1405        );
1406    }
1407
1408    pub async fn test_payload_using_vars_read_exceeded_impl() {
1409        assert_eq!(
1410            execute_for_error(
1411                Limits {
1412                    max_tx_payload_size: 500,
1413                    max_query_payload_size: 10,
1414                    ..Default::default()
1415                },
1416                Request::new(
1417                    r#"
1418                    mutation ($tx: String!, $sigs: [String!]!) {
1419                        executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1420                            effects {
1421                                status
1422                            }
1423                        }
1424                    }
1425                    "#
1426                )
1427                .variables(Variables::from_json(json!({
1428                    "tx": "AAA",
1429                    "sigs": ["BBB"]
1430                })))
1431            )
1432            .await,
1433            "Query part too large: 409 bytes. Requests are limited to 500 bytes or fewer on \
1434             transaction payloads (all inputs to executeTransactionBlock or \
1435             dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1436             or fewer."
1437        );
1438    }
1439
1440    pub async fn test_payload_using_vars_dry_run_exceeded_impl() {
1441        assert_eq!(
1442            execute_for_error(
1443                Limits {
1444                    max_tx_payload_size: 10,
1445                    max_query_payload_size: 400,
1446                    ..Default::default()
1447                },
1448                Request::new(
1449                    r#"
1450                    query ($tx: String!) {
1451                        dryRunTransactionBlock(txBytes: $tx) {
1452                            error
1453                            transaction {
1454                                digest
1455                            }
1456                        }
1457                    }
1458                    "#
1459                )
1460                .variables(Variables::from_json(json!({
1461                    "tx": "AAABBBCCC"
1462                }))),
1463            )
1464            .await,
1465            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1466             transaction payloads (all inputs to executeTransactionBlock or \
1467             dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1468             bytes or fewer."
1469        );
1470    }
1471
1472    pub async fn test_payload_using_vars_dry_run_read_exceeded_impl() {
1473        assert_eq!(
1474            execute_for_error(
1475                Limits {
1476                    max_tx_payload_size: 400,
1477                    max_query_payload_size: 10,
1478                    ..Default::default()
1479                },
1480                Request::new(
1481                    r#"
1482                    query ($tx: String!) {
1483                        dryRunTransactionBlock(txBytes: $tx) {
1484                            error
1485                            transaction {
1486                                digest
1487                            }
1488                        }
1489                    }
1490                    "#
1491                )
1492                .variables(Variables::from_json(json!({
1493                    "tx": "AAABBBCCC"
1494                }))),
1495            )
1496            .await,
1497            "Query part too large: 398 bytes. Requests are limited to 400 bytes or fewer on \
1498             transaction payloads (all inputs to executeTransactionBlock or \
1499             dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1500             or fewer."
1501        );
1502    }
1503
1504    pub async fn test_payload_multiple_execution_exceeded_impl() {
1505        // First check that the limit is large enough to hold one transaction's
1506        // parameters (by checking that we hit the read limit).
1507        let err = execute_for_error(
1508            Limits {
1509                max_tx_payload_size: 30,
1510                max_query_payload_size: 320,
1511                ..Default::default()
1512            },
1513            r#"
1514                mutation {
1515                    executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1516                        effects {
1517                            status
1518                        }
1519                    }
1520                }
1521            "#
1522            .into(),
1523        )
1524        .await;
1525        assert!(err.starts_with("Query part too large"), "{err}");
1526
1527        assert_eq!(
1528            execute_for_error(
1529                Limits {
1530                    max_tx_payload_size: 30,
1531                    max_query_payload_size: 800,
1532                    ..Default::default()
1533                },
1534                r#"
1535                    mutation {
1536                        e0: executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1537                            effects {
1538                                status
1539                            }
1540                        }
1541                        e1: executeTransactionBlock(txBytes: "EEEFFFGGG", signatures: ["HHH"]) {
1542                            effects {
1543                                status
1544                            }
1545                        }
1546                    }
1547                "#
1548                .into()
1549            )
1550            .await,
1551            "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1552             transaction payloads (all inputs to executeTransactionBlock or \
1553             dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1554             bytes or fewer."
1555        );
1556    }
1557
1558    pub async fn test_payload_multiple_dry_run_exceeded_impl() {
1559        // First check that tx limit is large enough to hold one transaction's
1560        // parameters (by checking that we hit the read limit).
1561        let err = execute_for_error(
1562            Limits {
1563                max_tx_payload_size: 20,
1564                max_query_payload_size: 330,
1565                ..Default::default()
1566            },
1567            r#"
1568                query {
1569                    dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1570                       error
1571                       transaction {
1572                           digest
1573                       }
1574                    }
1575                }
1576            "#
1577            .into(),
1578        )
1579        .await;
1580        assert!(err.starts_with("Query part too large"), "{err}");
1581
1582        assert_eq!(
1583            execute_for_error(
1584                Limits {
1585                    max_tx_payload_size: 20,
1586                    max_query_payload_size: 800,
1587                    ..Default::default()
1588                },
1589                r#"
1590                    query {
1591                        d0: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1592                           error
1593                           transaction {
1594                               digest
1595                           }
1596                        }
1597                        d1: dryRunTransactionBlock(txBytes: "DDDEEEFFF") {
1598                           error
1599                           transaction {
1600                               digest
1601                           }
1602                        }
1603                    }
1604                "#
1605                .into()
1606            )
1607            .await,
1608            "Transaction payload too large. Requests are limited to 20 bytes or fewer on \
1609             transaction payloads (all inputs to executeTransactionBlock or \
1610             dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1611             bytes or fewer."
1612        );
1613    }
1614
1615    pub async fn test_payload_execution_multiple_sigs_exceeded_impl() {
1616        // First check that the limit is large enough to hold a transaction with a
1617        // single signature (by checking that we hite the read limit).
1618        let err = execute_for_error(
1619            Limits {
1620                max_tx_payload_size: 30,
1621                max_query_payload_size: 320,
1622                ..Default::default()
1623            },
1624            r#"
1625                mutation {
1626                    executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1627                        effects {
1628                            status
1629                        }
1630                    }
1631                }
1632            "#
1633            .into(),
1634        )
1635        .await;
1636
1637        assert!(err.starts_with("Query part too large"), "{err}");
1638
1639        assert_eq!(
1640            execute_for_error(
1641                Limits {
1642                    max_tx_payload_size: 30,
1643                    max_query_payload_size: 500,
1644                    ..Default::default()
1645                },
1646                r#"
1647                    mutation {
1648                        executeTransactionBlock(
1649                            txBytes: "AAA",
1650                            signatures: ["BBB", "CCC", "DDD", "EEE", "FFF"]
1651                        ) {
1652                            effects {
1653                                status
1654                            }
1655                        }
1656                    }
1657                "#
1658                .into(),
1659            )
1660            .await,
1661            "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1662             transaction payloads (all inputs to executeTransactionBlock or \
1663             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1664             bytes or fewer.",
1665        )
1666    }
1667
1668    pub async fn test_payload_sig_var_execution_exceeded_impl() {
1669        // Variables can show up in the sub-structure of a GraphQL value as well, and we
1670        // need to count those as well.
1671        assert_eq!(
1672            execute_for_error(
1673                Limits {
1674                    max_tx_payload_size: 10,
1675                    max_query_payload_size: 500,
1676                    ..Default::default()
1677                },
1678                Request::new(
1679                    r#"
1680                    mutation ($tx: String!, $sig: String!) {
1681                        executeTransactionBlock(txBytes: $tx, signatures: [$sig]) {
1682                            effects {
1683                                status
1684                            }
1685                        }
1686                    }
1687                    "#
1688                )
1689                .variables(Variables::from_json(json!({
1690                    "tx": "AAA",
1691                    "sig": "BBB"
1692                })))
1693            )
1694            .await,
1695            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1696             transaction payloads (all inputs to executeTransactionBlock or \
1697             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1698             bytes or fewer."
1699        );
1700    }
1701
1702    /// Check if the error indicates that the request passed the overall size
1703    /// check and the transaction payload check.
1704    fn passed_tx_checks(err: &str) -> bool {
1705        !err.starts_with("Overall request too large")
1706            && !err.starts_with("Transaction payload too large")
1707    }
1708
1709    pub async fn test_payload_reusing_vars_execution_impl() {
1710        // Test that when variables are re-used as execution params, the size of the
1711        // variable is only counted once.
1712
1713        // First, check that `error_passed_tx_checks` is working, by submitting a
1714        // request that will fail the initial payload check.
1715        assert!(!passed_tx_checks(
1716            &execute_for_error(
1717                Limits {
1718                    max_tx_payload_size: 1,
1719                    max_query_payload_size: 1,
1720                    ..Default::default()
1721                },
1722                r#"
1723                    mutation {
1724                        executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1725                            effects {
1726                                status
1727                            }
1728                        }
1729                    }
1730                "#
1731                .into()
1732            )
1733            .await
1734        ));
1735
1736        let limits = Limits {
1737            max_tx_payload_size: 20,
1738            max_query_payload_size: 1000,
1739            ..Default::default()
1740        };
1741
1742        // Then check that a request that uses the variable once passes the transaction
1743        // limit check.
1744        assert!(passed_tx_checks(
1745            &execute_for_error(
1746                limits.clone(),
1747                Request::new(
1748                    r#"
1749                    mutation ($sig: String!) {
1750                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig]) {
1751                            effects {
1752                                status
1753                            }
1754                        }
1755                    }
1756                    "#,
1757                )
1758                .variables(Variables::from_json(json!({
1759                    "sig": "BBB"
1760                })))
1761            )
1762            .await
1763        ));
1764
1765        // Then check that a request that introduces an extra signature, but without
1766        // re-using the variable fails the transaction limit.
1767        let execution_result = execute_for_error(
1768            limits.clone(),
1769            Request::new(
1770                r#"
1771                    mutation ($sig: String!) {
1772                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, "BBB"]) {
1773                            effects {
1774                                status
1775                            }
1776                        }
1777                    }
1778                    "#,
1779            )
1780            .variables(Variables::from_json(json!({
1781                "sig": "BBB"
1782            }))),
1783        )
1784        .await;
1785        assert!(!passed_tx_checks(&execution_result), "{execution_result}");
1786
1787        // And then when that use is replaced by re-using the variable, we should be
1788        // under the transaction payload limit again.
1789        let execution_result = execute_for_error(
1790            limits,
1791            Request::new(
1792                r#"
1793                    mutation ($sig: String!) {
1794                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, $sig]) {
1795                            effects {
1796                                status
1797                            }
1798                        }
1799                    }
1800                    "#,
1801            )
1802            .variables(Variables::from_json(json!({
1803                "sig": "BBB"
1804            }))),
1805        )
1806        .await;
1807        assert!(passed_tx_checks(&execution_result), "{execution_result}");
1808    }
1809
1810    pub async fn test_payload_reusing_vars_dry_run_impl() {
1811        // Like `test_payload_reusing_vars_execution` but the variable is used in a
1812        // dry-run.
1813
1814        let limits = Limits {
1815            max_tx_payload_size: 20,
1816            max_query_payload_size: 1000,
1817            ..Default::default()
1818        };
1819
1820        // A single dry-run is under the limit.
1821        assert!(passed_tx_checks(
1822            &execute_for_error(
1823                limits.clone(),
1824                Request::new(
1825                    r#"
1826                    query ($tx: String!) {
1827                        dryRunTransactionBlock(txBytes: $tx) {
1828                            error
1829                            transaction {
1830                                digest
1831                            }
1832                        }
1833                    }
1834                    "#,
1835                )
1836                .variables(Variables::from_json(json!({
1837                    "tx": "AAABBBCCC"
1838                })))
1839            )
1840            .await
1841        ));
1842
1843        // Duplicating the dry-run causes us to hit the limit.
1844        assert!(!passed_tx_checks(
1845            &execute_for_error(
1846                limits.clone(),
1847                Request::new(
1848                    r#"
1849                    query ($tx: String!) {
1850                        d0: dryRunTransactionBlock(txBytes: $tx) {
1851                            error
1852                            transaction {
1853                                digest
1854                            }
1855                        }
1856
1857                        d1: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1858                            error
1859                            transaction {
1860                                digest
1861                            }
1862                        }
1863                    }
1864                    "#,
1865                )
1866                .variables(Variables::from_json(json!({
1867                    "tx": "AAABBBCCC"
1868                })))
1869            )
1870            .await
1871        ));
1872
1873        // And by re-using the variable, we are under the transaction limit again.
1874        assert!(passed_tx_checks(
1875            &execute_for_error(
1876                limits,
1877                Request::new(
1878                    r#"
1879                    query ($tx: String!) {
1880                        d0: dryRunTransactionBlock(txBytes: $tx) {
1881                            error
1882                            transaction {
1883                                digest
1884                            }
1885                        }
1886
1887                        d1: dryRunTransactionBlock(txBytes: $tx) {
1888                            error
1889                            transaction {
1890                                digest
1891                            }
1892                        }
1893                    }
1894                    "#,
1895                )
1896                .variables(Variables::from_json(json!({
1897                    "tx": "AAABBBCCC"
1898                })))
1899            )
1900            .await
1901        ));
1902    }
1903
1904    pub async fn test_payload_named_fragment_execution_exceeded_impl() {
1905        assert_eq!(
1906            execute_for_error(
1907                Limits {
1908                    max_tx_payload_size: 10,
1909                    max_query_payload_size: 500,
1910                    ..Default::default()
1911                },
1912                r#"
1913                    mutation {
1914                        ...Tx
1915                    }
1916
1917                    fragment Tx on Mutation {
1918                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1919                            effects {
1920                                status
1921                            }
1922                        }
1923                    }
1924                "#
1925                .into()
1926            )
1927            .await,
1928            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1929             transaction payloads (all inputs to executeTransactionBlock or \
1930             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1931             bytes or fewer."
1932        );
1933    }
1934
1935    pub async fn test_payload_inline_fragment_execution_exceeded_impl() {
1936        assert_eq!(
1937            execute_for_error(
1938                Limits {
1939                    max_tx_payload_size: 10,
1940                    max_query_payload_size: 500,
1941                    ..Default::default()
1942                },
1943                r#"
1944                    mutation {
1945                        ... on Mutation {
1946                            executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1947                                effects {
1948                                    status
1949                                }
1950                            }
1951                        }
1952                    }
1953                "#
1954                .into()
1955            )
1956            .await,
1957            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1958             transaction payloads (all inputs to executeTransactionBlock or \
1959             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1960             bytes or fewer."
1961        );
1962    }
1963
1964    pub async fn test_payload_named_fragment_dry_run_exceeded_impl() {
1965        assert_eq!(
1966            execute_for_error(
1967                Limits {
1968                    max_tx_payload_size: 10,
1969                    max_query_payload_size: 500,
1970                    ..Default::default()
1971                },
1972                r#"
1973                    query {
1974                        ...DryRun
1975                    }
1976
1977                    fragment DryRun on Query {
1978                        dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1979                            error
1980                            transaction {
1981                                digest
1982                            }
1983                        }
1984                    }
1985                "#
1986                .into(),
1987            )
1988            .await,
1989            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1990             transaction payloads (all inputs to executeTransactionBlock or \
1991             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1992             bytes or fewer."
1993        );
1994    }
1995
1996    pub async fn test_payload_inline_fragment_dry_run_exceeded_impl() {
1997        assert_eq!(
1998            execute_for_error(
1999                Limits {
2000                    max_tx_payload_size: 10,
2001                    max_query_payload_size: 500,
2002                    ..Default::default()
2003                },
2004                r#"
2005                    query {
2006                        ... on Query {
2007                            dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2008                                error
2009                                transaction {
2010                                    digest
2011                                }
2012                            }
2013                        }
2014                    }
2015                "#
2016                .into(),
2017            )
2018            .await,
2019            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2020             transaction payloads (all inputs to executeTransactionBlock or \
2021             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
2022             bytes or fewer."
2023        );
2024    }
2025}