iota_graphql_rpc/server/
builder.rs

1// Copyright (c) Mysten Labs, Inc.
2// Modifications Copyright (c) 2024 IOTA Stiftung
3// SPDX-License-Identifier: Apache-2.0
4use std::{
5    any::Any,
6    convert::Infallible,
7    net::{SocketAddr, TcpStream},
8    sync::Arc,
9    time::{Duration, Instant},
10};
11
12use async_graphql::{
13    Context, Data, Schema, SchemaBuilder,
14    extensions::{ApolloTracing, ExtensionFactory, Tracing},
15    http::ALL_WEBSOCKET_PROTOCOLS,
16};
17use async_graphql_axum::{GraphQLProtocol, GraphQLRequest, GraphQLResponse, GraphQLWebSocket};
18use axum::{
19    Extension, Router,
20    body::Body,
21    extract::{
22        ConnectInfo, FromRef, FromRequestParts, Query as AxumQuery, State, ws::WebSocketUpgrade,
23    },
24    http::{HeaderMap, StatusCode},
25    middleware::{self},
26    response::{IntoResponse, Response as AxumResponse},
27    routing::{MethodRouter, Route, get, post},
28};
29use axum_extra::{TypedHeader, headers::ContentLength};
30use chrono::Utc;
31use http::{HeaderValue, Method, Request};
32use iota_graphql_rpc_headers::LIMITS_HEADER;
33use iota_grpc_client::Client as GrpcClient;
34use iota_indexer::{
35    apis::{OptimisticWriteApi, ReadApi, WriteApi},
36    db::{get_pool_connection, setup_postgres::check_db_migration_consistency},
37    metrics::IndexerMetrics,
38    optimistic_indexing::OptimisticTransactionExecutor,
39    read::IndexerReader,
40    store::PgIndexerStore,
41};
42use iota_metrics::spawn_monitored_task;
43use iota_network_stack::callback::{CallbackLayer, MakeCallbackHandler, ResponseHandler};
44use iota_package_resolver::{PackageStoreWithLruCache, Resolver};
45use tokio::{join, net::TcpListener, sync::OnceCell};
46use tokio_util::sync::CancellationToken;
47use tower::{Layer, Service};
48use tower_http::cors::{AllowOrigin, CorsLayer};
49use tracing::info;
50use uuid::Uuid;
51
52use crate::{
53    config::{ConnectionConfig, ServerConfig, ServiceConfig, Version},
54    context_data::db_data_provider::PgManager,
55    data::{
56        DataLoader, Db,
57        package_resolver::{DbPackageStore, PackageResolver},
58    },
59    error::Error,
60    extensions::{
61        directive_checker::DirectiveChecker,
62        feature_gate::FeatureGate,
63        logger::Logger,
64        query_limits_checker::{PayloadSize, QueryLimitsChecker, ShowUsage},
65        timeout::Timeout,
66    },
67    metrics::Metrics,
68    mutation::Mutation,
69    server::{
70        exchange_rates_task::TriggerExchangeRatesTask,
71        system_package_task::SystemPackageTask,
72        version::{check_version_middleware, set_version_middleware},
73        watermark_task::{Watermark, WatermarkLock, WatermarkTask},
74    },
75    types::{
76        chain_identifier::ChainIdentifierCache,
77        datatype::IMoveDatatype,
78        move_object::IMoveObject,
79        object::IObject,
80        owner::IOwner,
81        query::{IotaGraphQLSchema, Query},
82        subscription::{GraphQLStream, Subscription},
83    },
84};
85
86/// The default allowed maximum lag between the current timestamp and the
87/// checkpoint timestamp.
88const DEFAULT_MAX_CHECKPOINT_LAG: Duration = Duration::from_secs(300);
89/// The maximum complexity allowed for native [`async_graphql`] checks.
90const MAX_COMPLEXITY: usize = 6000;
91
92pub(crate) struct Server {
93    router: Router,
94    address: SocketAddr,
95    watermark_task: WatermarkTask,
96    system_package_task: SystemPackageTask,
97    trigger_exchange_rates_task: TriggerExchangeRatesTask,
98    state: AppState,
99}
100
101impl Server {
102    /// Start the GraphQL service and any background tasks it is dependent on.
103    /// When a cancellation signal is received, the method waits for all
104    /// tasks to complete before returning.
105    pub async fn run(mut self) -> Result<(), Error> {
106        get_or_init_server_start_time().await;
107
108        // A handle that spawns a background task to periodically update the
109        // `Watermark`, which consists of the checkpoint upper bound and current
110        // epoch.
111        let watermark_task = {
112            info!("Starting watermark update task");
113            spawn_monitored_task!(async move {
114                self.watermark_task.run().await;
115            })
116        };
117
118        // A handle that spawns a background task to evict system packages on epoch
119        // changes.
120        let system_package_task = {
121            info!("Starting system package task");
122            spawn_monitored_task!(async move {
123                self.system_package_task.run().await;
124            })
125        };
126
127        let trigger_exchange_rates_task = {
128            info!("Starting trigger exchange rates task");
129            spawn_monitored_task!(async move {
130                self.trigger_exchange_rates_task.run().await;
131            })
132        };
133
134        let server_task = {
135            info!("Starting graphql service");
136            let cancellation_token = self.state.cancellation_token.clone();
137            let address = self.address;
138            let router = self.router;
139            spawn_monitored_task!(async move {
140                axum::serve(
141                    TcpListener::bind(address)
142                        .await
143                        .map_err(|e| Error::Internal(format!("listener bind failed: {e}")))?,
144                    router.into_make_service_with_connect_info::<SocketAddr>(),
145                )
146                .with_graceful_shutdown(async move {
147                    cancellation_token.cancelled().await;
148                    info!("Shutdown signal received, terminating graphql service");
149                })
150                .await
151                .map_err(|e| Error::Internal(format!("Server run failed: {e}")))
152            })
153        };
154
155        // Wait for all tasks to complete. This ensures that the service doesn't fully
156        // shut down until all tasks and the server have completed their
157        // shutdown processes.
158        let _ = join!(
159            watermark_task,
160            system_package_task,
161            trigger_exchange_rates_task,
162            server_task
163        );
164
165        Ok(())
166    }
167}
168
169pub(crate) struct ServerBuilder {
170    state: AppState,
171    schema: SchemaBuilder<Query, Mutation, Subscription>,
172    router: Option<Router>,
173    db_reader: Option<Db>,
174    resolver: Option<PackageResolver>,
175}
176
177#[derive(Clone)]
178pub(crate) struct AppState {
179    connection: ConnectionConfig,
180    service: ServiceConfig,
181    metrics: Metrics,
182    cancellation_token: CancellationToken,
183    pub version: Version,
184}
185
186impl AppState {
187    pub(crate) fn new(
188        connection: ConnectionConfig,
189        service: ServiceConfig,
190        metrics: Metrics,
191        cancellation_token: CancellationToken,
192        version: Version,
193    ) -> Self {
194        Self {
195            connection,
196            service,
197            metrics,
198            cancellation_token,
199            version,
200        }
201    }
202}
203
204impl FromRef<AppState> for ConnectionConfig {
205    fn from_ref(app_state: &AppState) -> ConnectionConfig {
206        app_state.connection.clone()
207    }
208}
209
210impl FromRef<AppState> for Metrics {
211    fn from_ref(app_state: &AppState) -> Metrics {
212        app_state.metrics.clone()
213    }
214}
215
216impl ServerBuilder {
217    pub fn new(state: AppState) -> Self {
218        Self {
219            state,
220            schema: schema_builder(),
221            router: None,
222            db_reader: None,
223            resolver: None,
224        }
225    }
226
227    pub fn address(&self) -> String {
228        format!(
229            "{}:{}",
230            self.state.connection.host, self.state.connection.port
231        )
232    }
233
234    pub fn context_data(mut self, context_data: impl Any + Send + Sync) -> Self {
235        self.schema = self.schema.data(context_data);
236        self
237    }
238
239    pub fn extension(mut self, extension: impl ExtensionFactory) -> Self {
240        self.schema = self.schema.extension(extension);
241        self
242    }
243
244    fn build_schema(self) -> Schema<Query, Mutation, Subscription> {
245        self.schema.finish()
246    }
247
248    /// Prepares the components of the server to be run. Finalizes the graphql
249    /// schema, and expects the `Db` and `Router` to have been initialized.
250    fn build_components(
251        self,
252    ) -> (
253        String,
254        Schema<Query, Mutation, Subscription>,
255        Db,
256        PackageResolver,
257        Router,
258    ) {
259        let address = self.address();
260        let ServerBuilder {
261            state: _,
262            schema,
263            db_reader,
264            resolver,
265            router,
266        } = self;
267        (
268            address,
269            schema.finish(),
270            db_reader.expect("db reader not initialized"),
271            resolver.expect("package resolver not initialized"),
272            router.expect("router not initialized"),
273        )
274    }
275
276    fn init_router(&mut self) {
277        if self.router.is_none() {
278            let router: Router = Router::new()
279                .route("/", post(graphql_handler))
280                .route("/subscriptions", get(subscription_handler))
281                .route("/{version}", post(graphql_handler))
282                .route("/{version}/subscriptions", get(subscription_handler))
283                .route("/graphql", post(graphql_handler))
284                .route("/graphql/subscriptions", get(subscription_handler))
285                .route("/graphql/{version}", post(graphql_handler))
286                .route(
287                    "/graphql/{version}/subscriptions",
288                    get(subscription_handler),
289                )
290                .route("/health", get(health_check))
291                .route("/graphql/health", get(health_check))
292                .route("/graphql/{version}/health", get(health_check))
293                .with_state(self.state.clone())
294                .route_layer(CallbackLayer::new(MetricsMakeCallbackHandler {
295                    metrics: self.state.metrics.clone(),
296                }));
297            self.router = Some(router);
298        }
299    }
300
301    pub fn route(mut self, path: &str, method_handler: MethodRouter) -> Self {
302        self.init_router();
303        self.router = self.router.map(|router| router.route(path, method_handler));
304        self
305    }
306
307    pub fn layer<L>(mut self, layer: L) -> Self
308    where
309        L: Layer<Route> + Clone + Send + Sync + 'static,
310        L::Service: Service<Request<Body>> + Clone + Send + Sync + 'static,
311        <L::Service as Service<Request<Body>>>::Response: IntoResponse + 'static,
312        <L::Service as Service<Request<Body>>>::Error: Into<Infallible> + 'static,
313        <L::Service as Service<Request<Body>>>::Future: Send + 'static,
314    {
315        self.init_router();
316        self.router = self.router.map(|router| router.layer(layer));
317        self
318    }
319
320    fn cors() -> Result<CorsLayer, Error> {
321        let acl = match std::env::var("ACCESS_CONTROL_ALLOW_ORIGIN") {
322            Ok(value) => {
323                let allow_hosts = value
324                    .split(',')
325                    .map(HeaderValue::from_str)
326                    .collect::<Result<Vec<_>, _>>()
327                    .map_err(|_| {
328                        Error::Internal(
329                            "Cannot resolve access control origin env variable".to_string(),
330                        )
331                    })?;
332                AllowOrigin::list(allow_hosts)
333            }
334            _ => AllowOrigin::any(),
335        };
336        info!("Access control allow origin set to: {acl:?}");
337
338        let cors = CorsLayer::new()
339            // Allow `POST` & `GET` when accessing the resource
340            .allow_methods([Method::POST, Method::GET])
341            // Allow requests from any origin
342            .allow_origin(acl)
343            .allow_headers([hyper::header::CONTENT_TYPE, LIMITS_HEADER.clone()]);
344        Ok(cors)
345    }
346
347    /// Consumes the `ServerBuilder` to create a `Server` that can be run.
348    pub fn build(self) -> Result<Server, Error> {
349        let state = self.state.clone();
350        let (address, schema, db_reader, resolver, router) = self.build_components();
351
352        // Initialize the watermark background task struct.
353        let watermark_task = WatermarkTask::new(
354            db_reader.clone(),
355            state.metrics.clone(),
356            std::time::Duration::from_millis(state.service.background_tasks.watermark_update_ms),
357            state.cancellation_token.clone(),
358        );
359
360        let system_package_task = SystemPackageTask::new(
361            resolver,
362            watermark_task.epoch_receiver(),
363            state.cancellation_token.clone(),
364        );
365
366        let trigger_exchange_rates_task = TriggerExchangeRatesTask::new(
367            db_reader,
368            watermark_task.epoch_receiver(),
369            state.cancellation_token.clone(),
370        );
371
372        let router = router
373            .route_layer(middleware::from_fn_with_state(
374                state.version,
375                set_version_middleware,
376            ))
377            .route_layer(middleware::from_fn_with_state(
378                state.version,
379                check_version_middleware,
380            ))
381            .layer(axum::extract::Extension(schema))
382            .layer(axum::extract::Extension(watermark_task.lock()))
383            .layer(Self::cors()?);
384
385        Ok(Server {
386            router,
387            address: address
388                .parse()
389                .map_err(|_| Error::Internal(format!("Failed to parse address {address}")))?,
390            watermark_task,
391            system_package_task,
392            trigger_exchange_rates_task,
393            state,
394        })
395    }
396
397    /// Instantiate a `ServerBuilder` from a `ServerConfig`, typically called
398    /// when building the graphql service for production usage.
399    pub async fn from_config(
400        config: &ServerConfig,
401        version: &Version,
402        cancellation_token: CancellationToken,
403    ) -> Result<Self, Error> {
404        // PROMETHEUS
405        let prom_addr: SocketAddr = format!(
406            "{}:{}",
407            config.connection.prom_host, config.connection.prom_port
408        )
409        .parse()
410        .map_err(|_| {
411            Error::Internal(format!(
412                "Failed to parse url {}, port {} into socket address",
413                config.connection.prom_host, config.connection.prom_port
414            ))
415        })?;
416
417        let registry_service = iota_metrics::start_prometheus_server(prom_addr);
418        info!("Starting Prometheus HTTP endpoint at {}", prom_addr);
419        let registry = registry_service.default_registry();
420        registry
421            .register(iota_metrics::uptime_metric(
422                "graphql",
423                version.full,
424                "unknown",
425            ))
426            .unwrap();
427
428        // METRICS
429        let metrics = Metrics::new(&registry);
430        let indexer_metrics = IndexerMetrics::new(&registry);
431        let state = AppState::new(
432            config.connection.clone(),
433            config.service.clone(),
434            metrics.clone(),
435            cancellation_token.clone(),
436            *version,
437        );
438        let mut builder = ServerBuilder::new(state);
439
440        let iota_names_config = config.service.iota_names.clone();
441        let reader = PgManager::reader_with_config(
442            config.connection.db_url.clone(),
443            config.connection.db_pool_size,
444            // Bound each statement in a request with the overall request timeout, to bound DB
445            // utilisation (in the worst case we will use 2x the request timeout time in DB wall
446            // time).
447            config.service.limits.request_timeout_ms.into(),
448            indexer_metrics.clone(),
449            cancellation_token,
450        )
451        .map_err(|e| Error::ServerInit(format!("Failed to create pg connection pool: {e}")))?;
452
453        if !config.connection.skip_migration_consistency_check {
454            // Compatibility check
455            info!("Starting compatibility check");
456            let mut connection = get_pool_connection(&reader.get_pool())?;
457            check_db_migration_consistency(&mut connection)?;
458            info!("Compatibility check passed");
459        }
460
461        // DB
462        let db = Db::new(
463            reader.clone(),
464            config.service.limits.clone(),
465            metrics.clone(),
466            config.connection.max_available_range,
467        );
468        let loader = DataLoader::new(db.clone());
469        let pg_conn_pool = PgManager::new(reader.clone());
470        let package_store = DbPackageStore::new(loader.clone());
471        let resolver = Arc::new(Resolver::new_with_limits(
472            PackageStoreWithLruCache::new(package_store),
473            config.service.limits.package_resolver_limits(),
474        ));
475
476        builder.db_reader = Some(db.clone());
477        builder.resolver = Some(resolver.clone());
478
479        let Some(fullnode_url) = config.tx_exec_full_node.node_rpc_url.as_ref() else {
480            return Err(Error::ServerInit(
481                "No fullnode url found in config".to_string(),
482            ));
483        };
484
485        let graphql_streams = GraphQLStream::new(reader.clone(), &registry).await?;
486
487        let fullnode_grpc_client = GrpcClient::new(fullnode_url)
488            .await
489            .map_err(|e| Error::ServerInit(e.to_string()))?;
490
491        let read_api = ReadApi::new(reader.clone(), fullnode_grpc_client.clone());
492        let write_api = build_write_api(fullnode_grpc_client, reader, indexer_metrics).await?;
493
494        builder = builder
495            .context_data(config.service.clone())
496            .context_data(loader)
497            .context_data(db)
498            .context_data(pg_conn_pool)
499            .context_data(resolver)
500            .context_data(write_api)
501            .context_data(read_api)
502            .context_data(iota_names_config)
503            .context_data(metrics.clone())
504            .context_data(config.clone())
505            .context_data(graphql_streams)
506            .context_data(ChainIdentifierCache::default());
507
508        if config.internal_features.feature_gate {
509            builder = builder.extension(FeatureGate);
510        }
511
512        if config.internal_features.logger {
513            builder = builder.extension(Logger::default());
514        }
515
516        if config.internal_features.query_limits_checker {
517            builder = builder.extension(QueryLimitsChecker);
518        }
519
520        if config.internal_features.directive_checker {
521            builder = builder.extension(DirectiveChecker);
522        }
523
524        if config.internal_features.query_timeout {
525            builder = builder.extension(Timeout);
526        }
527
528        if config.internal_features.tracing {
529            builder = builder.extension(Tracing);
530        }
531
532        if config.internal_features.apollo_tracing {
533            builder = builder.extension(ApolloTracing);
534        }
535
536        // TODO: uncomment once impl
537        // if config.internal_features.open_telemetry { }
538
539        Ok(builder)
540    }
541}
542
543fn schema_builder() -> SchemaBuilder<Query, Mutation, Subscription> {
544    async_graphql::Schema::build(Query, Mutation, Subscription)
545        .limit_complexity(MAX_COMPLEXITY)
546        .register_output_type::<IMoveObject>()
547        .register_output_type::<IObject>()
548        .register_output_type::<IOwner>()
549        .register_output_type::<IMoveDatatype>()
550}
551
552/// Return the string representation of the schema used by this server.
553pub fn export_schema() -> String {
554    schema_builder().finish().sdl()
555}
556
557/// Entry point for graphql requests.
558///
559/// Each request is stamped with a unique ID, a `ShowUsage` flag if set in the
560/// request headers, and the watermark as set by the background task.
561async fn graphql_handler(
562    ConnectInfo(addr): ConnectInfo<SocketAddr>,
563    TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
564    schema: Extension<IotaGraphQLSchema>,
565    Extension(watermark_lock): Extension<WatermarkLock>,
566    headers: HeaderMap,
567    req: GraphQLRequest,
568) -> (axum::http::Extensions, GraphQLResponse) {
569    let mut req = req.into_inner();
570    req.data.insert(PayloadSize(content_length));
571    req.data.insert(Uuid::new_v4());
572    if headers.contains_key(ShowUsage::name()) {
573        req.data.insert(ShowUsage)
574    }
575
576    // Capture the IP address of the client
577    // Note: if a load balancer is used it must be configured to forward the client
578    // IP address
579    req.data.insert(addr);
580    req.data.insert(Watermark::new(watermark_lock).await);
581
582    let result = schema.execute(req).await;
583
584    // If there are errors, insert them as an extension so that the Metrics callback
585    // handler can pull it out later.
586    let mut extensions = axum::http::Extensions::new();
587    if result.is_err() {
588        extensions.insert(GraphqlErrors(std::sync::Arc::new(result.errors.clone())));
589    };
590    (extensions, result.into())
591}
592
593pub(crate) fn get_write_api<'ctx>(
594    ctx: &'ctx Context<'_>,
595) -> Result<&'ctx OptimisticWriteApi, Error> {
596    ctx.data_opt()
597        .ok_or_else(|| Error::Internal("Unable to get node execution interface".to_string()))
598}
599
600async fn build_write_api(
601    fullnode_grpc_client: GrpcClient,
602    reader: IndexerReader,
603    metrics: IndexerMetrics,
604) -> Result<OptimisticWriteApi, Error> {
605    let indexer_store = PgIndexerStore::new(reader.get_pool(), metrics.clone());
606    let optimistic_tx_executor = OptimisticTransactionExecutor::new(
607        fullnode_grpc_client.clone(),
608        reader.clone(),
609        indexer_store,
610        metrics,
611    )
612    .await?;
613    Ok(OptimisticWriteApi::new(
614        WriteApi::new(fullnode_grpc_client, reader),
615        optimistic_tx_executor,
616    ))
617}
618
619/// Entry point for graphql streaming requests.
620///
621/// Each request is stamped with a unique ID, a `ShowUsage` flag if set in the
622/// request headers and tracks the connection information produced by the
623/// client.
624async fn subscription_handler(
625    ConnectInfo(addr): ConnectInfo<SocketAddr>,
626    Extension(schema): Extension<IotaGraphQLSchema>,
627    req: http::Request<Body>,
628) -> AxumResponse {
629    let headers_contains_show_usage = req.headers().contains_key(ShowUsage::name());
630    let (mut parts, _body) = req.into_parts();
631
632    // extract GraphQL protocol
633    let protocol = match GraphQLProtocol::from_request_parts(&mut parts, &()).await {
634        Ok(protocol) => protocol,
635        Err(err) => return err.into_response(),
636    };
637
638    // extract WebSocket upgrade from request
639    let upgrade = match WebSocketUpgrade::from_request_parts(&mut parts, &()).await {
640        Ok(upgrade) => upgrade,
641        Err(err) => return err.into_response(),
642    };
643
644    let resp = upgrade
645        .protocols(ALL_WEBSOCKET_PROTOCOLS)
646        .on_upgrade(move |stream| async move {
647            // create connection data with per-connection values
648            let mut connection_data = Data::default();
649            // axum discards body on GET requests, being always 0
650            connection_data.insert(PayloadSize(0));
651            connection_data.insert(Uuid::new_v4());
652            if headers_contains_show_usage {
653                connection_data.insert(ShowUsage)
654            }
655            connection_data.insert(addr);
656
657            let connection =
658                GraphQLWebSocket::new(stream, schema, protocol).with_data(connection_data);
659            connection.serve().await;
660        });
661    resp
662}
663
664#[derive(Clone)]
665struct MetricsMakeCallbackHandler {
666    metrics: Metrics,
667}
668
669impl MakeCallbackHandler for MetricsMakeCallbackHandler {
670    type Handler = MetricsCallbackHandler;
671
672    fn make_handler(&self, _request: &http::request::Parts) -> Self::Handler {
673        let start = Instant::now();
674        let metrics = self.metrics.clone();
675
676        metrics.request_metrics.inflight_requests.inc();
677        metrics.inc_num_queries();
678
679        MetricsCallbackHandler { metrics, start }
680    }
681}
682
683struct MetricsCallbackHandler {
684    metrics: Metrics,
685    start: Instant,
686}
687
688impl ResponseHandler for MetricsCallbackHandler {
689    fn on_response(&mut self, response: &http::response::Parts) {
690        if let Some(errors) = response.extensions.get::<GraphqlErrors>() {
691            self.metrics.inc_errors(&errors.0);
692        }
693    }
694
695    fn on_error<E>(&mut self, _error: &E) {
696        // Do nothing if the whole service errored
697        //
698        // in Axum this isn't possible since all services are required to have
699        // an error type of Infallible
700    }
701}
702
703impl Drop for MetricsCallbackHandler {
704    fn drop(&mut self) {
705        self.metrics.query_latency(self.start.elapsed());
706        self.metrics.request_metrics.inflight_requests.dec();
707    }
708}
709
710#[derive(Debug, Clone)]
711struct GraphqlErrors(std::sync::Arc<Vec<async_graphql::ServerError>>);
712
713/// Connect via a TCPStream to the DB to check if it is alive
714async fn db_health_check(State(connection): State<ConnectionConfig>) -> StatusCode {
715    let Ok(url) = reqwest::Url::parse(connection.db_url.as_str()) else {
716        return StatusCode::INTERNAL_SERVER_ERROR;
717    };
718
719    let Some(host) = url.host_str() else {
720        return StatusCode::INTERNAL_SERVER_ERROR;
721    };
722
723    let tcp_url = if let Some(port) = url.port() {
724        format!("{host}:{port}")
725    } else {
726        host.to_string()
727    };
728
729    if TcpStream::connect(tcp_url).is_err() {
730        StatusCode::INTERNAL_SERVER_ERROR
731    } else {
732        StatusCode::OK
733    }
734}
735
736#[derive(serde::Deserialize)]
737struct HealthParam {
738    max_checkpoint_lag_ms: Option<u64>,
739}
740
741/// Endpoint for querying the health of the service.
742/// It returns 500 for any internal error, including not connecting to the DB,
743/// and 504 if the checkpoint timestamp is too far behind the current timestamp
744/// as per the max checkpoint timestamp lag query parameter, or the default
745/// value if not provided.
746async fn health_check(
747    State(connection): State<ConnectionConfig>,
748    Extension(watermark_lock): Extension<WatermarkLock>,
749    AxumQuery(query_params): AxumQuery<HealthParam>,
750) -> StatusCode {
751    let db_health_check = db_health_check(axum::extract::State(connection)).await;
752    if db_health_check != StatusCode::OK {
753        return db_health_check;
754    }
755
756    let max_checkpoint_lag_ms = query_params
757        .max_checkpoint_lag_ms
758        .map(Duration::from_millis)
759        .unwrap_or_else(|| DEFAULT_MAX_CHECKPOINT_LAG);
760
761    let checkpoint_timestamp =
762        Duration::from_millis(watermark_lock.read().await.checkpoint_timestamp_ms);
763
764    let now_millis = Utc::now().timestamp_millis();
765
766    // Check for negative timestamp or conversion failure
767    let now: Duration = match u64::try_from(now_millis) {
768        Ok(val) => Duration::from_millis(val),
769        Err(_) => return StatusCode::INTERNAL_SERVER_ERROR,
770    };
771
772    if (now - checkpoint_timestamp) > max_checkpoint_lag_ms {
773        return StatusCode::GATEWAY_TIMEOUT;
774    }
775
776    db_health_check
777}
778
779// One server per proc, so this is okay
780async fn get_or_init_server_start_time() -> &'static Instant {
781    static ONCE: OnceCell<Instant> = OnceCell::const_new();
782    ONCE.get_or_init(|| async move { Instant::now() }).await
783}
784
785pub mod tests {
786    use std::{sync::Arc, time::Duration};
787
788    use async_graphql::{
789        Request, Response, Variables,
790        extensions::{Extension, ExtensionContext, NextExecute},
791    };
792    use iota_types::transaction::{TransactionData, TransactionDataAPI};
793    use serde_json::json;
794    use uuid::Uuid;
795
796    use super::*;
797    use crate::{
798        config::{ConnectionConfig, Limits, ServiceConfig, Version},
799        context_data::db_data_provider::PgManager,
800        extensions::{query_limits_checker::QueryLimitsChecker, timeout::Timeout},
801        test_infra::cluster::Cluster,
802    };
803
804    /// Prepares a schema for tests dealing with extensions. Returns a
805    /// `ServerBuilder` that can be further extended with `context_data` and
806    /// `extension` for testing.
807    fn prep_schema(
808        connection_config: Option<ConnectionConfig>,
809        service_config: Option<ServiceConfig>,
810    ) -> ServerBuilder {
811        let connection_config = connection_config.unwrap_or_default();
812        let service_config = service_config.unwrap_or_default();
813
814        let reader = PgManager::reader_with_config(
815            connection_config.db_url.clone(),
816            connection_config.db_pool_size,
817            service_config.limits.request_timeout_ms.into(),
818            IndexerMetrics::new(&prometheus::Registry::new()),
819            CancellationToken::new(),
820        )
821        .expect("failed to create pg connection pool");
822
823        let version = Version::for_testing();
824        let metrics = metrics();
825
826        let db = Db::new(
827            reader.clone(),
828            service_config.limits.clone(),
829            metrics.clone(),
830            connection_config.max_available_range,
831        );
832        let loader = DataLoader::new(db.clone());
833        let pg_conn_pool = PgManager::new(reader);
834        let cancellation_token = CancellationToken::new();
835        let watermark = Watermark {
836            checkpoint: 1,
837            checkpoint_timestamp_ms: 1,
838            epoch: 0,
839        };
840        let state = AppState::new(
841            connection_config,
842            service_config.clone(),
843            metrics.clone(),
844            cancellation_token,
845            version,
846        );
847        ServerBuilder::new(state)
848            .context_data(db)
849            .context_data(loader)
850            .context_data(pg_conn_pool)
851            .context_data(service_config)
852            .context_data(query_id())
853            .context_data(ip_address())
854            .context_data(watermark)
855            .context_data(ChainIdentifierCache::default())
856            .context_data(metrics)
857    }
858
859    fn metrics() -> Metrics {
860        let binding_address: SocketAddr = "0.0.0.0:9185".parse().unwrap();
861        let registry = iota_metrics::start_prometheus_server(binding_address).default_registry();
862        Metrics::new(&registry)
863    }
864
865    fn ip_address() -> SocketAddr {
866        let binding_address: SocketAddr = "0.0.0.0:51515".parse().unwrap();
867        binding_address
868    }
869
870    fn query_id() -> Uuid {
871        Uuid::new_v4()
872    }
873
874    pub async fn test_timeout_impl(cluster: &Cluster) {
875        struct TimedExecuteExt {
876            pub min_req_delay: Duration,
877        }
878
879        impl ExtensionFactory for TimedExecuteExt {
880            fn create(&self) -> Arc<dyn Extension> {
881                Arc::new(TimedExecuteExt {
882                    min_req_delay: self.min_req_delay,
883                })
884            }
885        }
886
887        #[async_trait::async_trait]
888        impl Extension for TimedExecuteExt {
889            async fn execute(
890                &self,
891                ctx: &ExtensionContext<'_>,
892                operation_name: Option<&str>,
893                next: NextExecute<'_>,
894            ) -> Response {
895                tokio::time::sleep(self.min_req_delay).await;
896                next.run(ctx, operation_name).await
897            }
898        }
899
900        async fn test_timeout(
901            delay: Duration,
902            timeout: Duration,
903            query: &str,
904            write_api: OptimisticWriteApi,
905        ) -> Response {
906            let mut cfg = ServiceConfig::default();
907            cfg.limits.request_timeout_ms = timeout.as_millis() as u32;
908            cfg.limits.mutation_timeout_ms = timeout.as_millis() as u32;
909
910            let schema = prep_schema(None, Some(cfg))
911                .context_data(write_api)
912                .extension(Timeout)
913                .extension(TimedExecuteExt {
914                    min_req_delay: delay,
915                })
916                .build_schema();
917
918            schema.execute(query).await
919        }
920
921        let wallet = &cluster.validator_fullnode_handle.wallet;
922        let store = &cluster.indexer_store;
923        let fn_grpc_url = &cluster.validator_fullnode_handle.grpc_url();
924        let indexer_metrics = store.get_metrics();
925
926        // Use reader without watermark cache, test doesn't check pruning behaviour
927        let indexer_reader =
928            iota_indexer::read::IndexerReader::new_without_watermark_cache(store.blocking_cp());
929        let fullnode_gpc_client = GrpcClient::new(fn_grpc_url).await.unwrap();
930
931        let optimistic_tx_executor =
932            iota_indexer::optimistic_indexing::OptimisticTransactionExecutor::new(
933                fullnode_gpc_client.clone(),
934                indexer_reader.clone(),
935                store.clone(),
936                indexer_metrics,
937            )
938            .await
939            .unwrap();
940        let write_api = OptimisticWriteApi::new(
941            WriteApi::new(fullnode_gpc_client, indexer_reader),
942            optimistic_tx_executor,
943        );
944
945        let query = "{ chainIdentifier }";
946        let timeout = Duration::from_millis(1000);
947        let delay = Duration::from_millis(100);
948
949        test_timeout(delay, timeout, query, write_api.clone())
950            .await
951            .into_result()
952            .expect("should complete successfully");
953
954        // Should timeout
955        let errs: Vec<_> = test_timeout(delay, delay, query, write_api.clone())
956            .await
957            .into_result()
958            .unwrap_err()
959            .into_iter()
960            .map(|e| e.message)
961            .collect();
962        let exp = format!("Query request timed out. Limit: {}s", delay.as_secs_f32());
963        assert_eq!(errs, vec![exp]);
964
965        // Should timeout for mutation
966        // Create a transaction and sign it, and use the tx_bytes + signatures for the
967        // GraphQL executeTransactionBlock mutation call.
968        let addresses = wallet.get_addresses();
969        let gas = wallet
970            .get_one_gas_object_owned_by_address(addresses[0])
971            .await
972            .unwrap();
973        let tx_data = TransactionData::new_transfer_iota(
974            addresses[1],
975            addresses[0],
976            Some(1000),
977            gas.unwrap(),
978            1_000_000,
979            wallet.get_reference_gas_price().await.unwrap(),
980        );
981
982        let tx = wallet.sign_transaction(&tx_data);
983        let (tx_bytes, signatures) = tx.to_tx_bytes_and_signatures();
984
985        let signature_base64 = &signatures[0];
986        let query = format!(
987            r#"
988            mutation {{
989              executeTransactionBlock(txBytes: "{}", signatures: "{}") {{
990                effects {{
991                  status
992                }}
993              }}
994            }}"#,
995            tx_bytes.encoded(),
996            signature_base64.encoded()
997        );
998        let errs: Vec<_> = test_timeout(delay, delay, &query, write_api.clone())
999            .await
1000            .into_result()
1001            .unwrap_err()
1002            .into_iter()
1003            .map(|e| e.message)
1004            .collect();
1005        let exp = format!(
1006            "Mutation request timed out. Limit: {}s",
1007            delay.as_secs_f32()
1008        );
1009        assert_eq!(errs, vec![exp]);
1010    }
1011
1012    pub async fn test_query_depth_limit_impl() {
1013        async fn exec_query_depth_limit(depth: u32, query: &str) -> Response {
1014            let service_config = ServiceConfig {
1015                limits: Limits {
1016                    max_query_depth: depth,
1017                    ..Default::default()
1018                },
1019                ..Default::default()
1020            };
1021
1022            let schema = prep_schema(None, Some(service_config))
1023                .context_data(PayloadSize(100))
1024                .extension(QueryLimitsChecker)
1025                .build_schema();
1026            schema.execute(query).await
1027        }
1028
1029        exec_query_depth_limit(1, "{ chainIdentifier }")
1030            .await
1031            .into_result()
1032            .expect("should complete successfully");
1033
1034        exec_query_depth_limit(
1035            5,
1036            "{ chainIdentifier protocolConfig { configs { value key }} }",
1037        )
1038        .await
1039        .into_result()
1040        .expect("should complete successfully");
1041
1042        // Should fail
1043        let errs: Vec<_> = exec_query_depth_limit(0, "{ chainIdentifier }")
1044            .await
1045            .into_result()
1046            .unwrap_err()
1047            .into_iter()
1048            .map(|e| e.message)
1049            .collect();
1050
1051        assert_eq!(errs, vec!["Query nesting is over 0".to_string()]);
1052        let errs: Vec<_> = exec_query_depth_limit(
1053            2,
1054            "{ chainIdentifier protocolConfig { configs { value key }} }",
1055        )
1056        .await
1057        .into_result()
1058        .unwrap_err()
1059        .into_iter()
1060        .map(|e| e.message)
1061        .collect();
1062        assert_eq!(errs, vec!["Query nesting is over 2".to_string()]);
1063    }
1064
1065    pub async fn test_query_node_limit_impl() {
1066        async fn exec_query_node_limit(nodes: u32, query: &str) -> Response {
1067            let service_config = ServiceConfig {
1068                limits: Limits {
1069                    max_query_nodes: nodes,
1070                    ..Default::default()
1071                },
1072                ..Default::default()
1073            };
1074
1075            let schema = prep_schema(None, Some(service_config))
1076                .context_data(PayloadSize(100))
1077                .extension(QueryLimitsChecker)
1078                .build_schema();
1079            schema.execute(query).await
1080        }
1081
1082        exec_query_node_limit(1, "{ chainIdentifier }")
1083            .await
1084            .into_result()
1085            .expect("should complete successfully");
1086
1087        exec_query_node_limit(
1088            5,
1089            "{ chainIdentifier protocolConfig { configs { value key }} }",
1090        )
1091        .await
1092        .into_result()
1093        .expect("should complete successfully");
1094
1095        // Should fail
1096        let err: Vec<_> = exec_query_node_limit(0, "{ chainIdentifier }")
1097            .await
1098            .into_result()
1099            .unwrap_err()
1100            .into_iter()
1101            .map(|e| e.message)
1102            .collect();
1103        assert_eq!(err, vec!["Query has over 0 nodes".to_string()]);
1104
1105        let err: Vec<_> = exec_query_node_limit(
1106            4,
1107            "{ chainIdentifier protocolConfig { configs { value key }} }",
1108        )
1109        .await
1110        .into_result()
1111        .unwrap_err()
1112        .into_iter()
1113        .map(|e| e.message)
1114        .collect();
1115        assert_eq!(err, vec!["Query has over 4 nodes".to_string()]);
1116    }
1117
1118    pub async fn test_query_default_page_limit_impl(connection_config: ConnectionConfig) {
1119        let service_config = ServiceConfig {
1120            limits: Limits {
1121                default_page_size: 1,
1122                ..Default::default()
1123            },
1124            ..Default::default()
1125        };
1126        let schema = prep_schema(Some(connection_config), Some(service_config)).build_schema();
1127
1128        let resp = schema
1129            .execute("{ checkpoints { nodes { sequenceNumber } } }")
1130            .await;
1131        let data = resp.data.clone().into_json().unwrap();
1132        let checkpoints = data
1133            .get("checkpoints")
1134            .unwrap()
1135            .get("nodes")
1136            .unwrap()
1137            .as_array()
1138            .unwrap();
1139        assert_eq!(
1140            checkpoints.len(),
1141            1,
1142            "Checkpoints should have exactly one element"
1143        );
1144
1145        let resp = schema
1146            .execute("{ checkpoints(first: 2) { nodes { sequenceNumber } } }")
1147            .await;
1148        let data = resp.data.into_json().unwrap();
1149        let checkpoints = data
1150            .get("checkpoints")
1151            .unwrap()
1152            .get("nodes")
1153            .unwrap()
1154            .as_array()
1155            .unwrap();
1156        assert_eq!(
1157            checkpoints.len(),
1158            2,
1159            "Checkpoints should return two elements"
1160        );
1161    }
1162
1163    pub async fn test_query_max_page_limit_impl() {
1164        let schema = prep_schema(None, None).build_schema();
1165
1166        schema
1167            .execute("{ objects(first: 1) { nodes { version } } }")
1168            .await
1169            .into_result()
1170            .expect("should complete successfully");
1171
1172        // Should fail
1173        let err: Vec<_> = schema
1174            .execute("{ objects(first: 51) { nodes { version } } }")
1175            .await
1176            .into_result()
1177            .unwrap_err()
1178            .into_iter()
1179            .map(|e| e.message)
1180            .collect();
1181        assert_eq!(
1182            err,
1183            vec!["Connection's page size of 51 exceeds max of 50".to_string()]
1184        );
1185    }
1186
1187    pub async fn test_query_complexity_metrics_impl() {
1188        let server_builder = prep_schema(None, None).context_data(PayloadSize(100));
1189        let metrics = server_builder.state.metrics.clone();
1190        let schema = server_builder
1191            .extension(QueryLimitsChecker) // QueryLimitsChecker is where we actually set the metrics
1192            .build_schema();
1193
1194        schema
1195            .execute("{ chainIdentifier }")
1196            .await
1197            .into_result()
1198            .expect("should complete successfully");
1199
1200        let req_metrics = metrics.request_metrics;
1201        assert_eq!(req_metrics.input_nodes.get_sample_count(), 1);
1202        assert_eq!(req_metrics.output_nodes.get_sample_count(), 1);
1203        assert_eq!(req_metrics.query_depth.get_sample_count(), 1);
1204        assert_eq!(req_metrics.input_nodes.get_sample_sum(), 1.);
1205        assert_eq!(req_metrics.output_nodes.get_sample_sum(), 1.);
1206        assert_eq!(req_metrics.query_depth.get_sample_sum(), 1.);
1207
1208        schema
1209            .execute("{ chainIdentifier protocolConfig { configs { value key }} }")
1210            .await
1211            .into_result()
1212            .expect("should complete successfully");
1213
1214        assert_eq!(req_metrics.input_nodes.get_sample_count(), 2);
1215        assert_eq!(req_metrics.output_nodes.get_sample_count(), 2);
1216        assert_eq!(req_metrics.query_depth.get_sample_count(), 2);
1217        assert_eq!(req_metrics.input_nodes.get_sample_sum(), 2. + 4.);
1218        assert_eq!(req_metrics.output_nodes.get_sample_sum(), 2. + 4.);
1219        assert_eq!(req_metrics.query_depth.get_sample_sum(), 1. + 3.);
1220    }
1221
1222    pub async fn test_health_check_impl() {
1223        let server_builder = prep_schema(None, None);
1224        let url = format!(
1225            "http://{}:{}/health",
1226            server_builder.state.connection.host, server_builder.state.connection.port
1227        );
1228        server_builder.build_schema();
1229
1230        let resp = reqwest::get(&url).await.unwrap();
1231        assert_eq!(resp.status(), reqwest::StatusCode::OK);
1232
1233        let url_with_param = format!("{url}?max_checkpoint_lag_ms=1");
1234        let resp = reqwest::get(&url_with_param).await.unwrap();
1235        assert_eq!(resp.status(), reqwest::StatusCode::GATEWAY_TIMEOUT);
1236    }
1237
1238    /// Execute a GraphQL request with `limits` in place, expecting an error to
1239    /// be returned. Returns a text representation of all the errors triggered.
1240    async fn execute_for_error(limits: Limits, request: Request) -> String {
1241        let service_config = ServiceConfig {
1242            limits,
1243            ..Default::default()
1244        };
1245
1246        let schema = prep_schema(None, Some(service_config))
1247            .context_data(PayloadSize(
1248                // Payload size is usually set per request, and it is the size of the raw HTTP
1249                // request, which includes the query, variables, and surrounding JSON. Simulate for
1250                // testing purposes by serializing the request back into JSON and baking its length
1251                // as context data into the schema.
1252                serde_json::to_string(&request).unwrap().len() as u64,
1253            ))
1254            .extension(QueryLimitsChecker)
1255            .build_schema();
1256
1257        let errs: Vec<_> = schema
1258            .execute(request)
1259            .await
1260            .into_result()
1261            .unwrap_err()
1262            .into_iter()
1263            .map(|e| e.message)
1264            .collect();
1265
1266        errs.join("\n")
1267    }
1268
1269    pub async fn test_payload_read_exceeded_impl() {
1270        assert_eq!(
1271            execute_for_error(
1272                Limits {
1273                    max_tx_payload_size: 400,
1274                    max_query_payload_size: 10,
1275                    ..Default::default()
1276                },
1277                r#"
1278                    mutation {
1279                        executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1280                            effects {
1281                                status
1282                            }
1283                        }
1284                    }
1285                "#
1286                .into()
1287            )
1288            .await,
1289            "Query part too large: 354 bytes. Requests are limited to 400 bytes or fewer on \
1290             transaction payloads (all inputs to executeTransactionBlock or \
1291             dryRunTransactionBlock) and the rest of the request (the query part) must be 10 \
1292             bytes or fewer."
1293        );
1294    }
1295
1296    pub async fn test_payload_mutation_exceeded_impl() {
1297        assert_eq!(
1298            execute_for_error(
1299                Limits {
1300                    max_tx_payload_size: 10,
1301                    max_query_payload_size: 400,
1302                    ..Default::default()
1303                },
1304                r#"
1305                    mutation {
1306                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1307                            effects {
1308                                status
1309                            }
1310                        }
1311                    }
1312                "#
1313                .into()
1314            )
1315            .await,
1316            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1317             transaction payloads (all inputs to executeTransactionBlock or \
1318             dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1319             bytes or fewer."
1320        );
1321    }
1322
1323    pub async fn test_payload_dry_run_exceeded_impl() {
1324        assert_eq!(
1325            execute_for_error(
1326                Limits {
1327                    max_tx_payload_size: 10,
1328                    max_query_payload_size: 400,
1329                    ..Default::default()
1330                },
1331                r#"
1332                    query {
1333                        dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1334                            error
1335                            transaction {
1336                                digest
1337                            }
1338                        }
1339                    }
1340                "#
1341                .into(),
1342            )
1343            .await,
1344            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1345             transaction payloads (all inputs to executeTransactionBlock or \
1346             dryRunTransactionBlock) and the rest of the request (the query part) must be 400 bytes \
1347             or fewer."
1348        );
1349    }
1350
1351    pub async fn test_payload_total_exceeded_impl() {
1352        assert_eq!(
1353            execute_for_error(
1354                Limits {
1355                    max_tx_payload_size: 10,
1356                    max_query_payload_size: 10,
1357                    ..Default::default()
1358                },
1359                r#"
1360                    query {
1361                        dryRunTransactionBlock(txByte: "AAABBB") {
1362                            error
1363                            transaction {
1364                                digest
1365                            }
1366                        }
1367                    }
1368                "#
1369                .into(),
1370            )
1371            .await,
1372            "Overall request too large: 380 bytes. Requests are limited to 10 bytes or fewer on \
1373             transaction payloads (all inputs to executeTransactionBlock or dryRunTransactionBlock) \
1374             and the rest of the request (the query part) must be 10 bytes or fewer."
1375        );
1376    }
1377
1378    pub async fn test_payload_using_vars_mutation_exceeded_impl() {
1379        assert_eq!(
1380            execute_for_error(
1381                Limits {
1382                    max_tx_payload_size: 10,
1383                    max_query_payload_size: 500,
1384                    ..Default::default()
1385                },
1386                Request::new(
1387                    r#"
1388                    mutation ($tx: String!, $sigs: [String!]!) {
1389                        executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1390                            effects {
1391                                status
1392                            }
1393                        }
1394                    }
1395                    "#
1396                )
1397                .variables(Variables::from_json(json!({
1398                    "tx": "AAABBBCCC",
1399                    "sigs": ["BBB"]
1400                })))
1401            )
1402            .await,
1403            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1404             transaction payloads (all inputs to executeTransactionBlock or \
1405             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1406             bytes or fewer."
1407        );
1408    }
1409
1410    pub async fn test_payload_using_vars_read_exceeded_impl() {
1411        assert_eq!(
1412            execute_for_error(
1413                Limits {
1414                    max_tx_payload_size: 500,
1415                    max_query_payload_size: 10,
1416                    ..Default::default()
1417                },
1418                Request::new(
1419                    r#"
1420                    mutation ($tx: String!, $sigs: [String!]!) {
1421                        executeTransactionBlock(txBytes: $tx, signatures: $sigs) {
1422                            effects {
1423                                status
1424                            }
1425                        }
1426                    }
1427                    "#
1428                )
1429                .variables(Variables::from_json(json!({
1430                    "tx": "AAA",
1431                    "sigs": ["BBB"]
1432                })))
1433            )
1434            .await,
1435            "Query part too large: 409 bytes. Requests are limited to 500 bytes or fewer on \
1436             transaction payloads (all inputs to executeTransactionBlock or \
1437             dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1438             or fewer."
1439        );
1440    }
1441
1442    pub async fn test_payload_using_vars_dry_run_exceeded_impl() {
1443        assert_eq!(
1444            execute_for_error(
1445                Limits {
1446                    max_tx_payload_size: 10,
1447                    max_query_payload_size: 400,
1448                    ..Default::default()
1449                },
1450                Request::new(
1451                    r#"
1452                    query ($tx: String!) {
1453                        dryRunTransactionBlock(txBytes: $tx) {
1454                            error
1455                            transaction {
1456                                digest
1457                            }
1458                        }
1459                    }
1460                    "#
1461                )
1462                .variables(Variables::from_json(json!({
1463                    "tx": "AAABBBCCC"
1464                }))),
1465            )
1466            .await,
1467            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1468             transaction payloads (all inputs to executeTransactionBlock or \
1469             dryRunTransactionBlock) and the rest of the request (the query part) must be 400 \
1470             bytes or fewer."
1471        );
1472    }
1473
1474    pub async fn test_payload_using_vars_dry_run_read_exceeded_impl() {
1475        assert_eq!(
1476            execute_for_error(
1477                Limits {
1478                    max_tx_payload_size: 400,
1479                    max_query_payload_size: 10,
1480                    ..Default::default()
1481                },
1482                Request::new(
1483                    r#"
1484                    query ($tx: String!) {
1485                        dryRunTransactionBlock(txBytes: $tx) {
1486                            error
1487                            transaction {
1488                                digest
1489                            }
1490                        }
1491                    }
1492                    "#
1493                )
1494                .variables(Variables::from_json(json!({
1495                    "tx": "AAABBBCCC"
1496                }))),
1497            )
1498            .await,
1499            "Query part too large: 398 bytes. Requests are limited to 400 bytes or fewer on \
1500             transaction payloads (all inputs to executeTransactionBlock or \
1501             dryRunTransactionBlock) and the rest of the request (the query part) must be 10 bytes \
1502             or fewer."
1503        );
1504    }
1505
1506    pub async fn test_payload_multiple_execution_exceeded_impl() {
1507        // First check that the limit is large enough to hold one transaction's
1508        // parameters (by checking that we hit the read limit).
1509        let err = execute_for_error(
1510            Limits {
1511                max_tx_payload_size: 30,
1512                max_query_payload_size: 320,
1513                ..Default::default()
1514            },
1515            r#"
1516                mutation {
1517                    executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1518                        effects {
1519                            status
1520                        }
1521                    }
1522                }
1523            "#
1524            .into(),
1525        )
1526        .await;
1527        assert!(err.starts_with("Query part too large"), "{err}");
1528
1529        assert_eq!(
1530            execute_for_error(
1531                Limits {
1532                    max_tx_payload_size: 30,
1533                    max_query_payload_size: 800,
1534                    ..Default::default()
1535                },
1536                r#"
1537                    mutation {
1538                        e0: executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["DDD"]) {
1539                            effects {
1540                                status
1541                            }
1542                        }
1543                        e1: executeTransactionBlock(txBytes: "EEEFFFGGG", signatures: ["HHH"]) {
1544                            effects {
1545                                status
1546                            }
1547                        }
1548                    }
1549                "#
1550                .into()
1551            )
1552            .await,
1553            "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1554             transaction payloads (all inputs to executeTransactionBlock or \
1555             dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1556             bytes or fewer."
1557        );
1558    }
1559
1560    pub async fn test_payload_multiple_dry_run_exceeded_impl() {
1561        // First check that tx limit is large enough to hold one transaction's
1562        // parameters (by checking that we hit the read limit).
1563        let err = execute_for_error(
1564            Limits {
1565                max_tx_payload_size: 20,
1566                max_query_payload_size: 330,
1567                ..Default::default()
1568            },
1569            r#"
1570                query {
1571                    dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1572                       error
1573                       transaction {
1574                           digest
1575                       }
1576                    }
1577                }
1578            "#
1579            .into(),
1580        )
1581        .await;
1582        assert!(err.starts_with("Query part too large"), "{err}");
1583
1584        assert_eq!(
1585            execute_for_error(
1586                Limits {
1587                    max_tx_payload_size: 20,
1588                    max_query_payload_size: 800,
1589                    ..Default::default()
1590                },
1591                r#"
1592                    query {
1593                        d0: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1594                           error
1595                           transaction {
1596                               digest
1597                           }
1598                        }
1599                        d1: dryRunTransactionBlock(txBytes: "DDDEEEFFF") {
1600                           error
1601                           transaction {
1602                               digest
1603                           }
1604                        }
1605                    }
1606                "#
1607                .into()
1608            )
1609            .await,
1610            "Transaction payload too large. Requests are limited to 20 bytes or fewer on \
1611             transaction payloads (all inputs to executeTransactionBlock or \
1612             dryRunTransactionBlock) and the rest of the request (the query part) must be 800 \
1613             bytes or fewer."
1614        );
1615    }
1616
1617    pub async fn test_payload_execution_multiple_sigs_exceeded_impl() {
1618        // First check that the limit is large enough to hold a transaction with a
1619        // single signature (by checking that we hite the read limit).
1620        let err = execute_for_error(
1621            Limits {
1622                max_tx_payload_size: 30,
1623                max_query_payload_size: 320,
1624                ..Default::default()
1625            },
1626            r#"
1627                mutation {
1628                    executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1629                        effects {
1630                            status
1631                        }
1632                    }
1633                }
1634            "#
1635            .into(),
1636        )
1637        .await;
1638
1639        assert!(err.starts_with("Query part too large"), "{err}");
1640
1641        assert_eq!(
1642            execute_for_error(
1643                Limits {
1644                    max_tx_payload_size: 30,
1645                    max_query_payload_size: 500,
1646                    ..Default::default()
1647                },
1648                r#"
1649                    mutation {
1650                        executeTransactionBlock(
1651                            txBytes: "AAA",
1652                            signatures: ["BBB", "CCC", "DDD", "EEE", "FFF"]
1653                        ) {
1654                            effects {
1655                                status
1656                            }
1657                        }
1658                    }
1659                "#
1660                .into(),
1661            )
1662            .await,
1663            "Transaction payload too large. Requests are limited to 30 bytes or fewer on \
1664             transaction payloads (all inputs to executeTransactionBlock or \
1665             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1666             bytes or fewer.",
1667        )
1668    }
1669
1670    pub async fn test_payload_sig_var_execution_exceeded_impl() {
1671        // Variables can show up in the sub-structure of a GraphQL value as well, and we
1672        // need to count those as well.
1673        assert_eq!(
1674            execute_for_error(
1675                Limits {
1676                    max_tx_payload_size: 10,
1677                    max_query_payload_size: 500,
1678                    ..Default::default()
1679                },
1680                Request::new(
1681                    r#"
1682                    mutation ($tx: String!, $sig: String!) {
1683                        executeTransactionBlock(txBytes: $tx, signatures: [$sig]) {
1684                            effects {
1685                                status
1686                            }
1687                        }
1688                    }
1689                    "#
1690                )
1691                .variables(Variables::from_json(json!({
1692                    "tx": "AAA",
1693                    "sig": "BBB"
1694                })))
1695            )
1696            .await,
1697            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1698             transaction payloads (all inputs to executeTransactionBlock or \
1699             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1700             bytes or fewer."
1701        );
1702    }
1703
1704    /// Check if the error indicates that the request passed the overall size
1705    /// check and the transaction payload check.
1706    fn passed_tx_checks(err: &str) -> bool {
1707        !err.starts_with("Overall request too large")
1708            && !err.starts_with("Transaction payload too large")
1709    }
1710
1711    pub async fn test_payload_reusing_vars_execution_impl() {
1712        // Test that when variables are re-used as execution params, the size of the
1713        // variable is only counted once.
1714
1715        // First, check that `error_passed_tx_checks` is working, by submitting a
1716        // request that will fail the initial payload check.
1717        assert!(!passed_tx_checks(
1718            &execute_for_error(
1719                Limits {
1720                    max_tx_payload_size: 1,
1721                    max_query_payload_size: 1,
1722                    ..Default::default()
1723                },
1724                r#"
1725                    mutation {
1726                        executeTransactionBlock(txBytes: "AAA", signatures: ["BBB"]) {
1727                            effects {
1728                                status
1729                            }
1730                        }
1731                    }
1732                "#
1733                .into()
1734            )
1735            .await
1736        ));
1737
1738        let limits = Limits {
1739            max_tx_payload_size: 20,
1740            max_query_payload_size: 1000,
1741            ..Default::default()
1742        };
1743
1744        // Then check that a request that uses the variable once passes the transaction
1745        // limit check.
1746        assert!(passed_tx_checks(
1747            &execute_for_error(
1748                limits.clone(),
1749                Request::new(
1750                    r#"
1751                    mutation ($sig: String!) {
1752                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig]) {
1753                            effects {
1754                                status
1755                            }
1756                        }
1757                    }
1758                    "#,
1759                )
1760                .variables(Variables::from_json(json!({
1761                    "sig": "BBB"
1762                })))
1763            )
1764            .await
1765        ));
1766
1767        // Then check that a request that introduces an extra signature, but without
1768        // re-using the variable fails the transaction limit.
1769        let execution_result = execute_for_error(
1770            limits.clone(),
1771            Request::new(
1772                r#"
1773                    mutation ($sig: String!) {
1774                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, "BBB"]) {
1775                            effects {
1776                                status
1777                            }
1778                        }
1779                    }
1780                    "#,
1781            )
1782            .variables(Variables::from_json(json!({
1783                "sig": "BBB"
1784            }))),
1785        )
1786        .await;
1787        assert!(!passed_tx_checks(&execution_result), "{execution_result}");
1788
1789        // And then when that use is replaced by re-using the variable, we should be
1790        // under the transaction payload limit again.
1791        let execution_result = execute_for_error(
1792            limits,
1793            Request::new(
1794                r#"
1795                    mutation ($sig: String!) {
1796                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: [$sig, $sig]) {
1797                            effects {
1798                                status
1799                            }
1800                        }
1801                    }
1802                    "#,
1803            )
1804            .variables(Variables::from_json(json!({
1805                "sig": "BBB"
1806            }))),
1807        )
1808        .await;
1809        assert!(passed_tx_checks(&execution_result), "{execution_result}");
1810    }
1811
1812    pub async fn test_payload_reusing_vars_dry_run_impl() {
1813        // Like `test_payload_reusing_vars_execution` but the variable is used in a
1814        // dry-run.
1815
1816        let limits = Limits {
1817            max_tx_payload_size: 20,
1818            max_query_payload_size: 1000,
1819            ..Default::default()
1820        };
1821
1822        // A single dry-run is under the limit.
1823        assert!(passed_tx_checks(
1824            &execute_for_error(
1825                limits.clone(),
1826                Request::new(
1827                    r#"
1828                    query ($tx: String!) {
1829                        dryRunTransactionBlock(txBytes: $tx) {
1830                            error
1831                            transaction {
1832                                digest
1833                            }
1834                        }
1835                    }
1836                    "#,
1837                )
1838                .variables(Variables::from_json(json!({
1839                    "tx": "AAABBBCCC"
1840                })))
1841            )
1842            .await
1843        ));
1844
1845        // Duplicating the dry-run causes us to hit the limit.
1846        assert!(!passed_tx_checks(
1847            &execute_for_error(
1848                limits.clone(),
1849                Request::new(
1850                    r#"
1851                    query ($tx: String!) {
1852                        d0: dryRunTransactionBlock(txBytes: $tx) {
1853                            error
1854                            transaction {
1855                                digest
1856                            }
1857                        }
1858
1859                        d1: dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1860                            error
1861                            transaction {
1862                                digest
1863                            }
1864                        }
1865                    }
1866                    "#,
1867                )
1868                .variables(Variables::from_json(json!({
1869                    "tx": "AAABBBCCC"
1870                })))
1871            )
1872            .await
1873        ));
1874
1875        // And by re-using the variable, we are under the transaction limit again.
1876        assert!(passed_tx_checks(
1877            &execute_for_error(
1878                limits,
1879                Request::new(
1880                    r#"
1881                    query ($tx: String!) {
1882                        d0: dryRunTransactionBlock(txBytes: $tx) {
1883                            error
1884                            transaction {
1885                                digest
1886                            }
1887                        }
1888
1889                        d1: dryRunTransactionBlock(txBytes: $tx) {
1890                            error
1891                            transaction {
1892                                digest
1893                            }
1894                        }
1895                    }
1896                    "#,
1897                )
1898                .variables(Variables::from_json(json!({
1899                    "tx": "AAABBBCCC"
1900                })))
1901            )
1902            .await
1903        ));
1904    }
1905
1906    pub async fn test_payload_named_fragment_execution_exceeded_impl() {
1907        assert_eq!(
1908            execute_for_error(
1909                Limits {
1910                    max_tx_payload_size: 10,
1911                    max_query_payload_size: 500,
1912                    ..Default::default()
1913                },
1914                r#"
1915                    mutation {
1916                        ...Tx
1917                    }
1918
1919                    fragment Tx on Mutation {
1920                        executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1921                            effects {
1922                                status
1923                            }
1924                        }
1925                    }
1926                "#
1927                .into()
1928            )
1929            .await,
1930            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1931             transaction payloads (all inputs to executeTransactionBlock or \
1932             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1933             bytes or fewer."
1934        );
1935    }
1936
1937    pub async fn test_payload_inline_fragment_execution_exceeded_impl() {
1938        assert_eq!(
1939            execute_for_error(
1940                Limits {
1941                    max_tx_payload_size: 10,
1942                    max_query_payload_size: 500,
1943                    ..Default::default()
1944                },
1945                r#"
1946                    mutation {
1947                        ... on Mutation {
1948                            executeTransactionBlock(txBytes: "AAABBBCCC", signatures: ["BBB"]) {
1949                                effects {
1950                                    status
1951                                }
1952                            }
1953                        }
1954                    }
1955                "#
1956                .into()
1957            )
1958            .await,
1959            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1960             transaction payloads (all inputs to executeTransactionBlock or \
1961             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1962             bytes or fewer."
1963        );
1964    }
1965
1966    pub async fn test_payload_named_fragment_dry_run_exceeded_impl() {
1967        assert_eq!(
1968            execute_for_error(
1969                Limits {
1970                    max_tx_payload_size: 10,
1971                    max_query_payload_size: 500,
1972                    ..Default::default()
1973                },
1974                r#"
1975                    query {
1976                        ...DryRun
1977                    }
1978
1979                    fragment DryRun on Query {
1980                        dryRunTransactionBlock(txBytes: "AAABBBCCC") {
1981                            error
1982                            transaction {
1983                                digest
1984                            }
1985                        }
1986                    }
1987                "#
1988                .into(),
1989            )
1990            .await,
1991            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
1992             transaction payloads (all inputs to executeTransactionBlock or \
1993             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
1994             bytes or fewer."
1995        );
1996    }
1997
1998    pub async fn test_payload_inline_fragment_dry_run_exceeded_impl() {
1999        assert_eq!(
2000            execute_for_error(
2001                Limits {
2002                    max_tx_payload_size: 10,
2003                    max_query_payload_size: 500,
2004                    ..Default::default()
2005                },
2006                r#"
2007                    query {
2008                        ... on Query {
2009                            dryRunTransactionBlock(txBytes: "AAABBBCCC") {
2010                                error
2011                                transaction {
2012                                    digest
2013                                }
2014                            }
2015                        }
2016                    }
2017                "#
2018                .into(),
2019            )
2020            .await,
2021            "Transaction payload too large. Requests are limited to 10 bytes or fewer on \
2022             transaction payloads (all inputs to executeTransactionBlock or \
2023             dryRunTransactionBlock) and the rest of the request (the query part) must be 500 \
2024             bytes or fewer."
2025        );
2026    }
2027
2028    pub async fn test_payload_object_input_dry_run_exceeded_impl() {
2029        // Object input arguments (e.g. `txMeta`) and their nested lists of
2030        // objects (e.g. `gasObjects`) must contribute to the transaction
2031        // payload budget. Passing them via a variable does not exempt them
2032        // from the limit.
2033        assert_eq!(
2034            execute_for_error(
2035                Limits {
2036                    max_tx_payload_size: 500,
2037                    max_query_payload_size: 10_000,
2038                    ..Default::default()
2039                },
2040                Request::new(
2041                    r#"
2042                    query DryRunQuery(
2043                        $txBytes: String!,
2044                        $skipChecks: Boolean!,
2045                        $txMeta: TransactionMetadata
2046                    ) {
2047                        dryRunTransactionBlock(
2048                            txBytes: $txBytes,
2049                            skipChecks: $skipChecks,
2050                            txMeta: $txMeta
2051                        ) {
2052                            error
2053                            transaction {
2054                                digest
2055                            }
2056                        }
2057                    }
2058                    "#,
2059                )
2060                .variables(Variables::from_json(json!({
2061                    "txBytes": "AAIAINdb/wdoZKClk3YWgTp2SRpvVYkX9XUMu0Im6dGMv5g1AAjoAwAAAAAAAAICAAEBAQABAQIAAAEAAA==",
2062                    "skipChecks": false,
2063                    "txMeta": {
2064                        "gasBudget": null,
2065                        "gasPrice": 1000,
2066                        "gasSponsor": "0x2c4a2ee4a1cb67c2ae7a6215784a26bb9c5051ea3f9aa5f60ccc58613a94f70b",
2067                        "sender":     "0x2c4a2ee4a1cb67c2ae7a6215784a26bb9c5051ea3f9aa5f60ccc58613a94f70b",
2068                        "gasObjects": [
2069                            { "address": "0x0f49b390361c7b85f50ea436dd2f78d07794450162fa543d9b37d4fd5d3c9884", "digest": "3Hij17jtvK4o7VxarnBJkX31LYLtV7KDecLBhmBJuapU", "version": 3 },
2070                            { "address": "0x115d51e2df3dae3fcb4a8d71ea9e357cec509e2f8e2f604eea53bb7abcd68c2b", "digest": "9t4C9FXKFdpgpZa8L968KtgkCPHwRA2QFHz6gVrymVJu", "version": 3 },
2071                            { "address": "0x18c9d0de646c02aaea280eb4aafabbc6855cf6efb7c423714428318c42fb35be", "digest": "FguxMqj4pLitqCQLhGrEja65HT5r7749h8CPd3JzSnnk", "version": 3 }
2072                        ]
2073                    }
2074                })))
2075            )
2076            .await,
2077            "Transaction payload too large. Requests are limited to 500 bytes or fewer on \
2078             transaction payloads (all inputs to executeTransactionBlock or \
2079             dryRunTransactionBlock) and the rest of the request (the query part) must be 10000 \
2080             bytes or fewer."
2081        );
2082    }
2083}